Posted to commits@nutch.apache.org by do...@apache.org on 2009/08/17 00:25:17 UTC
svn commit: r804789 [3/6] - in /lucene/nutch/branches/nutchbase: ./ bin/
conf/ lib/ src/java/org/apache/nutch/analysis/
src/java/org/apache/nutch/crawl/ src/java/org/apache/nutch/fetcher/
src/java/org/apache/nutch/indexer/ src/java/org/apache/nutch/ind...
Modified: lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/fetcher/Fetcher.java
URL: http://svn.apache.org/viewvc/lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/fetcher/Fetcher.java?rev=804789&r1=804788&r2=804789&view=diff
==============================================================================
--- lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/fetcher/Fetcher.java (original)
+++ lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/fetcher/Fetcher.java Sun Aug 16 22:25:12 2009
@@ -1,1042 +1,161 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
package org.apache.nutch.fetcher;
import java.io.IOException;
-import java.net.InetAddress;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.net.UnknownHostException;
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
-// Commons Logging imports
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.ValueFilter;
+import org.apache.hadoop.hbase.filter.FilterList.Operator;
+import org.apache.hadoop.hbase.filter.ValueFilter.CompareOp;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.mapreduce.TableMapper;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.crawl.Generator;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.hbase.HbaseColumn;
+import org.apache.nutch.util.hbase.WebTableColumns;
+import org.apache.nutch.util.hbase.TableUtil;
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.util.StringUtils;
-
-import org.apache.nutch.crawl.CrawlDatum;
-import org.apache.nutch.crawl.NutchWritable;
-import org.apache.nutch.crawl.SignatureFactory;
-import org.apache.nutch.metadata.Metadata;
-import org.apache.nutch.metadata.Nutch;
-import org.apache.nutch.net.*;
-import org.apache.nutch.protocol.*;
-import org.apache.nutch.parse.*;
-import org.apache.nutch.scoring.ScoringFilters;
-import org.apache.nutch.util.*;
-
-
-/**
- * A queue-based fetcher.
- *
- * <p>This fetcher uses a well-known model of one producer (a QueueFeeder)
- * and many consumers (FetcherThread-s).
- *
- * <p>QueueFeeder reads input fetchlists and
- * populates a set of FetchItemQueue-s, which hold FetchItem-s that
- * describe the items to be fetched. There are as many queues as there are unique
- * hosts, but at any given time the total number of fetch items in all queues
- * is less than a fixed number (currently set to a multiple of the number of
- * threads).
- *
- * <p>As items are consumed from the queues, the QueueFeeder continues to add new
- * input items, so that their total count stays fixed (FetcherThread-s may also
- * add new items to the queues e.g. as a results of redirection) - until all
- * input items are exhausted, at which point the number of items in the queues
- * begins to decrease. When this number reaches 0 fetcher will finish.
- *
- * <p>This fetcher implementation handles per-host blocking itself, instead
- * of delegating this work to protocol-specific plugins.
- * Each per-host queue handles its own "politeness" settings, such as the
- * maximum number of concurrent requests and crawl delay between consecutive
- * requests - and also a list of requests in progress, and the time the last
- * request was finished. As FetcherThread-s ask for new items to be fetched,
- * queues may return eligible items or null if for "politeness" reasons this
- * host's queue is not yet ready.
- *
- * <p>If there are still unfetched items in the queues, but none of the items
- * are ready, FetcherThread-s will spin-wait until either some items become
- * available, or a timeout is reached (at which point the Fetcher will abort,
- * assuming the task is hung).
- *
- * @author Andrzej Bialecki
+/** Multi-threaded fetcher.
+ *
*/
-public class Fetcher extends Configured implements
- MapRunnable<Text, CrawlDatum, Text, NutchWritable> {
-
- public static final int PERM_REFRESH_TIME = 5;
-
- public static final String CONTENT_REDIR = "content";
-
- public static final String PROTOCOL_REDIR = "protocol";
-
- public static final Log LOG = LogFactory.getLog(Fetcher.class);
+public class Fetcher
+implements Tool {
- public static class InputFormat extends SequenceFileInputFormat<Text, CrawlDatum> {
- /** Don't split inputs, to keep things polite. */
- public InputSplit[] getSplits(JobConf job, int nSplits)
- throws IOException {
- FileStatus[] files = listStatus(job);
- FileSplit[] splits = new FileSplit[files.length];
- for (int i = 0; i < files.length; i++) {
- FileStatus cur = files[i];
- splits[i] = new FileSplit(cur.getPath(), 0,
- cur.getLen(), (String[])null);
- }
- return splits;
- }
- }
-
- private OutputCollector<Text, NutchWritable> output;
- private Reporter reporter;
+ public static final String PROTOCOL_REDIR = "protocol";
- private String segmentName;
- private AtomicInteger activeThreads = new AtomicInteger(0);
- private AtomicInteger spinWaiting = new AtomicInteger(0);
-
- private long start = System.currentTimeMillis(); // start time of fetcher run
- private AtomicLong lastRequestStart = new AtomicLong(start);
+ public static final int PERM_REFRESH_TIME = 5;
- private AtomicLong bytes = new AtomicLong(0); // total bytes fetched
- private AtomicInteger pages = new AtomicInteger(0); // total pages fetched
- private AtomicInteger errors = new AtomicInteger(0); // total pages errored
+ public static final byte[] REDIRECT_DISCOVERED =
+ Bytes.toBytes("___rdrdsc__");
- private boolean storingContent;
- private boolean parsing;
- FetchItemQueues fetchQueues;
- QueueFeeder feeder;
+ public static final byte[] FETCH_MARK =
+ Bytes.toBytes("__ftchmrk__");
- /**
- * This class described the item to be fetched.
- */
- private static class FetchItem {
- String queueID;
- Text url;
- URL u;
- CrawlDatum datum;
-
- public FetchItem(Text url, URL u, CrawlDatum datum, String queueID) {
- this.url = url;
- this.u = u;
- this.datum = datum;
- this.queueID = queueID;
- }
-
- /** Create an item. Queue id will be created based on <code>byIP</code>
- * argument, either as a protocol + hostname pair, or protocol + IP
- * address pair.
- */
- public static FetchItem create(Text url, CrawlDatum datum, boolean byIP) {
- String queueID;
- URL u = null;
- try {
- u = new URL(url.toString());
- } catch (Exception e) {
- LOG.warn("Cannot parse url: " + url, e);
- return null;
- }
- String proto = u.getProtocol().toLowerCase();
- String host;
- if (byIP) {
- try {
- InetAddress addr = InetAddress.getByName(u.getHost());
- host = addr.getHostAddress();
- } catch (UnknownHostException e) {
- // unable to resolve it, so don't fall back to host name
- LOG.warn("Unable to resolve: " + u.getHost() + ", skipping.");
- return null;
- }
- } else {
- host = u.getHost();
- if (host == null) {
- LOG.warn("Unknown host for url: " + url + ", skipping.");
- return null;
- }
- host = host.toLowerCase();
- }
- queueID = proto + "://" + host;
- return new FetchItem(url, u, datum, queueID);
- }
-
- public CrawlDatum getDatum() {
- return datum;
- }
-
- public String getQueueID() {
- return queueID;
- }
-
- public Text getUrl() {
- return url;
- }
-
- public URL getURL2() {
- return u;
- }
- }
+ private static final Collection<HbaseColumn> COLUMNS =
+ new HashSet<HbaseColumn>();
- /**
- * This class handles FetchItems which come from the same host ID (be it
- * a proto/hostname or proto/IP pair). It also keeps track of requests in
- * progress and elapsed time between requests.
- */
- private static class FetchItemQueue {
- List<FetchItem> queue = Collections.synchronizedList(new LinkedList<FetchItem>());
- Set<FetchItem> inProgress = Collections.synchronizedSet(new HashSet<FetchItem>());
- AtomicLong nextFetchTime = new AtomicLong();
- long crawlDelay;
- long minCrawlDelay;
- int maxThreads;
- Configuration conf;
-
- public FetchItemQueue(Configuration conf, int maxThreads, long crawlDelay, long minCrawlDelay) {
- this.conf = conf;
- this.maxThreads = maxThreads;
- this.crawlDelay = crawlDelay;
- this.minCrawlDelay = minCrawlDelay;
- // ready to start
- setEndTime(System.currentTimeMillis() - crawlDelay);
- }
-
- public int getQueueSize() {
- return queue.size();
- }
-
- public int getInProgressSize() {
- return inProgress.size();
- }
-
- public void finishFetchItem(FetchItem it, boolean asap) {
- if (it != null) {
- inProgress.remove(it);
- setEndTime(System.currentTimeMillis(), asap);
- }
- }
-
- public void addFetchItem(FetchItem it) {
- if (it == null) return;
- queue.add(it);
- }
-
- public void addInProgressFetchItem(FetchItem it) {
- if (it == null) return;
- inProgress.add(it);
- }
-
- public FetchItem getFetchItem() {
- if (inProgress.size() >= maxThreads) return null;
- long now = System.currentTimeMillis();
- if (nextFetchTime.get() > now) return null;
- FetchItem it = null;
- if (queue.size() == 0) return null;
- try {
- it = queue.remove(0);
- inProgress.add(it);
- } catch (Exception e) {
- LOG.error("Cannot remove FetchItem from queue or cannot add it to inProgress queue", e);
- }
- return it;
- }
-
- public synchronized void dump() {
- LOG.info(" maxThreads = " + maxThreads);
- LOG.info(" inProgress = " + inProgress.size());
- LOG.info(" crawlDelay = " + crawlDelay);
- LOG.info(" minCrawlDelay = " + minCrawlDelay);
- LOG.info(" nextFetchTime = " + nextFetchTime.get());
- LOG.info(" now = " + System.currentTimeMillis());
- for (int i = 0; i < queue.size(); i++) {
- FetchItem it = queue.get(i);
- LOG.info(" " + i + ". " + it.url);
- }
- }
-
- private void setEndTime(long endTime) {
- setEndTime(endTime, false);
- }
-
- private void setEndTime(long endTime, boolean asap) {
- if (!asap)
- nextFetchTime.set(endTime + (maxThreads > 1 ? minCrawlDelay : crawlDelay));
- else
- nextFetchTime.set(endTime);
- }
- }
-
- /**
- * Convenience class - a collection of queues that keeps track of the total
- * number of items, and provides items eligible for fetching from any queue.
- */
- private static class FetchItemQueues {
- public static final String DEFAULT_ID = "default";
- Map<String, FetchItemQueue> queues = new HashMap<String, FetchItemQueue>();
- AtomicInteger totalSize = new AtomicInteger(0);
- int maxThreads;
- boolean byIP;
- long crawlDelay;
- long minCrawlDelay;
- Configuration conf;
-
- public FetchItemQueues(Configuration conf) {
- this.conf = conf;
- this.maxThreads = conf.getInt("fetcher.threads.per.host", 1);
- // backward-compatible default setting
- this.byIP = conf.getBoolean("fetcher.threads.per.host.by.ip", false);
- this.crawlDelay = (long) (conf.getFloat("fetcher.server.delay", 1.0f) * 1000);
- this.minCrawlDelay = (long) (conf.getFloat("fetcher.server.min.delay", 0.0f) * 1000);
- }
-
- public int getTotalSize() {
- return totalSize.get();
- }
-
- public int getQueueCount() {
- return queues.size();
- }
-
- public void addFetchItem(Text url, CrawlDatum datum) {
- FetchItem it = FetchItem.create(url, datum, byIP);
- if (it != null) addFetchItem(it);
- }
-
- public void addFetchItem(FetchItem it) {
- FetchItemQueue fiq = getFetchItemQueue(it.queueID);
- fiq.addFetchItem(it);
- totalSize.incrementAndGet();
- }
-
- public void finishFetchItem(FetchItem it) {
- finishFetchItem(it, false);
- }
-
- public void finishFetchItem(FetchItem it, boolean asap) {
- FetchItemQueue fiq = queues.get(it.queueID);
- if (fiq == null) {
- LOG.warn("Attempting to finish item from unknown queue: " + it);
- return;
- }
- fiq.finishFetchItem(it, asap);
- }
-
- public synchronized FetchItemQueue getFetchItemQueue(String id) {
- FetchItemQueue fiq = queues.get(id);
- if (fiq == null) {
- // initialize queue
- fiq = new FetchItemQueue(conf, maxThreads, crawlDelay, minCrawlDelay);
- queues.put(id, fiq);
- }
- return fiq;
- }
-
- public synchronized FetchItem getFetchItem() {
- Iterator<Map.Entry<String, FetchItemQueue>> it =
- queues.entrySet().iterator();
- while (it.hasNext()) {
- FetchItemQueue fiq = it.next().getValue();
- // reap empty queues
- if (fiq.getQueueSize() == 0 && fiq.getInProgressSize() == 0) {
- it.remove();
- continue;
- }
- FetchItem fit = fiq.getFetchItem();
- if (fit != null) {
- totalSize.decrementAndGet();
- return fit;
- }
- }
- return null;
- }
-
- public synchronized void dump() {
- for (String id : queues.keySet()) {
- FetchItemQueue fiq = queues.get(id);
- if (fiq.getQueueSize() == 0) continue;
- LOG.info("* queue: " + id);
- fiq.dump();
- }
- }
+ static {
+ COLUMNS.add(new HbaseColumn(WebTableColumns.METADATA,
+ Generator.GENERATOR_MARK));
+ COLUMNS.add(new HbaseColumn(WebTableColumns.REPR_URL));
}
/**
- * This class feeds the queues with input items, and re-fills them as
- * items are consumed by FetcherThread-s.
+ * <p>Mapper class for Fetcher.</p>
+ * <p>
+ * This class reads the random integer written by {@link GeneratorHbase} as its key
+ * while outputting the actual key and value arguments through a {@link FetchEntry}
+ * instance.
+ * </p>
+ * <p>
+ * This approach (combined with the use of {@link PartitionUrlByHost})
+ * makes sure that Fetcher is still polite while also randomizing the key order.
+ * If one host has a huge number of URLs in your table while other hosts
+ * have only a few, {@link FetcherReducer} will not get stuck on one host but will
+ * process URLs from other hosts as well.
+ * </p>
*/
- private static class QueueFeeder extends Thread {
- private RecordReader<Text, CrawlDatum> reader;
- private FetchItemQueues queues;
- private int size;
-
- public QueueFeeder(RecordReader<Text, CrawlDatum> reader,
- FetchItemQueues queues, int size) {
- this.reader = reader;
- this.queues = queues;
- this.size = size;
- this.setDaemon(true);
- this.setName("QueueFeeder");
- }
-
- public void run() {
- boolean hasMore = true;
- int cnt = 0;
-
- while (hasMore) {
- int feed = size - queues.getTotalSize();
- if (feed <= 0) {
- // queues are full - spin-wait until they have some free space
- try {
- Thread.sleep(1000);
- } catch (Exception e) {};
- continue;
- } else {
- LOG.debug("-feeding " + feed + " input urls ...");
- while (feed > 0 && hasMore) {
- try {
- Text url = new Text();
- CrawlDatum datum = new CrawlDatum();
- hasMore = reader.next(url, datum);
- if (hasMore) {
- queues.addFetchItem(url, datum);
- cnt++;
- feed--;
- }
- } catch (IOException e) {
- LOG.fatal("QueueFeeder error reading input, record " + cnt, e);
- return;
- }
- }
- }
- }
- LOG.info("QueueFeeder finished: total " + cnt + " records.");
- }
- }
-
- /**
- * This class picks items from queues and fetches the pages.
- */
- private class FetcherThread extends Thread {
- private Configuration conf;
- private URLFilters urlFilters;
- private ScoringFilters scfilters;
- private ParseUtil parseUtil;
- private URLNormalizers normalizers;
- private ProtocolFactory protocolFactory;
- private long maxCrawlDelay;
- private boolean byIP;
- private int maxRedirect;
- private String reprUrl;
- private boolean redirecting;
- private int redirectCount;
- private boolean ignoreExternalLinks;
-
- public FetcherThread(Configuration conf) {
- this.setDaemon(true); // don't hang JVM on exit
- this.setName("FetcherThread"); // use an informative name
- this.conf = conf;
- this.urlFilters = new URLFilters(conf);
- this.scfilters = new ScoringFilters(conf);
- this.parseUtil = new ParseUtil(conf);
- this.protocolFactory = new ProtocolFactory(conf);
- this.normalizers = new URLNormalizers(conf, URLNormalizers.SCOPE_FETCHER);
- this.maxCrawlDelay = conf.getInt("fetcher.max.crawl.delay", 30) * 1000;
- // backward-compatible default setting
- this.byIP = conf.getBoolean("fetcher.threads.per.host.by.ip", true);
- this.maxRedirect = conf.getInt("http.redirect.max", 3);
- this.ignoreExternalLinks =
- conf.getBoolean("db.ignore.external.links", false);
- }
-
- public void run() {
- activeThreads.incrementAndGet(); // count threads
-
- FetchItem fit = null;
- try {
-
- while (true) {
- fit = fetchQueues.getFetchItem();
- if (fit == null) {
- if (feeder.isAlive() || fetchQueues.getTotalSize() > 0) {
- LOG.debug(getName() + " spin-waiting ...");
- // spin-wait.
- spinWaiting.incrementAndGet();
- try {
- Thread.sleep(500);
- } catch (Exception e) {}
- spinWaiting.decrementAndGet();
- continue;
- } else {
- // all done, finish this thread
- return;
- }
- }
- lastRequestStart.set(System.currentTimeMillis());
- Text reprUrlWritable =
- (Text) fit.datum.getMetaData().get(Nutch.WRITABLE_REPR_URL_KEY);
- if (reprUrlWritable == null) {
- reprUrl = fit.url.toString();
- } else {
- reprUrl = reprUrlWritable.toString();
- }
- try {
- if (LOG.isInfoEnabled()) { LOG.info("fetching " + fit.url); }
-
- // fetch the page
- redirecting = false;
- redirectCount = 0;
- do {
- if (LOG.isDebugEnabled()) {
- LOG.debug("redirectCount=" + redirectCount);
- }
- redirecting = false;
- Protocol protocol = this.protocolFactory.getProtocol(fit.url.toString());
- RobotRules rules = protocol.getRobotRules(fit.url, fit.datum);
- if (!rules.isAllowed(fit.u)) {
- // unblock
- fetchQueues.finishFetchItem(fit, true);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Denied by robots.txt: " + fit.url);
- }
- output(fit.url, fit.datum, null, ProtocolStatus.STATUS_ROBOTS_DENIED, CrawlDatum.STATUS_FETCH_GONE);
- continue;
- }
- if (rules.getCrawlDelay() > 0) {
- if (rules.getCrawlDelay() > maxCrawlDelay) {
- // unblock
- fetchQueues.finishFetchItem(fit, true);
- LOG.debug("Crawl-Delay for " + fit.url + " too long (" + rules.getCrawlDelay() + "), skipping");
- output(fit.url, fit.datum, null, ProtocolStatus.STATUS_ROBOTS_DENIED, CrawlDatum.STATUS_FETCH_GONE);
- continue;
- } else {
- FetchItemQueue fiq = fetchQueues.getFetchItemQueue(fit.queueID);
- fiq.crawlDelay = rules.getCrawlDelay();
- }
- }
- ProtocolOutput output = protocol.getProtocolOutput(fit.url, fit.datum);
- ProtocolStatus status = output.getStatus();
- Content content = output.getContent();
- ParseStatus pstatus = null;
- // unblock queue
- fetchQueues.finishFetchItem(fit);
-
- String urlString = fit.url.toString();
-
- switch(status.getCode()) {
-
- case ProtocolStatus.WOULDBLOCK:
- // retry ?
- fetchQueues.addFetchItem(fit);
- break;
-
- case ProtocolStatus.SUCCESS: // got a page
- pstatus = output(fit.url, fit.datum, content, status, CrawlDatum.STATUS_FETCH_SUCCESS);
- updateStatus(content.getContent().length);
- if (pstatus != null && pstatus.isSuccess() &&
- pstatus.getMinorCode() == ParseStatus.SUCCESS_REDIRECT) {
- String newUrl = pstatus.getMessage();
- int refreshTime = Integer.valueOf(pstatus.getArgs()[1]);
- Text redirUrl =
- handleRedirect(fit.url, fit.datum,
- urlString, newUrl,
- refreshTime < Fetcher.PERM_REFRESH_TIME,
- Fetcher.CONTENT_REDIR);
- if (redirUrl != null) {
- CrawlDatum newDatum = new CrawlDatum(CrawlDatum.STATUS_DB_UNFETCHED,
- fit.datum.getFetchInterval(), fit.datum.getScore());
- if (reprUrl != null) {
- newDatum.getMetaData().put(Nutch.WRITABLE_REPR_URL_KEY,
- new Text(reprUrl));
- }
- fit = FetchItem.create(redirUrl, newDatum, byIP);
- if (fit != null) {
- FetchItemQueue fiq =
- fetchQueues.getFetchItemQueue(fit.queueID);
- fiq.addInProgressFetchItem(fit);
- } else {
- // stop redirecting
- redirecting = false;
- }
- }
- }
- break;
-
- case ProtocolStatus.MOVED: // redirect
- case ProtocolStatus.TEMP_MOVED:
- int code;
- boolean temp;
- if (status.getCode() == ProtocolStatus.MOVED) {
- code = CrawlDatum.STATUS_FETCH_REDIR_PERM;
- temp = false;
- } else {
- code = CrawlDatum.STATUS_FETCH_REDIR_TEMP;
- temp = true;
- }
- output(fit.url, fit.datum, content, status, code);
- String newUrl = status.getMessage();
- Text redirUrl =
- handleRedirect(fit.url, fit.datum,
- urlString, newUrl, temp,
- Fetcher.PROTOCOL_REDIR);
- if (redirUrl != null) {
- CrawlDatum newDatum = new CrawlDatum(CrawlDatum.STATUS_DB_UNFETCHED,
- fit.datum.getFetchInterval(), fit.datum.getScore());
- if (reprUrl != null) {
- newDatum.getMetaData().put(Nutch.WRITABLE_REPR_URL_KEY,
- new Text(reprUrl));
- }
- fit = FetchItem.create(redirUrl, newDatum, byIP);
- if (fit != null) {
- FetchItemQueue fiq =
- fetchQueues.getFetchItemQueue(fit.queueID);
- fiq.addInProgressFetchItem(fit);
- } else {
- // stop redirecting
- redirecting = false;
- }
- } else {
- // stop redirecting
- redirecting = false;
- }
- break;
-
- case ProtocolStatus.EXCEPTION:
- logError(fit.url, status.getMessage());
- /* FALLTHROUGH */
- case ProtocolStatus.RETRY: // retry
- case ProtocolStatus.BLOCKED:
- output(fit.url, fit.datum, null, status, CrawlDatum.STATUS_FETCH_RETRY);
- break;
-
- case ProtocolStatus.GONE: // gone
- case ProtocolStatus.NOTFOUND:
- case ProtocolStatus.ACCESS_DENIED:
- case ProtocolStatus.ROBOTS_DENIED:
- output(fit.url, fit.datum, null, status, CrawlDatum.STATUS_FETCH_GONE);
- break;
-
- case ProtocolStatus.NOTMODIFIED:
- output(fit.url, fit.datum, null, status, CrawlDatum.STATUS_FETCH_NOTMODIFIED);
- break;
-
- default:
- if (LOG.isWarnEnabled()) {
- LOG.warn("Unknown ProtocolStatus: " + status.getCode());
- }
- output(fit.url, fit.datum, null, status, CrawlDatum.STATUS_FETCH_RETRY);
- }
-
- if (redirecting && redirectCount >= maxRedirect) {
- fetchQueues.finishFetchItem(fit);
- if (LOG.isInfoEnabled()) {
- LOG.info(" - redirect count exceeded " + fit.url);
- }
- output(fit.url, fit.datum, null, ProtocolStatus.STATUS_REDIR_EXCEEDED, CrawlDatum.STATUS_FETCH_GONE);
- }
-
- } while (redirecting && (redirectCount < maxRedirect));
-
- } catch (Throwable t) { // unexpected exception
- // unblock
- fetchQueues.finishFetchItem(fit);
- logError(fit.url, t.toString());
- output(fit.url, fit.datum, null, ProtocolStatus.STATUS_FAILED, CrawlDatum.STATUS_FETCH_RETRY);
- }
- }
-
- } catch (Throwable e) {
- if (LOG.isFatalEnabled()) {
- e.printStackTrace(LogUtil.getFatalStream(LOG));
- LOG.fatal("fetcher caught:"+e.toString());
- }
- } finally {
- if (fit != null) fetchQueues.finishFetchItem(fit);
- activeThreads.decrementAndGet(); // count threads
- LOG.info("-finishing thread " + getName() + ", activeThreads=" + activeThreads);
- }
- }
+ public static class FetcherMapper
+ extends TableMapper<ImmutableBytesWritable, FetchEntry> {
- private Text handleRedirect(Text url, CrawlDatum datum,
- String urlString, String newUrl,
- boolean temp, String redirType)
- throws MalformedURLException, URLFilterException {
- newUrl = normalizers.normalize(newUrl, URLNormalizers.SCOPE_FETCHER);
- newUrl = urlFilters.filter(newUrl);
-
- if (ignoreExternalLinks) {
- try {
- String origHost = new URL(urlString).getHost().toLowerCase();
- String newHost = new URL(newUrl).getHost().toLowerCase();
- if (!origHost.equals(newHost)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(" - ignoring redirect " + redirType + " from " +
- urlString + " to " + newUrl +
- " because external links are ignored");
- }
- return null;
- }
- } catch (MalformedURLException e) { }
- }
-
- if (newUrl != null && !newUrl.equals(urlString)) {
- reprUrl = URLUtil.chooseRepr(reprUrl, newUrl, temp);
- url = new Text(newUrl);
- if (maxRedirect > 0) {
- redirecting = true;
- redirectCount++;
- if (LOG.isDebugEnabled()) {
- LOG.debug(" - " + redirType + " redirect to " +
- url + " (fetching now)");
- }
- return url;
- } else {
- CrawlDatum newDatum = new CrawlDatum(CrawlDatum.STATUS_LINKED,
- datum.getFetchInterval());
- if (reprUrl != null) {
- newDatum.getMetaData().put(Nutch.WRITABLE_REPR_URL_KEY,
- new Text(reprUrl));
- }
- output(url, newDatum, null, null, CrawlDatum.STATUS_LINKED);
- if (LOG.isDebugEnabled()) {
- LOG.debug(" - " + redirType + " redirect to " +
- url + " (fetching later)");
- }
- return null;
- }
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug(" - " + redirType + " redirect skipped: " +
- (newUrl != null ? "to same url" : "filtered"));
- }
- return null;
- }
- }
-
- private void logError(Text url, String message) {
- if (LOG.isInfoEnabled()) {
- LOG.info("fetch of " + url + " failed with: " + message);
- }
- errors.incrementAndGet();
+ @Override
+ protected void map(ImmutableBytesWritable key, Result value,
+ Context context) throws IOException, InterruptedException {
+ byte[] outKeyRaw =
+ value.getValue(WebTableColumns.METADATA, Generator.GENERATOR_MARK);
+ context.write(new ImmutableBytesWritable(outKeyRaw), new FetchEntry(key, value));
}
-
- private ParseStatus output(Text key, CrawlDatum datum,
- Content content, ProtocolStatus pstatus, int status) {
-
- datum.setStatus(status);
- datum.setFetchTime(System.currentTimeMillis());
- if (pstatus != null) datum.getMetaData().put(Nutch.WRITABLE_PROTO_STATUS_KEY, pstatus);
-
- ParseResult parseResult = null;
- if (content != null) {
- Metadata metadata = content.getMetadata();
- // add segment to metadata
- metadata.set(Nutch.SEGMENT_NAME_KEY, segmentName);
- // add score to content metadata so that ParseSegment can pick it up.
- try {
- scfilters.passScoreBeforeParsing(key, datum, content);
- } catch (Exception e) {
- if (LOG.isWarnEnabled()) {
- e.printStackTrace(LogUtil.getWarnStream(LOG));
- LOG.warn("Couldn't pass score, url " + key + " (" + e + ")");
- }
- }
- /* Note: Fetcher will only follow meta-redirects coming from the
- * original URL. */
- if (parsing && status == CrawlDatum.STATUS_FETCH_SUCCESS) {
- try {
- parseResult = this.parseUtil.parse(content);
- } catch (Exception e) {
- LOG.warn("Error parsing: " + key + ": " + StringUtils.stringifyException(e));
- }
-
- if (parseResult == null) {
- byte[] signature =
- SignatureFactory.getSignature(getConf()).calculate(content,
- new ParseStatus().getEmptyParse(conf));
- datum.setSignature(signature);
- }
- }
-
- /* Store status code in content So we can read this value during
- * parsing (as a separate job) and decide to parse or not.
- */
- content.getMetadata().add(Nutch.FETCH_STATUS_KEY, Integer.toString(status));
- }
-
- try {
- output.collect(key, new NutchWritable(datum));
- if (content != null && storingContent)
- output.collect(key, new NutchWritable(content));
- if (parseResult != null) {
- for (Entry<Text, Parse> entry : parseResult) {
- Text url = entry.getKey();
- Parse parse = entry.getValue();
- ParseStatus parseStatus = parse.getData().getStatus();
-
- if (!parseStatus.isSuccess()) {
- LOG.warn("Error parsing: " + key + ": " + parseStatus);
- parse = parseStatus.getEmptyParse(getConf());
- }
-
- // Calculate page signature. For non-parsing fetchers this will
- // be done in ParseSegment
- byte[] signature =
- SignatureFactory.getSignature(getConf()).calculate(content, parse);
- // Ensure segment name and score are in parseData metadata
- parse.getData().getContentMeta().set(Nutch.SEGMENT_NAME_KEY,
- segmentName);
- parse.getData().getContentMeta().set(Nutch.SIGNATURE_KEY,
- StringUtil.toHexString(signature));
- // Pass fetch time to content meta
- parse.getData().getContentMeta().set(Nutch.FETCH_TIME_KEY,
- Long.toString(datum.getFetchTime()));
- if (url.equals(key))
- datum.setSignature(signature);
- try {
- scfilters.passScoreAfterParsing(url, content, parse);
- } catch (Exception e) {
- if (LOG.isWarnEnabled()) {
- e.printStackTrace(LogUtil.getWarnStream(LOG));
- LOG.warn("Couldn't pass score, url " + key + " (" + e + ")");
- }
- }
- output.collect(url, new NutchWritable(
- new ParseImpl(new ParseText(parse.getText()),
- parse.getData(), parse.isCanonical())));
- }
- }
- } catch (IOException e) {
- if (LOG.isFatalEnabled()) {
- e.printStackTrace(LogUtil.getFatalStream(LOG));
- LOG.fatal("fetcher caught:"+e.toString());
- }
- }
-
- // return parse status if it exits
- if (parseResult != null && !parseResult.isEmpty()) {
- Parse p = parseResult.get(content.getUrl());
- if (p != null) {
- return p.getData().getStatus();
- }
- }
- return null;
- }
-
}
-
- public Fetcher() { super(null); }
-
- public Fetcher(Configuration conf) { super(conf); }
-
- private void updateStatus(int bytesInPage) throws IOException {
- pages.incrementAndGet();
- bytes.addAndGet(bytesInPage);
- }
-
- private void reportStatus() throws IOException {
- String status;
- long elapsed = (System.currentTimeMillis() - start)/1000;
- status = activeThreads + " threads, " +
- pages+" pages, "+errors+" errors, "
- + Math.round(((float)pages.get()*10)/elapsed)/10.0+" pages/s, "
- + Math.round(((((float)bytes.get())*8)/1024)/elapsed)+" kb/s, ";
- reporter.setStatus(status);
- }
-
- public void configure(JobConf job) {
- setConf(job);
-
- this.segmentName = job.get(Nutch.SEGMENT_NAME_KEY);
- this.storingContent = isStoringContent(job);
- this.parsing = isParsing(job);
-
-// if (job.getBoolean("fetcher.verbose", false)) {
-// LOG.setLevel(Level.FINE);
-// }
- }
+ public static final Log LOG = LogFactory.getLog(Fetcher.class);
- public void close() {}
+ private Configuration conf;
- public static boolean isParsing(Configuration conf) {
- return conf.getBoolean("fetcher.parse", true);
+ @Override
+ public Configuration getConf() {
+ return conf;
}
- public static boolean isStoringContent(Configuration conf) {
- return conf.getBoolean("fetcher.store.content", true);
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
}
- public void run(RecordReader<Text, CrawlDatum> input,
- OutputCollector<Text, NutchWritable> output,
- Reporter reporter) throws IOException {
-
- this.output = output;
- this.reporter = reporter;
- this.fetchQueues = new FetchItemQueues(getConf());
-
- int threadCount = getConf().getInt("fetcher.threads.fetch", 10);
- if (LOG.isInfoEnabled()) { LOG.info("Fetcher: threads: " + threadCount); }
-
- feeder = new QueueFeeder(input, fetchQueues, threadCount * 50);
- //feeder.setPriority((Thread.MAX_PRIORITY + Thread.NORM_PRIORITY) / 2);
- feeder.start();
-
- // set non-blocking & no-robots mode for HTTP protocol plugins.
- getConf().setBoolean(Protocol.CHECK_BLOCKING, false);
- getConf().setBoolean(Protocol.CHECK_ROBOTS, false);
+ private void fetch(String table, int threads, boolean restart) throws Exception {
+ LOG.info("Fetcher: starting");
+ LOG.info("Fetcher: table: " + table);
- for (int i = 0; i < threadCount; i++) { // spawn threads
- new FetcherThread(getConf()).start();
+ if (threads > 0) {
+ getConf().setInt("fetcher.threads.fetch", threads);
}
-
- // select a timeout that avoids a task timeout
- long timeout = getConf().getInt("mapred.task.timeout", 10*60*1000)/2;
-
- do { // wait for threads to exit
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {}
-
- reportStatus();
- LOG.info("-activeThreads=" + activeThreads + ", spinWaiting=" + spinWaiting.get()
- + ", fetchQueues.totalSize=" + fetchQueues.getTotalSize());
-
- if (!feeder.isAlive() && fetchQueues.getTotalSize() < 5) {
- fetchQueues.dump();
- }
- // some requests seem to hang, despite all intentions
- if ((System.currentTimeMillis() - lastRequestStart.get()) > timeout) {
- if (LOG.isWarnEnabled()) {
- LOG.warn("Aborting with "+activeThreads+" hung threads.");
- }
- return;
- }
-
- } while (activeThreads.get() > 0);
- LOG.info("-activeThreads=" + activeThreads);
- }
-
- public void fetch(Path segment, int threads, boolean parsing)
- throws IOException {
-
- checkConfiguration();
-
- if (LOG.isInfoEnabled()) {
- LOG.info("Fetcher: starting");
- LOG.info("Fetcher: segment: " + segment);
+ Job job = new NutchJob(getConf(), "fetch " + table);
+ Scan scan = TableUtil.createScanFromColumns(COLUMNS);
+ List<Filter> filters = new ArrayList<Filter>();
+ if (!restart) {
+ filters.add(new ValueFilter(WebTableColumns.METADATA, FETCH_MARK,
+ CompareOp.NOT_EQUAL, TableUtil.YES_VAL, false));
}
-
- JobConf job = new NutchJob(getConf());
- job.setJobName("fetch " + segment);
-
- job.setInt("fetcher.threads.fetch", threads);
- job.set(Nutch.SEGMENT_NAME_KEY, segment.getName());
- job.setBoolean("fetcher.parse", parsing);
-
- // for politeness, don't permit parallel execution of a single task
- job.setSpeculativeExecution(false);
-
- FileInputFormat.addInputPath(job, new Path(segment, CrawlDatum.GENERATE_DIR_NAME));
- job.setInputFormat(InputFormat.class);
-
- job.setMapRunnerClass(Fetcher.class);
-
- FileOutputFormat.setOutputPath(job, segment);
- job.setOutputFormat(FetcherOutputFormat.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(NutchWritable.class);
-
- JobClient.runJob(job);
- if (LOG.isInfoEnabled()) { LOG.info("Fetcher: done"); }
+ filters.add(new ValueFilter(WebTableColumns.METADATA, Generator.GENERATOR_MARK,
+ CompareOp.GREATER_OR_EQUAL, new byte[] { (byte)0 }, true));
+ FilterList filterList = new FilterList(Operator.MUST_PASS_ALL, filters);
+ scan.setFilter(filterList);
+ TableMapReduceUtil.initTableMapperJob(table, scan, FetcherMapper.class,
+ ImmutableBytesWritable.class, FetchEntry.class, job);
+ TableMapReduceUtil.initTableReducerJob(table,
+ FetcherReducer.class, job, PartitionUrlByHost.class);
+
+ job.waitForCompletion(true);
+
+ LOG.info("Fetcher: done");
}
-
-
- /** Run the fetcher. */
- public static void main(String[] args) throws Exception {
-
- String usage = "Usage: Fetcher <segment> [-threads n] [-noParsing]";
+
+ @Override
+ public int run(String[] args) throws Exception {
+ final String usage = "Usage: FetcherHbase <webtable> [-threads n] [-restart]";
if (args.length < 1) {
System.err.println(usage);
System.exit(-1);
}
-
- Path segment = new Path(args[0]);
- Configuration conf = NutchConfiguration.create();
+ final String table = args[0];
- int threads = conf.getInt("fetcher.threads.fetch", 10);
- boolean parsing = true;
+ int threads = -1;
+ boolean restart = false;
- for (int i = 1; i < args.length; i++) { // parse command line
- if (args[i].equals("-threads")) { // found -threads option
+ for (int i = 1; i < args.length; i++) {
+ if ("-threads".equals(args[i])) {
+ // found -threads option
threads = Integer.parseInt(args[++i]);
- } else if (args[i].equals("-noParsing")) parsing = false;
+ } else if ("-restart".equals(args[i])) {
+ restart = true;
+ }
}
- conf.setInt("fetcher.threads.fetch", threads);
- if (!parsing) {
- conf.setBoolean("fetcher.parse", parsing);
- }
- Fetcher fetcher = new Fetcher(conf); // make a Fetcher
-
- fetcher.fetch(segment, threads, parsing); // run the Fetcher
+ fetch(table, threads, restart); // run the Fetcher
+ return 0;
}
- private void checkConfiguration() {
-
- // ensure that a value has been set for the agent name and that that
- // agent name is the first value in the agents we advertise for robot
- // rules parsing
- String agentName = getConf().get("http.agent.name");
- if (agentName == null || agentName.trim().length() == 0) {
- String message = "Fetcher: No agents listed in 'http.agent.name'"
- + " property.";
- if (LOG.isFatalEnabled()) {
- LOG.fatal(message);
- }
- throw new IllegalArgumentException(message);
- } else {
-
- // get all of the agents that we advertise
- String agentNames = getConf().get("http.robots.agents");
- StringTokenizer tok = new StringTokenizer(agentNames, ",");
- ArrayList<String> agents = new ArrayList<String>();
- while (tok.hasMoreTokens()) {
- agents.add(tok.nextToken().trim());
- }
-
- // if the first one is not equal to our agent name, log fatal and throw
- // an exception
- if (!(agents.get(0)).equalsIgnoreCase(agentName)) {
- String message = "Fetcher: Your 'http.agent.name' value should be "
- + "listed first in 'http.robots.agents' property.";
- if (LOG.isWarnEnabled()) {
- LOG.warn(message);
- }
- }
- }
+ public static void main(String[] args) throws Exception {
+ final int res = ToolRunner.run(NutchConfiguration.create(),
+ new Fetcher(), args);
+ System.exit(res);
}
-
}
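
For readers following this change: Fetcher is now a plain Hadoop Tool driven through ToolRunner instead of a MapRunnable over segment data. A minimal launch sketch, assuming the nutchbase classes above are on the classpath and using a placeholder table name, might look like this (not part of the commit):

    import org.apache.hadoop.util.ToolRunner;
    import org.apache.nutch.fetcher.Fetcher;
    import org.apache.nutch.util.NutchConfiguration;

    public class LaunchFetcher {
      public static void main(String[] args) throws Exception {
        // Equivalent to running: Fetcher <webtable> -threads 10
        // "webtable" is a placeholder; adding "-restart" would skip the
        // fetch-mark filter in fetch(), so already-fetched rows are retried.
        int res = ToolRunner.run(NutchConfiguration.create(), new Fetcher(),
            new String[] { "webtable", "-threads", "10" });
        System.exit(res);
      }
    }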
Modified: lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/fetcher/FetcherOutput.java
URL: http://svn.apache.org/viewvc/lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/fetcher/FetcherOutput.java?rev=804789&r1=804788&r2=804789&view=diff
==============================================================================
--- lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/fetcher/FetcherOutput.java (original)
+++ lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/fetcher/FetcherOutput.java Sun Aug 16 22:25:12 2009
@@ -17,65 +17,16 @@
package org.apache.nutch.fetcher;
-import java.io.*;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
-import org.apache.hadoop.io.*;
-import org.apache.nutch.crawl.CrawlDatum;
-import org.apache.nutch.protocol.Content;
-import org.apache.nutch.parse.*;
+import org.apache.hadoop.io.Writable;
/* An entry in the fetcher's output. */
public final class FetcherOutput implements Writable {
- private CrawlDatum crawlDatum;
- private Content content;
- private ParseImpl parse;
-
- public FetcherOutput() {}
-
- public FetcherOutput(CrawlDatum crawlDatum, Content content,
- ParseImpl parse) {
- this.crawlDatum = crawlDatum;
- this.content = content;
- this.parse = parse;
- }
-
- public final void readFields(DataInput in) throws IOException {
- this.crawlDatum = CrawlDatum.read(in);
- this.content = in.readBoolean() ? Content.read(in) : null;
- this.parse = in.readBoolean() ? ParseImpl.read(in) : null;
- }
-
- public final void write(DataOutput out) throws IOException {
- crawlDatum.write(out);
-
- out.writeBoolean(content != null);
- if (content != null) {
- content.write(out);
- }
-
- out.writeBoolean(parse != null);
- if (parse != null) {
- parse.write(out);
- }
- }
-
- public CrawlDatum getCrawlDatum() { return crawlDatum; }
- public Content getContent() { return content; }
- public ParseImpl getParse() { return parse; }
-
- public boolean equals(Object o) {
- if (!(o instanceof FetcherOutput))
- return false;
- FetcherOutput other = (FetcherOutput)o;
- return
- this.crawlDatum.equals(other.crawlDatum) &&
- this.content.equals(other.content);
- }
-
- public String toString() {
- StringBuffer buffer = new StringBuffer();
- buffer.append("CrawlDatum: " + crawlDatum+"\n" );
- return buffer.toString();
- }
+ public final void readFields(DataInput in) throws IOException { }
+
+ public final void write(DataOutput out) throws IOException { }
}
Added: lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/fetcher/FetcherReducer.java
URL: http://svn.apache.org/viewvc/lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/fetcher/FetcherReducer.java?rev=804789&view=auto
==============================================================================
--- lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/fetcher/FetcherReducer.java (added)
+++ lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/fetcher/FetcherReducer.java Sun Aug 16 22:25:12 2009
@@ -0,0 +1,663 @@
+package org.apache.nutch.fetcher;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.URL;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableReducer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Writables;
+import org.apache.nutch.crawl.CrawlDatumHbase;
+import org.apache.nutch.fetcher.Fetcher;
+import org.apache.nutch.net.URLFilterException;
+import org.apache.nutch.net.URLFilters;
+import org.apache.nutch.net.URLNormalizers;
+import org.apache.nutch.protocol.Content;
+import org.apache.nutch.protocol.Protocol;
+import org.apache.nutch.protocol.ProtocolFactory;
+import org.apache.nutch.protocol.ProtocolOutput;
+import org.apache.nutch.protocol.ProtocolStatus;
+import org.apache.nutch.protocol.RobotRules;
+import org.apache.nutch.util.LogUtil;
+import org.apache.nutch.util.URLUtil;
+import org.apache.nutch.util.hbase.WebTableColumns;
+import org.apache.nutch.util.hbase.TableUtil;
+import org.apache.nutch.util.hbase.WebTableRow;
+
+public class FetcherReducer
+extends TableReducer<ImmutableBytesWritable, FetchEntry, ImmutableBytesWritable> {
+
+ public static final Log LOG = Fetcher.LOG;
+
+ private final AtomicInteger activeThreads = new AtomicInteger(0);
+ private final AtomicInteger spinWaiting = new AtomicInteger(0);
+
+ private final long start = System.currentTimeMillis(); // start time of fetcher run
+ private final AtomicLong lastRequestStart = new AtomicLong(start);
+
+ private final AtomicLong bytes = new AtomicLong(0); // total bytes fetched
+ private final AtomicInteger pages = new AtomicInteger(0); // total pages fetched
+ private final AtomicInteger errors = new AtomicInteger(0); // total pages errored
+
+ private QueueFeeder feeder;
+
+ private List<FetcherThread> fetcherThreads = new ArrayList<FetcherThread>();
+
+ private FetchItemQueues fetchQueues;
+
+ private static final ImmutableBytesWritable REDUCE_KEY =
+ new ImmutableBytesWritable(TableUtil.YES_VAL);
+
+ /**
+   * This class describes the item to be fetched.
+ */
+ private static class FetchItem {
+ WebTableRow row;
+ String queueID;
+ String url;
+ URL u;
+
+ public FetchItem(String url, WebTableRow row, URL u, String queueID) {
+ this.row = row;
+ this.url = url;
+ this.u = u;
+ this.queueID = queueID;
+ }
+
+ /** Create an item. Queue id will be created based on <code>byIP</code>
+ * argument, either as a protocol + hostname pair, or protocol + IP
+ * address pair.
+ */
+ public static FetchItem create(String url, WebTableRow row, boolean byIP) {
+ String queueID;
+ URL u = null;
+ try {
+ u = new URL(url);
+ } catch (final Exception e) {
+ LOG.warn("Cannot parse url: " + url, e);
+ return null;
+ }
+ final String proto = u.getProtocol().toLowerCase();
+ String host;
+ if (byIP) {
+ try {
+ final InetAddress addr = InetAddress.getByName(u.getHost());
+ host = addr.getHostAddress();
+ } catch (final UnknownHostException e) {
+ // unable to resolve it, so don't fall back to host name
+ LOG.warn("Unable to resolve: " + u.getHost() + ", skipping.");
+ return null;
+ }
+ } else {
+ host = u.getHost();
+ if (host == null) {
+ LOG.warn("Unknown host for url: " + url + ", skipping.");
+ return null;
+ }
+ host = host.toLowerCase();
+ }
+ queueID = proto + "://" + host;
+ return new FetchItem(url, row, u, queueID);
+ }
+
+ }
+
+ /**
+ * This class handles FetchItems which come from the same host ID (be it
+ * a proto/hostname or proto/IP pair). It also keeps track of requests in
+ * progress and elapsed time between requests.
+ */
+ private static class FetchItemQueue {
+ List<FetchItem> queue = Collections.synchronizedList(new LinkedList<FetchItem>());
+ Set<FetchItem> inProgress = Collections.synchronizedSet(new HashSet<FetchItem>());
+ AtomicLong nextFetchTime = new AtomicLong();
+ long crawlDelay;
+ long minCrawlDelay;
+ int maxThreads;
+
+ public FetchItemQueue(Configuration conf, int maxThreads, long crawlDelay, long minCrawlDelay) {
+ this.maxThreads = maxThreads;
+ this.crawlDelay = crawlDelay;
+ this.minCrawlDelay = minCrawlDelay;
+ // ready to start
+ setEndTime(System.currentTimeMillis() - crawlDelay);
+ }
+
+ public int getQueueSize() {
+ return queue.size();
+ }
+
+ public int getInProgressSize() {
+ return inProgress.size();
+ }
+
+ public void finishFetchItem(FetchItem it, boolean asap) {
+ if (it != null) {
+ inProgress.remove(it);
+ setEndTime(System.currentTimeMillis(), asap);
+ }
+ }
+
+ public void addFetchItem(FetchItem it) {
+ if (it == null) return;
+ queue.add(it);
+ }
+
+ @SuppressWarnings("unused")
+ public void addInProgressFetchItem(FetchItem it) {
+ if (it == null) return;
+ inProgress.add(it);
+ }
+
+ public FetchItem getFetchItem() {
+ if (inProgress.size() >= maxThreads) return null;
+ final long now = System.currentTimeMillis();
+ if (nextFetchTime.get() > now) return null;
+ FetchItem it = null;
+ if (queue.size() == 0) return null;
+ try {
+ it = queue.remove(0);
+ inProgress.add(it);
+ } catch (final Exception e) {
+ LOG.error("Cannot remove FetchItem from queue or cannot add it to inProgress queue", e);
+ }
+ return it;
+ }
+
+ public synchronized void dump() {
+ LOG.info(" maxThreads = " + maxThreads);
+ LOG.info(" inProgress = " + inProgress.size());
+ LOG.info(" crawlDelay = " + crawlDelay);
+ LOG.info(" minCrawlDelay = " + minCrawlDelay);
+ LOG.info(" nextFetchTime = " + nextFetchTime.get());
+ LOG.info(" now = " + System.currentTimeMillis());
+ for (int i = 0; i < queue.size(); i++) {
+ final FetchItem it = queue.get(i);
+ LOG.info(" " + i + ". " + it.url);
+ }
+ }
+
+ private void setEndTime(long endTime) {
+ setEndTime(endTime, false);
+ }
+
+ private void setEndTime(long endTime, boolean asap) {
+ if (!asap)
+ nextFetchTime.set(endTime + (maxThreads > 1 ? minCrawlDelay : crawlDelay));
+ else
+ nextFetchTime.set(endTime);
+ }
+ }
+
+ /**
+ * Convenience class - a collection of queues that keeps track of the total
+ * number of items, and provides items eligible for fetching from any queue.
+ */
+ private static class FetchItemQueues {
+ @SuppressWarnings("unused")
+ public static final String DEFAULT_ID = "default";
+ Map<String, FetchItemQueue> queues = new HashMap<String, FetchItemQueue>();
+ AtomicInteger totalSize = new AtomicInteger(0);
+ int maxThreads;
+ boolean byIP;
+ long crawlDelay;
+ long minCrawlDelay;
+ Configuration conf;
+
+ public FetchItemQueues(Configuration conf) {
+ this.conf = conf;
+ this.maxThreads = conf.getInt("fetcher.threads.per.host", 1);
+ // backward-compatible default setting
+ this.byIP = conf.getBoolean("fetcher.threads.per.host.by.ip", false);
+ this.crawlDelay = (long) (conf.getFloat("fetcher.server.delay", 1.0f) * 1000);
+ this.minCrawlDelay = (long) (conf.getFloat("fetcher.server.min.delay", 0.0f) * 1000);
+ }
+
+ public int getTotalSize() {
+ return totalSize.get();
+ }
+
+ public int getQueueCount() {
+ return queues.size();
+ }
+
+ public void addFetchItem(String url, WebTableRow row) {
+ final FetchItem it = FetchItem.create(url, row, byIP);
+ if (it != null) addFetchItem(it);
+ }
+
+ public synchronized void addFetchItem(FetchItem it) {
+ final FetchItemQueue fiq = getFetchItemQueue(it.queueID);
+ fiq.addFetchItem(it);
+ totalSize.incrementAndGet();
+ }
+
+ public void finishFetchItem(FetchItem it) {
+ finishFetchItem(it, false);
+ }
+
+ public void finishFetchItem(FetchItem it, boolean asap) {
+ final FetchItemQueue fiq = queues.get(it.queueID);
+ if (fiq == null) {
+ LOG.warn("Attempting to finish item from unknown queue: " + it);
+ return;
+ }
+ fiq.finishFetchItem(it, asap);
+ }
+
+ public synchronized FetchItemQueue getFetchItemQueue(String id) {
+ FetchItemQueue fiq = queues.get(id);
+ if (fiq == null) {
+ // initialize queue
+ fiq = new FetchItemQueue(conf, maxThreads, crawlDelay, minCrawlDelay);
+ queues.put(id, fiq);
+ }
+ return fiq;
+ }
+
+ public synchronized FetchItem getFetchItem() {
+ final Iterator<Map.Entry<String, FetchItemQueue>> it =
+ queues.entrySet().iterator();
+ while (it.hasNext()) {
+ final FetchItemQueue fiq = it.next().getValue();
+ // reap empty queues
+ if (fiq.getQueueSize() == 0 && fiq.getInProgressSize() == 0) {
+ it.remove();
+ continue;
+ }
+ final FetchItem fit = fiq.getFetchItem();
+ if (fit != null) {
+ totalSize.decrementAndGet();
+
+ return fit;
+ }
+ }
+ return null;
+ }
+
+ public synchronized void dump() {
+ for (final String id : queues.keySet()) {
+ final FetchItemQueue fiq = queues.get(id);
+ if (fiq.getQueueSize() == 0) continue;
+ LOG.info("* queue: " + id);
+ fiq.dump();
+ }
+ }
+ }
+
+ /**
+ * This class picks items from queues and fetches the pages.
+ */
+ private class FetcherThread extends Thread {
+ private final URLFilters urlFilters;
+ private final URLNormalizers normalizers;
+ private final ProtocolFactory protocolFactory;
+ private final long maxCrawlDelay;
+ @SuppressWarnings("unused")
+ private final boolean byIP;
+ private final int maxRedirect;
+ private String reprUrl;
+ private boolean redirecting;
+ private int redirectCount;
+ private Context context;
+
+ public FetcherThread(Context context, int num) {
+ this.setDaemon(true); // don't hang JVM on exit
+ this.setName("FetcherThread" + num); // use an informative name
+ this.context = context;
+ Configuration conf = context.getConfiguration();
+ this.urlFilters = new URLFilters(conf);
+ this.protocolFactory = new ProtocolFactory(conf);
+ this.normalizers = new URLNormalizers(conf, URLNormalizers.SCOPE_FETCHER);
+ this.maxCrawlDelay = conf.getInt("fetcher.max.crawl.delay", 30) * 1000;
+ // backward-compatible default setting
+ this.byIP = conf.getBoolean("fetcher.threads.per.host.by.ip", true);
+ this.maxRedirect = conf.getInt("http.redirect.max", 3);
+ }
+
+ @Override
+ public void run() {
+ activeThreads.incrementAndGet(); // count threads
+
+ FetchItem fit = null;
+ try {
+
+ while (true) {
+ fit = fetchQueues.getFetchItem();
+ if (fit == null) {
+ if (feeder.isAlive() || fetchQueues.getTotalSize() > 0) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(getName() + " fetchQueues.getFetchItem() was null, spin-waiting ...");
+ }
+ // spin-wait.
+ spinWaiting.incrementAndGet();
+ try {
+ Thread.sleep(500);
+ } catch (final Exception e) {}
+ spinWaiting.decrementAndGet();
+ continue;
+ } else {
+ // all done, finish this thread
+ return;
+ }
+ }
+ lastRequestStart.set(System.currentTimeMillis());
+ if (!fit.row.hasColumn(WebTableColumns.REPR_URL, null)) {
+ reprUrl = fit.url.toString();
+ } else {
+ reprUrl = fit.row.getReprUrl();
+ }
+ try {
+ LOG.info("fetching " + fit.url);
+
+ // fetch the page
+ redirecting = false;
+ redirectCount = 0;
+ do {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("redirectCount=" + redirectCount);
+ }
+ redirecting = false;
+ final Protocol protocol = this.protocolFactory.getProtocol(fit.url);
+ final RobotRules rules = protocol.getRobotRules(fit.url, fit.row);
+ if (!rules.isAllowed(fit.u)) {
+ // unblock
+ fetchQueues.finishFetchItem(fit, true);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Denied by robots.txt: " + fit.url);
+ }
+ output(fit, null, ProtocolStatus.STATUS_ROBOTS_DENIED,
+ CrawlDatumHbase.STATUS_GONE);
+ continue;
+ }
+ if (rules.getCrawlDelay() > 0) {
+ if (rules.getCrawlDelay() > maxCrawlDelay) {
+ // unblock
+ fetchQueues.finishFetchItem(fit, true);
+ LOG.debug("Crawl-Delay for " + fit.url + " too long (" + rules.getCrawlDelay() + "), skipping");
+ output(fit, null, ProtocolStatus.STATUS_ROBOTS_DENIED, CrawlDatumHbase.STATUS_GONE);
+ continue;
+ } else {
+ final FetchItemQueue fiq = fetchQueues.getFetchItemQueue(fit.queueID);
+ fiq.crawlDelay = rules.getCrawlDelay();
+ }
+ }
+ final ProtocolOutput output = protocol.getProtocolOutput(fit.url, fit.row);
+ final ProtocolStatus status = output.getStatus();
+ final Content content = output.getContent();
+ // unblock queue
+ fetchQueues.finishFetchItem(fit);
+
+ switch(status.getCode()) {
+
+ case ProtocolStatus.WOULDBLOCK:
+ // retry ?
+ fetchQueues.addFetchItem(fit);
+ break;
+
+ case ProtocolStatus.SUCCESS: // got a page
+ output(fit, content, status, CrawlDatumHbase.STATUS_FETCHED);
+ updateStatus(content.getContent().length);
+ break;
+
+ case ProtocolStatus.MOVED: // redirect
+ case ProtocolStatus.TEMP_MOVED:
+ byte code;
+ boolean temp;
+ if (status.getCode() == ProtocolStatus.MOVED) {
+ code = CrawlDatumHbase.STATUS_REDIR_PERM;
+ temp = false;
+ } else {
+ code = CrawlDatumHbase.STATUS_REDIR_TEMP;
+ temp = true;
+ }
+ output(fit, content, status, code);
+ final String newUrl = status.getMessage();
+ handleRedirect(fit.url, newUrl, temp, Fetcher.PROTOCOL_REDIR);
+ redirecting = false;
+ break;
+ case ProtocolStatus.EXCEPTION:
+ logError(fit.url, status.getMessage());
+ /* FALLTHROUGH */
+ case ProtocolStatus.RETRY: // retry
+ case ProtocolStatus.BLOCKED:
+ output(fit, null, status, CrawlDatumHbase.STATUS_RETRY);
+ break;
+
+ case ProtocolStatus.GONE: // gone
+ case ProtocolStatus.NOTFOUND:
+ case ProtocolStatus.ACCESS_DENIED:
+ case ProtocolStatus.ROBOTS_DENIED:
+ output(fit, null, status, CrawlDatumHbase.STATUS_GONE);
+ break;
+
+ case ProtocolStatus.NOTMODIFIED:
+ output(fit, null, status, CrawlDatumHbase.STATUS_NOTMODIFIED);
+ break;
+
+ default:
+ if (LOG.isWarnEnabled()) {
+ LOG.warn("Unknown ProtocolStatus: " + status.getCode());
+ }
+ output(fit, null, status, CrawlDatumHbase.STATUS_RETRY);
+ }
+
+ if (redirecting && redirectCount >= maxRedirect) {
+ fetchQueues.finishFetchItem(fit);
+ if (LOG.isInfoEnabled()) {
+ LOG.info(" - redirect count exceeded " + fit.url);
+ }
+ output(fit, null, ProtocolStatus.STATUS_REDIR_EXCEEDED, CrawlDatumHbase.STATUS_GONE);
+ }
+
+ } while (redirecting && (redirectCount < maxRedirect));
+
+ } catch (final Throwable t) { // unexpected exception
+ // unblock
+ fetchQueues.finishFetchItem(fit);
+ t.printStackTrace();
+ logError(fit.url, t.toString());
+ output(fit, null, ProtocolStatus.STATUS_FAILED, CrawlDatumHbase.STATUS_RETRY);
+ }
+ }
+
+ } catch (final Throwable e) {
+ if (LOG.isFatalEnabled()) {
+ e.printStackTrace(LogUtil.getFatalStream(LOG));
+ LOG.fatal("fetcher caught:"+e.toString());
+ }
+ } finally {
+ if (fit != null) fetchQueues.finishFetchItem(fit);
+ activeThreads.decrementAndGet(); // count threads
+ LOG.info("-finishing thread " + getName() + ", activeThreads=" + activeThreads);
+ }
+ }
+
+ private void handleRedirect(String url, String newUrl,
+ boolean temp, String redirType)
+ throws URLFilterException, IOException, InterruptedException {
+ newUrl = normalizers.normalize(newUrl, URLNormalizers.SCOPE_FETCHER);
+ newUrl = urlFilters.filter(newUrl);
+ if (newUrl == null || newUrl.equals(url)) {
+ return;
+ }
+ reprUrl = URLUtil.chooseRepr(reprUrl, newUrl, temp);
+ final String reversedNewUrl = TableUtil.reverseUrl(newUrl);
+ // TODO: Find a way to use MutablWebTableRow here
+ Put put = new Put(Bytes.toBytes(reversedNewUrl));
+ if (!reprUrl.equals(url)) {
+ put.add(WebTableColumns.REPR_URL, null, Bytes.toBytes(reprUrl));
+ }
+ put.add(WebTableColumns.METADATA,
+ Fetcher.REDIRECT_DISCOVERED, TableUtil.YES_VAL);
+ context.write(REDUCE_KEY, put);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(" - " + redirType + " redirect to " +
+ reprUrl + " (fetching later)");
+ }
+
+ }
+
+ private void updateStatus(int bytesInPage) throws IOException {
+ pages.incrementAndGet();
+ bytes.addAndGet(bytesInPage);
+ }
+
+ private void output(FetchItem fit, Content content,
+ ProtocolStatus pstatus, byte status)
+ throws IOException, InterruptedException {
+ fit.row.setStatus(status);
+ final long prevFetchTime = fit.row.getFetchTime();
+ fit.row.setPrevFetchTime(prevFetchTime);
+ fit.row.setFetchTime(System.currentTimeMillis());
+ if (pstatus != null) {
+ fit.row.setProtocolStatus(pstatus);
+ }
+
+ if (content != null) {
+ fit.row.setContent(content.getContent());
+ fit.row.setContentType(content.getContentType());
+ fit.row.setBaseUrl(content.getBaseUrl());
+ }
+ fit.row.putMeta(Fetcher.FETCH_MARK, TableUtil.YES_VAL);
+ fit.row.makeRowMutation().writeToContext(REDUCE_KEY, context);
+ }
+
+ private void logError(String url, String message) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("fetch of " + url + " failed with: " + message);
+ }
+ errors.incrementAndGet();
+ }
+ }
+
+ /**
+   * This class feeds the queues with input items and refills them as
+   * items are consumed by FetcherThreads.
+ */
+ private static class QueueFeeder extends Thread {
+ private final Context context;
+ private final FetchItemQueues queues;
+ private final int size;
+ private Iterator<FetchEntry> currentIter;
+ boolean hasMore;
+
+ public QueueFeeder(Context context,
+ FetchItemQueues queues, int size)
+ throws IOException, InterruptedException {
+ this.context = context;
+ this.queues = queues;
+ this.size = size;
+ this.setDaemon(true);
+ this.setName("QueueFeeder");
+ hasMore = context.nextKey();
+ if (hasMore) {
+ currentIter = context.getValues().iterator();
+ }
+ }
+
+ @Override
+ public void run() {
+ int cnt = 0;
+ try {
+ while (hasMore) {
+ int feed = size - queues.getTotalSize();
+ if (feed <= 0) {
+ // queues are full - spin-wait until they have some free space
+          try {
+            Thread.sleep(1000);
+          } catch (final InterruptedException e) {
+            // ignore and re-check available queue space
+          }
+ continue;
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("-feeding " + feed + " input urls ...");
+ }
+ while (feed > 0 && currentIter.hasNext()) {
+ FetchEntry entry = new FetchEntry();
+ // since currentIter.next() reuses the same
+ // FetchEntry object we need to clone it
+ Writables.copyWritable(currentIter.next(), entry);
+ WebTableRow row = new WebTableRow(entry.getRow());
+ final String url =
+ TableUtil.unreverseUrl(Bytes.toString(entry.getKey().get()));
+ queues.addFetchItem(url, row);
+ feed--;
+ cnt++;
+ }
+ if (currentIter.hasNext()) {
+ continue; // finish items in current list before reading next key
+ }
+ hasMore = context.nextKey();
+ if (hasMore) {
+ currentIter = context.getValues().iterator();
+ }
+ }
+ } catch (Exception e) {
+ LOG.fatal("QueueFeeder error reading input, record " + cnt, e);
+ return;
+ }
+ LOG.info("QueueFeeder finished: total " + cnt + " records.");
+ }
+ }
+
+ @Override
+ public void run(Context context)
+ throws IOException, InterruptedException {
+ Configuration conf = context.getConfiguration();
+ this.fetchQueues = new FetchItemQueues(conf);
+ final int threadCount = conf.getInt("fetcher.threads.fetch", 10);
+ LOG.info("FetcherHbase: threads: " + threadCount);
+
+ // set non-blocking & no-robots mode for HTTP protocol plugins.
+ conf.setBoolean(Protocol.CHECK_BLOCKING, false);
+ conf.setBoolean(Protocol.CHECK_ROBOTS, false);
+
+ feeder = new QueueFeeder(context, fetchQueues, threadCount * 50);
+ feeder.start();
+
+ for (int i = 0; i < threadCount; i++) { // spawn threads
+ FetcherThread ft = new FetcherThread(context, i);
+ fetcherThreads.add(ft);
+ ft.start();
+ }
+ // select a timeout that avoids a task timeout
+ final long timeout = conf.getInt("mapred.task.timeout", 10*60*1000)/2;
+
+ do { // wait for threads to exit
+ try {
+ Thread.sleep(10000);
+ } catch (final InterruptedException e) {}
+
+ context.progress();
+      LOG.info("-activeThreads=" + activeThreads + ", spinWaiting=" + spinWaiting.get()
+          + ", fetchQueues=" + fetchQueues.getQueueCount() + ", fetchQueues.totalSize=" + fetchQueues.getTotalSize());
+
+ if (!feeder.isAlive() && fetchQueues.getTotalSize() < 5) {
+ fetchQueues.dump();
+ }
+ // some requests seem to hang, despite all intentions
+ if ((System.currentTimeMillis() - lastRequestStart.get()) > timeout) {
+ LOG.warn("Aborting with " + activeThreads + " hung threads.");
+ return;
+ }
+
+ } while (activeThreads.get() > 0);
+ LOG.info("-activeThreads=" + activeThreads);
+ }
+}
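The QueueFeeder above and the FetcherThread logic before it form a bounded producer/consumer pipeline: the feeder keeps at most size entries (threadCount * 50 in run() above) in the fetch queues and spin-waits while they are full, and the fetcher threads drain them. A minimal, self-contained sketch of the same back-pressure idea, using a BlockingQueue in place of the sleep-based spin-wait; none of the names below are Nutch classes, they are illustrative only:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class FeederSketch {
  // FEED_LIMIT plays the role of threadCount * 50 in the run() method above
  private static final int FEED_LIMIT = 50;
  private static final BlockingQueue<String> QUEUE =
      new ArrayBlockingQueue<String>(FEED_LIMIT);

  public static void main(String[] args) {
    Thread feeder = new Thread("QueueFeeder-sketch") {     // analogue of QueueFeeder
      @Override
      public void run() {
        for (int i = 0; i < 200; i++) {
          try {
            QUEUE.put("http://example.com/page-" + i);     // blocks while the queue is full
          } catch (InterruptedException e) {
            return;
          }
        }
      }
    };
    feeder.start();

    for (int i = 0; i < 4; i++) {                          // analogue of the FetcherThreads
      new Thread("FetcherThread-sketch-" + i) {
        @Override
        public void run() {
          try {
            String url;
            while ((url = QUEUE.poll(1, TimeUnit.SECONDS)) != null) {
              System.out.println(getName() + " fetched " + url);
            }
          } catch (InterruptedException e) {
            // interrupted - just exit
          }
        }
      }.start();
    }
  }
}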
Added: lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/fetcher/PartitionUrlByHost.java
URL: http://svn.apache.org/viewvc/lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/fetcher/PartitionUrlByHost.java?rev=804789&view=auto
==============================================================================
--- lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/fetcher/PartitionUrlByHost.java (added)
+++ lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/fetcher/PartitionUrlByHost.java Sun Aug 16 22:25:12 2009
@@ -0,0 +1,23 @@
+package org.apache.nutch.fetcher;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Partitioner;
+
+public class PartitionUrlByHost
+extends Partitioner<ImmutableBytesWritable, FetchEntry> {
+
+ @Override
+ public int getPartition(ImmutableBytesWritable key,
+ FetchEntry value, int numPartitions) {
+    String urlString = Bytes.toString(value.getKey().get());
+
+    URL url = null;
+    try {
+      url = new URL(urlString);
+    } catch (MalformedURLException e) {
+      // key is not a parseable URL (e.g. a reversed row key) - hash the whole string
+    }
+
+    int hashCode = (url == null ? urlString : url.getHost()).hashCode();
+
+    return (hashCode & Integer.MAX_VALUE) % numPartitions;
+ }
+}
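PartitionUrlByHost only has an effect once a job registers it as its partitioner; that wiring is not part of this hunk. A hedged sketch of how it might be attached to a job (the job name is illustrative, not something this commit sets up):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.nutch.fetcher.PartitionUrlByHost;
import org.apache.nutch.util.NutchConfiguration;

public class PartitionerWiringSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = NutchConfiguration.create();
    Job job = new Job(conf, "partition-by-host-sketch");   // illustrative job name
    // route entries for the same host to the same reduce task,
    // so per-host fetch politeness can be handled in one place
    job.setPartitionerClass(PartitionUrlByHost.class);
  }
}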
Modified: lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/indexer/DeleteDuplicates.java
URL: http://svn.apache.org/viewvc/lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/indexer/DeleteDuplicates.java?rev=804789&r1=804788&r2=804789&view=diff
==============================================================================
--- lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/indexer/DeleteDuplicates.java (original)
+++ lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/indexer/DeleteDuplicates.java Sun Aug 16 22:25:12 2009
@@ -31,7 +31,7 @@
import org.apache.hadoop.util.*;
import org.apache.nutch.util.NutchConfiguration;
-import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.NutchJobConf;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.document.DateTools;
@@ -424,7 +424,7 @@
new Path("dedup-urls-"+
Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
- JobConf job = new NutchJob(getConf());
+ JobConf job = new NutchJobConf(getConf());
for (int i = 0; i < indexDirs.length; i++) {
if (LOG.isInfoEnabled()) {
@@ -450,7 +450,7 @@
Path outDir2 =
new Path("dedup-hash-"+
Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
- job = new NutchJob(getConf());
+ job = new NutchJobConf(getConf());
job.setJobName("dedup 2: content by hash");
FileInputFormat.addInputPath(job, outDir1);
@@ -472,7 +472,7 @@
// remove outDir1 - no longer needed
fs.delete(outDir1, true);
- job = new NutchJob(getConf());
+ job = new NutchJobConf(getConf());
job.setJobName("dedup 3: delete from index(es)");
FileInputFormat.addInputPath(job, outDir2);
Modified: lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/indexer/Indexer.java
URL: http://svn.apache.org/viewvc/lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/indexer/Indexer.java?rev=804789&r1=804788&r2=804789&view=diff
==============================================================================
--- lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/indexer/Indexer.java (original)
+++ lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/indexer/Indexer.java Sun Aug 16 22:25:12 2009
@@ -1,104 +1,161 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
package org.apache.nutch.indexer;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.ValueFilter;
+import org.apache.hadoop.hbase.filter.FilterList.Operator;
+import org.apache.hadoop.hbase.filter.ValueFilter.CompareOp;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.mapreduce.TableMapper;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.nutch.indexer.lucene.LuceneWriter;
+import org.apache.nutch.parse.ParseStatus;
+import org.apache.nutch.scoring.ScoringFilters;
import org.apache.nutch.util.NutchConfiguration;
import org.apache.nutch.util.NutchJob;
-
-/** Create indexes for segments. */
-public class Indexer extends Configured implements Tool {
+import org.apache.nutch.util.hbase.HbaseColumn;
+import org.apache.nutch.util.hbase.WebTableRow;
+import org.apache.nutch.util.hbase.WebTableColumns;
+import org.apache.nutch.util.hbase.TableUtil;
+
+public class Indexer
+extends TableMapper<ImmutableBytesWritable, WebTableRow>
+implements Tool {
public static final String DONE_NAME = "index.done";
-
+
public static final Log LOG = LogFactory.getLog(Indexer.class);
- public Indexer() {
- super(null);
+ private static final Collection<HbaseColumn> COLUMNS = new HashSet<HbaseColumn>();
+
+ public static final byte[] INDEX_MARK = Bytes.toBytes("__idxmrk__");
+
+ private Configuration conf;
+
+ static {
+ COLUMNS.add(new HbaseColumn(WebTableColumns.SIGNATURE));
+ COLUMNS.add(new HbaseColumn(WebTableColumns.PARSE_STATUS));
+ COLUMNS.add(new HbaseColumn(WebTableColumns.SCORE));
+ COLUMNS.add(new HbaseColumn(WebTableColumns.METADATA, INDEX_MARK));
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ @Override
+ public void map(ImmutableBytesWritable key, Result result, Context context)
+ throws IOException, InterruptedException {
+ WebTableRow row = new WebTableRow(result);
+
+ ParseStatus pstatus = row.getParseStatus();
+ if (!pstatus.isSuccess() ||
+ pstatus.getMinorCode() == ParseStatus.SUCCESS_REDIRECT) {
+ return; // filter urls not parsed
+ }
+
+ context.write(key, row);
}
- public Indexer(Configuration conf) {
- super(conf);
+ public static Collection<HbaseColumn> getColumns(Job job) {
+ Configuration conf = job.getConfiguration();
+ Collection<HbaseColumn> columns = new HashSet<HbaseColumn>(COLUMNS);
+ IndexingFilters filters = new IndexingFilters(conf);
+ columns.addAll(filters.getColumns());
+ ScoringFilters scoringFilters = new ScoringFilters(conf);
+ columns.addAll(scoringFilters.getColumns());
+ return columns;
+ }
+
+ public static Job createIndexJob(Configuration conf, String jobName,
+ String table, boolean reindex) throws IOException {
+ Job job = new NutchJob(conf, jobName);
+ Scan scan = TableUtil.createScanFromColumns(getColumns(job));
+ List<Filter> filters = new ArrayList<Filter>();
+ if (!reindex) {
+ filters.add(
+ new ValueFilter(WebTableColumns.METADATA, INDEX_MARK,
+ CompareOp.NOT_EQUAL, TableUtil.YES_VAL, false));
+ }
+ filters.add(
+ new ValueFilter(WebTableColumns.PARSE_STATUS, null,
+ CompareOp.GREATER_OR_EQUAL, new byte[] { (byte)0 }, true));
+ FilterList filterList = new FilterList(Operator.MUST_PASS_ALL, filters);
+ scan.setFilter(filterList);
+ TableMapReduceUtil.initTableMapperJob(table,
+ scan,
+ Indexer.class, ImmutableBytesWritable.class,
+ WebTableRow.class, job);
+
+ job.setReducerClass(IndexerReducer.class);
+ job.setOutputFormatClass(IndexerOutputFormat.class);
+ return job;
}
- public void index(Path luceneDir, Path crawlDb,
- Path linkDb, List<Path> segments)
- throws IOException {
- LOG.info("Indexer: starting");
+ private void index(Path indexDir, String table, boolean reindex) throws Exception {
+ LOG.info("IndexerHbase: starting");
+ LOG.info("IndexerHbase: table: " + table);
- final JobConf job = new NutchJob(getConf());
- job.setJobName("index-lucene " + luceneDir);
+ LuceneWriter.addFieldOptions("segment", LuceneWriter.STORE.YES, LuceneWriter.INDEX.NO, getConf());
+ LuceneWriter.addFieldOptions("digest", LuceneWriter.STORE.YES, LuceneWriter.INDEX.NO, getConf());
+ LuceneWriter.addFieldOptions("boost", LuceneWriter.STORE.YES, LuceneWriter.INDEX.NO, getConf());
- IndexerMapReduce.initMRJob(crawlDb, linkDb, segments, job);
+ NutchIndexWriterFactory.addClassToConf(getConf(), LuceneWriter.class);
- FileOutputFormat.setOutputPath(job, luceneDir);
+ Job job = createIndexJob(getConf(), "index " + table, table, reindex);
- LuceneWriter.addFieldOptions("segment", LuceneWriter.STORE.YES, LuceneWriter.INDEX.NO, job);
- LuceneWriter.addFieldOptions("digest", LuceneWriter.STORE.YES, LuceneWriter.INDEX.NO, job);
- LuceneWriter.addFieldOptions("boost", LuceneWriter.STORE.YES, LuceneWriter.INDEX.NO, job);
+ FileOutputFormat.setOutputPath(job, indexDir);
- NutchIndexWriterFactory.addClassToConf(job, LuceneWriter.class);
+    job.waitForCompletion(true);
-    JobClient.runJob(job);
-    LOG.info("Indexer: done");
+ LOG.info("IndexerHbase: done");
}
public int run(String[] args) throws Exception {
- if (args.length < 4) {
- System.err.println("Usage: Indexer <index> <crawldb> <linkdb> <segment> ...");
- return -1;
- }
+ String usage = "Usage: IndexerHbase <index> <webtable> [-reindex]";
- final Path luceneDir = new Path(args[0]);
- final Path crawlDb = new Path(args[1]);
- final Path linkDb = new Path(args[2]);
-
- final List<Path> segments = new ArrayList<Path>();
- for (int i = 3; i < args.length; i++) {
- segments.add(new Path(args[i]));
+ if (args.length < 2) {
+ System.err.println(usage);
+      return -1;
}
-
- try {
- index(luceneDir, crawlDb, linkDb, segments);
- return 0;
- } catch (final Exception e) {
- LOG.fatal("Indexer: " + StringUtils.stringifyException(e));
- return -1;
+
+ boolean reindex = false;
+ if (args.length >= 3 && "-reindex".equals(args[2])) {
+ reindex = true;
}
+
+ index(new Path(args[0]), args[1], reindex);
+ return 0;
}
public static void main(String[] args) throws Exception {
- final int res = ToolRunner.run(NutchConfiguration.create(), new Indexer(), args);
+ int res = ToolRunner.run(NutchConfiguration.create(), new Indexer(), args);
System.exit(res);
}
+
}
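Indexer.run() expects <index> <webtable> and an optional -reindex flag, as the usage string above shows. A hedged example of invoking it programmatically through ToolRunner; the index directory and table name are made-up values:

import org.apache.hadoop.util.ToolRunner;
import org.apache.nutch.indexer.Indexer;
import org.apache.nutch.util.NutchConfiguration;

public class IndexerDriverSketch {
  public static void main(String[] args) throws Exception {
    // "indexes" and "webpage" stand in for the <index> and <webtable> arguments
    int res = ToolRunner.run(NutchConfiguration.create(), new Indexer(),
        new String[] { "indexes", "webpage", "-reindex" });
    System.exit(res);
  }
}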
Modified: lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/indexer/IndexerOutputFormat.java
URL: http://svn.apache.org/viewvc/lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/indexer/IndexerOutputFormat.java?rev=804789&r1=804788&r2=804789&view=diff
==============================================================================
--- lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/indexer/IndexerOutputFormat.java (original)
+++ lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/indexer/IndexerOutputFormat.java Sun Aug 16 22:25:12 2009
@@ -18,41 +18,41 @@
import java.io.IOException;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordWriter;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.nutch.indexer.NutchDocument;
-public class IndexerOutputFormat extends FileOutputFormat<Text, NutchDocument> {
+public class IndexerOutputFormat
+extends FileOutputFormat<WritableComparable<?>, NutchDocument> {
@Override
- public RecordWriter<Text, NutchDocument> getRecordWriter(FileSystem ignored,
- JobConf job, String name, Progressable progress) throws IOException {
-
- // populate JobConf with field indexing options
- IndexingFilters filters = new IndexingFilters(job);
-
+ public RecordWriter<WritableComparable<?>, NutchDocument> getRecordWriter(
+ TaskAttemptContext job) throws IOException, InterruptedException {
+
final NutchIndexWriter[] writers =
- NutchIndexWriterFactory.getNutchIndexWriters(job);
+ NutchIndexWriterFactory.getNutchIndexWriters(job.getConfiguration());
for (final NutchIndexWriter writer : writers) {
- writer.open(job, name);
+ writer.open(job, FileOutputFormat.getUniqueFile(job, "part", ""));
}
- return new RecordWriter<Text, NutchDocument>() {
- public void close(Reporter reporter) throws IOException {
+ return new RecordWriter<WritableComparable<?>, NutchDocument>() {
+
+ @Override
+ public void write(WritableComparable<?> key, NutchDocument doc) throws IOException {
for (final NutchIndexWriter writer : writers) {
- writer.close();
+ writer.write(doc);
}
}
- public void write(Text key, NutchDocument doc) throws IOException {
+ @Override
+ public void close(TaskAttemptContext context) throws IOException,
+ InterruptedException {
for (final NutchIndexWriter writer : writers) {
- writer.write(doc);
- }
+ writer.close();
+ }
}
};
}
Added: lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/indexer/IndexerReducer.java
URL: http://svn.apache.org/viewvc/lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/indexer/IndexerReducer.java?rev=804789&view=auto
==============================================================================
--- lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/indexer/IndexerReducer.java (added)
+++ lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/indexer/IndexerReducer.java Sun Aug 16 22:25:12 2009
@@ -0,0 +1,87 @@
+package org.apache.nutch.indexer;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.nutch.scoring.ScoringFilterException;
+import org.apache.nutch.scoring.ScoringFilters;
+import org.apache.nutch.util.StringUtil;
+import org.apache.nutch.util.hbase.WebTableRow;
+import org.apache.nutch.util.hbase.TableUtil;
+
+public class IndexerReducer
+extends Reducer<ImmutableBytesWritable, WebTableRow, ImmutableBytesWritable, NutchDocument> {
+
+ public static final Log LOG = Indexer.LOG;
+
+ private IndexingFilters filters;
+
+ private ScoringFilters scoringFilters;
+
+ private HTable table;
+
+ @Override
+ protected void setup(Context context) throws IOException {
+ Configuration conf = context.getConfiguration();
+ filters = new IndexingFilters(conf);
+ table = new HTable(conf.get(TableInputFormat.INPUT_TABLE));
+ scoringFilters = new ScoringFilters(conf);
+ }
+
+ @Override
+ protected void reduce(ImmutableBytesWritable key, Iterable<WebTableRow> values,
+ Context context) throws IOException, InterruptedException {
+ WebTableRow row = values.iterator().next();
+ NutchDocument doc = new NutchDocument();
+
+ doc.add("id", Bytes.toString(key.get()));
+ doc.add("digest", StringUtil.toHexString(row.getSignature()));
+
+ String url = TableUtil.unreverseUrl(Bytes.toString(key.get()));
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Indexing URL: " + url);
+ }
+
+ try {
+ doc = filters.filter(doc, url, row);
+ } catch (IndexingException e) {
+ LOG.warn("Error indexing "+key+": "+e);
+ return;
+ }
+
+ // skip documents discarded by indexing filters
+ if (doc == null) return;
+
+ float boost = 1.0f;
+ // run scoring filters
+ try {
+ boost = scoringFilters.indexerScore(url, doc, row, boost);
+ } catch (final ScoringFilterException e) {
+ LOG.warn("Error calculating score " + key + ": " + e);
+ return;
+ }
+
+ doc.setScore(boost);
+ // store boost for use by explain and dedup
+ doc.add("boost", Float.toString(boost));
+
+ row.putMeta(Indexer.INDEX_MARK, TableUtil.YES_VAL);
+ row.makeRowMutation().commit(table);
+ context.write(key, doc);
+ }
+
+ @Override
+ public void cleanup(Context context) throws IOException {
+ table.close();
+ }
+
+}
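The reducer above marks each row it indexes by writing Indexer.INDEX_MARK into the METADATA family and committing the mutation back to the table; on the next run without -reindex, the NOT_EQUAL ValueFilter built in Indexer.createIndexJob skips those rows. A hedged sketch of checking the mark for a single URL with the plain HBase client (the table name "webpage" is an assumption):

import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.nutch.indexer.Indexer;
import org.apache.nutch.util.hbase.TableUtil;
import org.apache.nutch.util.hbase.WebTableColumns;

public class IndexMarkCheckSketch {
  public static void main(String[] args) throws Exception {
    HTable table = new HTable("webpage");                  // illustrative table name
    byte[] row = Bytes.toBytes(TableUtil.reverseUrl("http://example.com/"));
    Result result = table.get(new Get(row));
    byte[] mark = result.getValue(WebTableColumns.METADATA, Indexer.INDEX_MARK);
    System.out.println("indexed: " + (mark != null));
    table.close();
  }
}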