Posted to commits@nutch.apache.org by do...@apache.org on 2009/08/17 00:25:17 UTC
svn commit: r804789 [2/6] - in /lucene/nutch/branches/nutchbase: ./ bin/
conf/ lib/ src/java/org/apache/nutch/analysis/
src/java/org/apache/nutch/crawl/ src/java/org/apache/nutch/fetcher/
src/java/org/apache/nutch/indexer/ src/java/org/apache/nutch/ind...
Modified: lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/Generator.java
URL: http://svn.apache.org/viewvc/lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/Generator.java?rev=804789&r1=804788&r2=804789&view=diff
==============================================================================
--- lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/Generator.java (original)
+++ lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/Generator.java Sun Aug 16 22:25:12 2009
@@ -1,626 +1,185 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
package org.apache.nutch.crawl;
-import java.io.*;
-import java.net.*;
-import java.util.*;
-import java.text.*;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
-// Commons Logging imports
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.util.*;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import org.apache.nutch.metadata.Nutch;
-import org.apache.nutch.net.URLFilterException;
-import org.apache.nutch.net.URLFilters;
-import org.apache.nutch.net.URLNormalizers;
-import org.apache.nutch.scoring.ScoringFilterException;
-import org.apache.nutch.scoring.ScoringFilters;
-import org.apache.nutch.util.LockUtil;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
import org.apache.nutch.util.NutchConfiguration;
import org.apache.nutch.util.NutchJob;
-
-/** Generates a subset of a crawl db to fetch. */
-public class Generator extends Configured implements Tool {
-
+import org.apache.nutch.util.URLUtil;
+import org.apache.nutch.util.hbase.HbaseColumn;
+import org.apache.nutch.util.hbase.WebTableRow;
+import org.apache.nutch.util.hbase.WebTableColumns;
+import org.apache.nutch.util.hbase.TableUtil;
+
+public class Generator
+extends Configured
+implements Tool {
public static final String CRAWL_GENERATE_FILTER = "crawl.generate.filter";
- public static final String GENERATE_MAX_PER_HOST_BY_IP = "generate.max.per.host.by.ip";
public static final String GENERATE_MAX_PER_HOST = "generate.max.per.host";
- public static final String GENERATE_UPDATE_CRAWLDB = "generate.update.crawldb";
public static final String CRAWL_TOP_N = "crawl.topN";
public static final String CRAWL_GEN_CUR_TIME = "crawl.gen.curTime";
- public static final String CRAWL_GEN_DELAY = "crawl.gen.delay";
- public static final Log LOG = LogFactory.getLog(Generator.class);
+ public static final String CRAWL_RANDOM_SEED = "generate.partition.seed";
- public static class SelectorEntry implements Writable {
- public Text url;
- public CrawlDatum datum;
-
- public SelectorEntry() {
- url = new Text();
- datum = new CrawlDatum();
- }
-
- public void readFields(DataInput in) throws IOException {
- url.readFields(in);
- datum.readFields(in);
- }
-
- public void write(DataOutput out) throws IOException {
- url.write(out);
- datum.write(out);
- }
-
- public String toString() {
- return "url=" + url.toString() + ", datum=" + datum.toString();
- }
+ private static final Set<HbaseColumn> COLUMNS = new HashSet<HbaseColumn>();
+
+ static {
+ COLUMNS.add(new HbaseColumn(WebTableColumns.FETCH_TIME));
+ COLUMNS.add(new HbaseColumn(WebTableColumns.SCORE));
+ COLUMNS.add(new HbaseColumn(WebTableColumns.STATUS));
}
-
- /** Selects entries due for fetch. */
- public static class Selector implements Mapper<Text, CrawlDatum, FloatWritable, SelectorEntry>, Partitioner<FloatWritable, Writable>, Reducer<FloatWritable, SelectorEntry, FloatWritable, SelectorEntry> {
- private LongWritable genTime = new LongWritable(System.currentTimeMillis());
- private long curTime;
- private long limit;
- private long count;
- private HashMap<String, IntWritable> hostCounts =
- new HashMap<String, IntWritable>();
- private int maxPerHost;
- private HashSet<String> maxedHosts = new HashSet<String>();
- private HashSet<String> dnsFailureHosts = new HashSet<String>();
- private Partitioner<Text, Writable> hostPartitioner = new PartitionUrlByHost();
- private URLFilters filters;
- private URLNormalizers normalizers;
- private ScoringFilters scfilters;
- private SelectorEntry entry = new SelectorEntry();
- private FloatWritable sortValue = new FloatWritable();
- private boolean byIP;
- private long dnsFailure = 0L;
- private boolean filter;
- private long genDelay;
- private FetchSchedule schedule;
-
- public void configure(JobConf job) {
- curTime = job.getLong(CRAWL_GEN_CUR_TIME, System.currentTimeMillis());
- limit = job.getLong(CRAWL_TOP_N,Long.MAX_VALUE)/job.getNumReduceTasks();
- maxPerHost = job.getInt(GENERATE_MAX_PER_HOST, -1);
- byIP = job.getBoolean(GENERATE_MAX_PER_HOST_BY_IP, false);
- filters = new URLFilters(job);
- normalizers = new URLNormalizers(job, URLNormalizers.SCOPE_GENERATE_HOST_COUNT);
- scfilters = new ScoringFilters(job);
- hostPartitioner.configure(job);
- filter = job.getBoolean(CRAWL_GENERATE_FILTER, true);
- genDelay = job.getLong(CRAWL_GEN_DELAY, 7L) * 3600L * 24L * 1000L;
- long time = job.getLong(Nutch.GENERATE_TIME_KEY, 0L);
- if (time > 0) genTime.set(time);
- schedule = FetchScheduleFactory.getFetchSchedule(job);
- }
-
- public void close() {}
-
- /** Select & invert subset due for fetch. */
- public void map(Text key, CrawlDatum value,
- OutputCollector<FloatWritable, SelectorEntry> output, Reporter reporter)
- throws IOException {
- Text url = key;
- if (filter) {
- // If filtering is on don't generate URLs that don't pass URLFilters
- try {
- if (filters.filter(url.toString()) == null)
- return;
- } catch (URLFilterException e) {
- if (LOG.isWarnEnabled()) {
- LOG.warn("Couldn't filter url: " + url + " (" + e.getMessage()
- + ")");
- }
- }
- }
- CrawlDatum crawlDatum = value;
-
- // check fetch schedule
- if (!schedule.shouldFetch(url, crawlDatum, curTime)) {
- LOG.debug("-shouldFetch rejected '" + url+ "', fetchTime=" + crawlDatum.getFetchTime() + ", curTime=" + curTime);
- return;
- }
-
- LongWritable oldGenTime = (LongWritable)crawlDatum.getMetaData().get(Nutch.WRITABLE_GENERATE_TIME_KEY);
- if (oldGenTime != null) { // awaiting fetch & update
- if (oldGenTime.get() + genDelay > curTime) // still wait for update
- return;
- }
- float sort = 1.0f;
- try {
- sort = scfilters.generatorSortValue((Text)key, crawlDatum, sort);
- } catch (ScoringFilterException sfe) {
- if (LOG.isWarnEnabled()) {
- LOG.warn("Couldn't filter generatorSortValue for " + key + ": " + sfe);
- }
- }
- // sort by decreasing score, using DecreasingFloatComparator
- sortValue.set(sort);
- // record generation time
- crawlDatum.getMetaData().put(Nutch.WRITABLE_GENERATE_TIME_KEY, genTime);
- entry.datum = crawlDatum;
- entry.url = (Text)key;
- output.collect(sortValue, entry); // invert for sort by score
- }
-
- /** Partition by host. */
- public int getPartition(FloatWritable key, Writable value,
- int numReduceTasks) {
- return hostPartitioner.getPartition(((SelectorEntry)value).url, key,
- numReduceTasks);
- }
-
- /** Collect until limit is reached. */
- public void reduce(FloatWritable key, Iterator<SelectorEntry> values,
- OutputCollector<FloatWritable, SelectorEntry> output,
- Reporter reporter)
- throws IOException {
-
- while (values.hasNext() && count < limit) {
-
- SelectorEntry entry = values.next();
- Text url = entry.url;
- String urlString = url.toString();
- URL u = null;
-
- // skip bad urls, including empty and null urls
- try {
- u = new URL(url.toString());
- } catch (MalformedURLException e) {
- LOG.info("Bad protocol in url: " + url.toString());
- continue;
- }
-
- String host = u.getHost();
- host = host.toLowerCase();
- String hostname = host;
-
- // partitioning by ip will generate lots of DNS requests here, and will
- // be up to double the overall dns load, do not run this way unless you
- // are running a local caching DNS server or a two layer DNS cache
- if (byIP) {
- if (maxedHosts.contains(host)) {
- if (LOG.isDebugEnabled()) { LOG.debug("Host already maxed out: " + host); }
- continue;
- }
- if (dnsFailureHosts.contains(host)) {
- if (LOG.isDebugEnabled()) { LOG.debug("Host name lookup already failed: " + host); }
- continue;
- }
- try {
- InetAddress ia = InetAddress.getByName(host);
- host = ia.getHostAddress();
- urlString = new URL(u.getProtocol(), host, u.getPort(), u.getFile()).toString();
- }
- catch (UnknownHostException uhe) {
- // remember hostnames that could not be looked up
- dnsFailureHosts.add(hostname);
- if (LOG.isDebugEnabled()) {
- LOG.debug("DNS lookup failed: " + host + ", skipping.");
- }
- dnsFailure++;
- if ((dnsFailure % 1000 == 0) && (LOG.isWarnEnabled())) {
- LOG.warn("DNS failures: " + dnsFailure);
- }
- continue;
- }
- }
-
- try {
- urlString = normalizers.normalize(urlString, URLNormalizers.SCOPE_GENERATE_HOST_COUNT);
- host = new URL(urlString).getHost();
- } catch (Exception e) {
- LOG.warn("Malformed URL: '" + urlString + "', skipping (" +
- StringUtils.stringifyException(e) + ")");
- continue;
- }
-
- // only filter if we are counting hosts
- if (maxPerHost > 0) {
-
- IntWritable hostCount = hostCounts.get(host);
- if (hostCount == null) {
- hostCount = new IntWritable();
- hostCounts.put(host, hostCount);
- }
- // increment hostCount
- hostCount.set(hostCount.get() + 1);
+ public static final byte[] GENERATOR_MARK =
+ Bytes.toBytes("__genmrk__");
- // skip URL if above the limit per host.
- if (hostCount.get() > maxPerHost) {
- if (hostCount.get() == maxPerHost + 1) {
- // remember the raw hostname that is maxed out
- maxedHosts.add(hostname);
- if (LOG.isInfoEnabled()) {
- LOG.info("Host " + host + " has more than " + maxPerHost +
- " URLs." + " Skipping additional.");
- }
- }
- continue;
- }
- }
-
- output.collect(key, entry);
-
- // Count is incremented only when we keep the URL
- // maxPerHost may cause us to skip it.
- count++;
- }
- }
- }
+ public static final Log LOG = LogFactory.getLog(Generator.class);
- public static class DecreasingFloatComparator extends FloatWritable.Comparator {
+ public static class SelectorEntry
+ implements WritableComparable<SelectorEntry> {
- /** Compares two FloatWritables decreasing. */
- public int compare(byte[] b1, int s1, int l1,
- byte[] b2, int s2, int l2) {
- return super.compare(b2, s2, l2, b1, s1, l1);
+ String url;
+ String host;
+ float score;
+
+ public SelectorEntry() { }
+
+ public SelectorEntry(String url, float score) {
+ this.url = url;
+ this.host = URLUtil.getHost(url);
+ this.score = score;
}
- }
- public static class SelectorInverseMapper extends MapReduceBase implements Mapper<FloatWritable, SelectorEntry, Text, SelectorEntry> {
+ public void readFields(DataInput in) throws IOException {
+ url = Text.readString(in);
+ host = Text.readString(in);
+ score = in.readFloat();
+ }
- public void map(FloatWritable key, SelectorEntry value, OutputCollector<Text, SelectorEntry> output, Reporter reporter) throws IOException {
- SelectorEntry entry = (SelectorEntry)value;
- output.collect(entry.url, entry);
+ public void write(DataOutput out) throws IOException {
+ Text.writeString(out, url);
+ Text.writeString(out, host);
+ out.writeFloat(score);
}
- }
-
- public static class PartitionReducer extends MapReduceBase
- implements Reducer<Text, SelectorEntry, Text, CrawlDatum> {
- public void reduce(Text key, Iterator<SelectorEntry> values,
- OutputCollector<Text, CrawlDatum> output, Reporter reporter) throws IOException {
- // if using HashComparator, we get only one input key in case of hash collision
- // so use only URLs from values
- while (values.hasNext()) {
- SelectorEntry entry = values.next();
- output.collect(entry.url, entry.datum);
- }
+ public int compareTo(SelectorEntry se) {
+ if (se.score > score)
+ return 1;
+ else if (se.score == score)
+ return url.compareTo(se.url);
+ return -1;
}
- }
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + url.hashCode();
+ result = prime * result + Float.floatToIntBits(score);
+ return result;
+ }
- /** Sort fetch lists by hash of URL. */
- public static class HashComparator extends WritableComparator {
- public HashComparator() {
- super(Text.class);
- }
-
- public int compare(WritableComparable a, WritableComparable b) {
- Text url1 = (Text) a;
- Text url2 = (Text) b;
- int hash1 = hash(url1.getBytes(), 0, url1.getLength());
- int hash2 = hash(url2.getBytes(), 0, url2.getLength());
- return (hash1 < hash2 ? -1 : (hash1 == hash2 ? 0 : 1));
- }
-
- public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
- int hash1 = hash(b1, s1, l1);
- int hash2 = hash(b2, s2, l2);
- return (hash1 < hash2 ? -1 : (hash1 == hash2 ? 0 : 1));
- }
-
- private static int hash(byte[] bytes, int start, int length) {
- int hash = 1;
- // make later bytes more significant in hash code, so that sorting by
- // hashcode correlates less with by-host ordering.
- for (int i = length - 1; i >= 0; i--)
- hash = (31 * hash) + (int) bytes[start + i];
- return hash;
+ @Override
+ public boolean equals(Object obj) {
+ SelectorEntry other = (SelectorEntry) obj;
+ if (!url.equals(other.url))
+ return false;
+ if (Float.floatToIntBits(score) != Float.floatToIntBits(other.score))
+ return false;
+ return true;
}
}
- /**
- * Update the CrawlDB so that the next generate won't include the same URLs.
- */
- public static class CrawlDbUpdater extends MapReduceBase implements Mapper<WritableComparable, Writable, Text, CrawlDatum>, Reducer<Text, CrawlDatum, Text, CrawlDatum> {
- long generateTime;
-
- public void configure(JobConf job) {
- generateTime = job.getLong(Nutch.GENERATE_TIME_KEY, 0L);
+ public static class SelectorEntryComparator extends WritableComparator {
+ public SelectorEntryComparator() {
+ super(SelectorEntry.class, true);
}
-
- public void map(WritableComparable key, Writable value, OutputCollector<Text, CrawlDatum> output, Reporter reporter) throws IOException {
- if (key instanceof FloatWritable) { // tempDir source
- SelectorEntry se = (SelectorEntry)value;
- output.collect(se.url, se.datum);
- } else {
- output.collect((Text)key, (CrawlDatum)value);
- }
- }
- private CrawlDatum orig = new CrawlDatum();
- private LongWritable genTime = new LongWritable(0L);
-
- public void reduce(Text key, Iterator<CrawlDatum> values, OutputCollector<Text, CrawlDatum> output, Reporter reporter) throws IOException {
- while (values.hasNext()) {
- CrawlDatum val = values.next();
- if (val.getMetaData().containsKey(Nutch.WRITABLE_GENERATE_TIME_KEY)) {
- LongWritable gt = (LongWritable)val.getMetaData().get(Nutch.WRITABLE_GENERATE_TIME_KEY);
- genTime.set(gt.get());
- if (genTime.get() != generateTime) {
- orig.set(val);
- genTime.set(0L);
- continue;
- }
- } else {
- orig.set(val);
- }
- }
- if (genTime.get() != 0L) {
- orig.getMetaData().put(Nutch.WRITABLE_GENERATE_TIME_KEY, genTime);
- }
- output.collect(key, orig);
- }
}
- public Generator() {}
-
- public Generator(Configuration conf) {
- setConf(conf);
+ static {
+ WritableComparator.define(SelectorEntry.class,
+ new SelectorEntryComparator());
}
/**
- * Generate fetchlists in a segment. Whether to filter URLs or not is
- * read from the crawl.generate.filter property in the configuration
- * files. If the property is not found, the URLs are filtered.
- *
- * @param dbDir Crawl database directory
- * @param segments Segments directory
- * @param numLists Number of reduce tasks
- * @param topN Number of top URLs to be selected
- * @param curTime Current time in milliseconds
- *
- * @return Path to generated segment or null if no entries were
- * selected
- *
- * @throws IOException When an I/O error occurs
- */
- public Path generate(Path dbDir, Path segments, int numLists,
- long topN, long curTime) throws IOException {
-
- JobConf job = new NutchJob(getConf());
- boolean filter = job.getBoolean(CRAWL_GENERATE_FILTER, true);
- return generate(dbDir, segments, numLists, topN, curTime, filter, false);
- }
-
- /**
- * Generate fetchlists in a segment.
- * @return Path to generated segment or null if no entries were selected.
+ * Mark URLs ready for fetching.
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
* */
- public Path generate(Path dbDir, Path segments,
- int numLists, long topN, long curTime, boolean filter,
- boolean force)
- throws IOException {
-
- Path tempDir =
- new Path(getConf().get("mapred.temp.dir", ".") +
- "/generate-temp-"+ System.currentTimeMillis());
-
- Path segment = new Path(segments, generateSegmentName());
- Path output = new Path(segment, CrawlDatum.GENERATE_DIR_NAME);
-
- Path lock = new Path(dbDir, CrawlDb.LOCK_NAME);
- FileSystem fs = FileSystem.get(getConf());
- LockUtil.createLockFile(fs, lock, force);
-
- LOG.info("Generator: Selecting best-scoring urls due for fetch.");
- LOG.info("Generator: starting");
- LOG.info("Generator: segment: " + segment);
- LOG.info("Generator: filtering: " + filter);
+ public void generate(String table, long topN, long curTime, boolean filter)
+ throws Exception {
+
+ LOG.info("GeneratorHbase: Selecting best-scoring urls due for fetch.");
+ LOG.info("GeneratorHbase: starting");
+ LOG.info("GeneratorHbase: filtering: " + filter);
if (topN != Long.MAX_VALUE) {
- LOG.info("Generator: topN: " + topN);
+ LOG.info("GeneratorHbase: topN: " + topN);
}
-
+
// map to inverted subset due for fetch, sort by score
- JobConf job = new NutchJob(getConf());
- job.setJobName("generate: select " + segment);
+ getConf().setLong(CRAWL_GEN_CUR_TIME, curTime);
+ getConf().setLong(CRAWL_TOP_N, topN);
+ getConf().setBoolean(CRAWL_GENERATE_FILTER, filter);
+ getConf().setInt(CRAWL_RANDOM_SEED, new Random().nextInt());
+
+ Job job = new NutchJob(getConf(), "generate-hbase: " + table);
+ job.setJobName("generate-hbase: " + table);
+ Scan scan = TableUtil.createScanFromColumns(COLUMNS);
+ TableMapReduceUtil.initTableMapperJob(table, scan,
+ GeneratorMapper.class, SelectorEntry.class,
+ WebTableRow.class, job);
+ TableMapReduceUtil.initTableReducerJob(table, GeneratorReducer.class, job, PartitionSelectorByHost.class);
- if (numLists == -1) { // for politeness make
- numLists = job.getNumMapTasks(); // a partition per fetch task
- }
- if ("local".equals(job.get("mapred.job.tracker")) && numLists != 1) {
- // override
- LOG.info("Generator: jobtracker is 'local', generating exactly one partition.");
- numLists = 1;
- }
- job.setLong(CRAWL_GEN_CUR_TIME, curTime);
- // record real generation time
- long generateTime = System.currentTimeMillis();
- job.setLong(Nutch.GENERATE_TIME_KEY, generateTime);
- job.setLong(CRAWL_TOP_N, topN);
- job.setBoolean(CRAWL_GENERATE_FILTER, filter);
-
- FileInputFormat.addInputPath(job, new Path(dbDir, CrawlDb.CURRENT_NAME));
- job.setInputFormat(SequenceFileInputFormat.class);
-
- job.setMapperClass(Selector.class);
- job.setPartitionerClass(Selector.class);
- job.setReducerClass(Selector.class);
-
- FileOutputFormat.setOutputPath(job, tempDir);
- job.setOutputFormat(SequenceFileOutputFormat.class);
- job.setOutputKeyClass(FloatWritable.class);
- job.setOutputKeyComparatorClass(DecreasingFloatComparator.class);
- job.setOutputValueClass(SelectorEntry.class);
- try {
- JobClient.runJob(job);
- } catch (IOException e) {
- LockUtil.removeLockFile(fs, lock);
- throw e;
- }
+ job.waitForCompletion(true);
- // check that we selected at least some entries ...
- SequenceFile.Reader[] readers = SequenceFileOutputFormat.getReaders(job, tempDir);
- boolean empty = true;
- if (readers != null && readers.length > 0) {
- for (int num = 0; num < readers.length; num++) {
- if (readers[num].next(new FloatWritable())) {
- empty = false;
- break;
- }
- }
- }
-
- for (int i = 0; i < readers.length; i++) readers[i].close();
-
- if (empty) {
- LOG.warn("Generator: 0 records selected for fetching, exiting ...");
- LockUtil.removeLockFile(fs, lock);
- fs.delete(tempDir, true);
- return null;
- }
-
- // invert again, partition by host, sort by url hash
- if (LOG.isInfoEnabled()) {
- LOG.info("Generator: Partitioning selected urls by host, for politeness.");
- }
- job = new NutchJob(getConf());
- job.setJobName("generate: partition " + segment);
-
- job.setInt("partition.url.by.host.seed", new Random().nextInt());
-
- FileInputFormat.addInputPath(job, tempDir);
- job.setInputFormat(SequenceFileInputFormat.class);
-
- job.setMapperClass(SelectorInverseMapper.class);
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(SelectorEntry.class);
- job.setPartitionerClass(PartitionUrlByHost.class);
- job.setReducerClass(PartitionReducer.class);
- job.setNumReduceTasks(numLists);
-
- FileOutputFormat.setOutputPath(job, output);
- job.setOutputFormat(SequenceFileOutputFormat.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(CrawlDatum.class);
- job.setOutputKeyComparatorClass(HashComparator.class);
- try {
- JobClient.runJob(job);
- } catch (IOException e) {
- LockUtil.removeLockFile(fs, lock);
- fs.delete(tempDir, true);
- throw e;
- }
- if (getConf().getBoolean(GENERATE_UPDATE_CRAWLDB, false)) {
- // update the db from tempDir
- Path tempDir2 =
- new Path(getConf().get("mapred.temp.dir", ".") +
- "/generate-temp-"+ System.currentTimeMillis());
-
- job = new NutchJob(getConf());
- job.setJobName("generate: updatedb " + dbDir);
- job.setLong(Nutch.GENERATE_TIME_KEY, generateTime);
- FileInputFormat.addInputPath(job, tempDir);
- FileInputFormat.addInputPath(job, new Path(dbDir, CrawlDb.CURRENT_NAME));
- job.setInputFormat(SequenceFileInputFormat.class);
- job.setMapperClass(CrawlDbUpdater.class);
- job.setReducerClass(CrawlDbUpdater.class);
- job.setOutputFormat(MapFileOutputFormat.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(CrawlDatum.class);
- FileOutputFormat.setOutputPath(job, tempDir2);
- try {
- JobClient.runJob(job);
- CrawlDb.install(job, dbDir);
- } catch (IOException e) {
- LockUtil.removeLockFile(fs, lock);
- fs.delete(tempDir, true);
- fs.delete(tempDir2, true);
- throw e;
- }
- fs.delete(tempDir2, true);
- }
- LockUtil.removeLockFile(fs, lock);
- fs.delete(tempDir, true);
-
- if (LOG.isInfoEnabled()) { LOG.info("Generator: done."); }
-
- return segment;
+ LOG.info("GeneratorHbase: done");
}
-
- private static SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");
- public static synchronized String generateSegmentName() {
- try {
- Thread.sleep(1000);
- } catch (Throwable t) {};
- return sdf.format
- (new Date(System.currentTimeMillis()));
- }
-
- /**
- * Generate a fetchlist from the crawldb.
- */
- public static void main(String args[]) throws Exception {
- int res = ToolRunner.run(NutchConfiguration.create(), new Generator(), args);
- System.exit(res);
- }
-
public int run(String[] args) throws Exception {
- if (args.length < 2) {
- System.out.println("Usage: Generator <crawldb> <segments_dir> [-force] [-topN N] [-numFetchers numFetchers] [-adddays numDays] [-noFilter]");
+ if (args.length < 1) {
+ System.out.println("Usage: GeneratorHbase <webtable> [-topN N] [-noFilter]");
return -1;
}
-
- Path dbDir = new Path(args[0]);
- Path segmentsDir = new Path(args[1]);
+
+ String table = args[0];
long curTime = System.currentTimeMillis();
long topN = Long.MAX_VALUE;
- int numFetchers = -1;
boolean filter = true;
- boolean force = false;
- for (int i = 2; i < args.length; i++) {
+ for (int i = 1; i < args.length; i++) {
if ("-topN".equals(args[i])) {
- topN = Long.parseLong(args[i+1]);
- i++;
- } else if ("-numFetchers".equals(args[i])) {
- numFetchers = Integer.parseInt(args[i+1]);
- i++;
- } else if ("-adddays".equals(args[i])) {
- long numDays = Integer.parseInt(args[i+1]);
- curTime += numDays * 1000L * 60 * 60 * 24;
+ topN = Long.parseLong(args[++i]);
} else if ("-noFilter".equals(args[i])) {
filter = false;
- } else if ("-force".equals(args[i])) {
- force = true;
}
-
}
-
+
try {
- Path seg = generate(dbDir, segmentsDir, numFetchers, topN, curTime, filter, force);
- if (seg == null) return -2;
- else return 0;
+ generate(table, topN, curTime, filter);
+ return 0;
} catch (Exception e) {
- LOG.fatal("Generator: " + StringUtils.stringifyException(e));
+ LOG.fatal("GeneratorHbase: " + StringUtils.stringifyException(e));
return -1;
}
}
+
+ public static void main(String args[]) throws Exception {
+ int res = ToolRunner.run(NutchConfiguration.create(), new Generator(), args);
+ System.exit(res);
+ }
}
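
A minimal sketch (not part of the commit) of the ordering that SelectorEntry.compareTo() above defines: entries sort by decreasing score, with ties broken by URL. The demo class is hypothetical and sits in the same package only because the entry fields are package-private.

    package org.apache.nutch.crawl;

    import java.util.Arrays;

    public class SelectorOrderDemo {
      public static void main(String[] args) {
        Generator.SelectorEntry[] entries = {
            new Generator.SelectorEntry("http://a.example.com/", 0.5f),
            new Generator.SelectorEntry("http://c.example.com/", 2.0f),
            new Generator.SelectorEntry("http://b.example.com/", 2.0f),
        };
        Arrays.sort(entries);  // uses SelectorEntry.compareTo()
        for (Generator.SelectorEntry e : entries) {
          // prints: 2.0 b..., 2.0 c..., 0.5 a...
          System.out.println(e.score + "\t" + e.url);
        }
      }
    }
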
Added: lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/GeneratorMapper.java
URL: http://svn.apache.org/viewvc/lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/GeneratorMapper.java?rev=804789&view=auto
==============================================================================
--- lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/GeneratorMapper.java (added)
+++ lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/GeneratorMapper.java Sun Aug 16 22:25:12 2009
@@ -0,0 +1,78 @@
+package org.apache.nutch.crawl;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableMapper;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.nutch.crawl.Generator.SelectorEntry;
+import org.apache.nutch.net.URLFilterException;
+import org.apache.nutch.net.URLFilters;
+import org.apache.nutch.net.URLNormalizers;
+import org.apache.nutch.scoring.ScoringFilterException;
+import org.apache.nutch.scoring.ScoringFilters;
+import org.apache.nutch.util.hbase.WebTableRow;
+import org.apache.nutch.util.hbase.TableUtil;
+
+public class GeneratorMapper
+extends TableMapper<SelectorEntry, WebTableRow> {
+
+ private URLFilters filters;
+ private URLNormalizers normalizers;
+ private boolean filter;
+ private FetchSchedule schedule;
+ private ScoringFilters scoringFilters;
+ private long curTime;
+
+ @Override
+ public void map(ImmutableBytesWritable key, Result result,
+ Context context) throws IOException, InterruptedException {
+ String reversedUrl = Bytes.toString(key.get());
+ String url = TableUtil.unreverseUrl(reversedUrl);
+
+ WebTableRow row = new WebTableRow(result);
+
+ // If filtering is on don't generate URLs that don't pass URLFilters
+ try {
+ url = normalizers.normalize(url, URLNormalizers.SCOPE_GENERATE_HOST_COUNT);
+ if (filter && filters.filter(url) == null)
+ return;
+ } catch (URLFilterException e) {
+ Generator.LOG.warn("Couldn't filter url: " + url + " (" + e.getMessage() + ")");
+ return;
+ }
+
+ // check fetch schedule
+ if (!schedule.shouldFetch(url, row, curTime)) {
+ if (Generator.LOG.isDebugEnabled()) {
+ Generator.LOG.debug("-shouldFetch rejected '" + url + "', fetchTime=" +
+ row.getFetchTime() + ", curTime=" + curTime);
+ }
+ return;
+ }
+
+ float score = row.getScore();
+ try {
+ score = scoringFilters.generatorSortValue(url, row, score);
+ } catch (ScoringFilterException e) {
+ // ignore
+ }
+ SelectorEntry entry = new SelectorEntry(url, score);
+ context.write(entry, row);
+
+ }
+
+ @Override
+ public void setup(Context context) {
+ Configuration conf = context.getConfiguration();
+ filters = new URLFilters(conf);
+ curTime = conf.getLong(Generator.CRAWL_GEN_CUR_TIME,
+ System.currentTimeMillis());
+ normalizers = new URLNormalizers(conf, URLNormalizers.SCOPE_GENERATE_HOST_COUNT);
+ filter = conf.getBoolean(Generator.CRAWL_GENERATE_FILTER, true);
+ schedule = FetchScheduleFactory.getFetchSchedule(conf);
+ scoringFilters = new ScoringFilters(conf);
+ }
+}
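
The mapper above keys rows by reversed URL (TableUtil.reverseUrl / unreverseUrl). A small sketch, assuming only that the two calls are inverses, as this mapper and the Injector below rely on; the sample URL is arbitrary.

    import org.apache.nutch.util.hbase.TableUtil;

    public class ReverseKeyDemo {
      public static void main(String[] args) throws Exception {
        String url = "http://www.example.com/page.html";
        // Row keys reverse the host part, presumably so that pages of one
        // host sort next to each other in the table.
        String key = TableUtil.reverseUrl(url);
        System.out.println(key);
        System.out.println(url.equals(TableUtil.unreverseUrl(key)));  // true
      }
    }
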
Added: lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/GeneratorReducer.java
URL: http://svn.apache.org/viewvc/lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/GeneratorReducer.java?rev=804789&view=auto
==============================================================================
--- lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/GeneratorReducer.java (added)
+++ lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/GeneratorReducer.java Sun Aug 16 22:25:12 2009
@@ -0,0 +1,68 @@
+package org.apache.nutch.crawl;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.mapreduce.TableReducer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.nutch.crawl.Generator.SelectorEntry;
+import org.apache.nutch.util.hbase.WebTableRow;
+
+/** Reduce class for the generate job.
+ *
+ * The #reduce() method writes a random integer to all generated URLs. This
+ * random number is then used by {@link FetcherMapper}.
+ */
+public class GeneratorReducer
+extends TableReducer<SelectorEntry, WebTableRow, SelectorEntry> {
+
+ private long limit;
+ private long maxPerHost;
+ private long count = 0;
+ private Map<String, Integer> hostCountMap = new HashMap<String, Integer>();
+ private Random random = new Random();
+
+ @Override
+ protected void reduce(SelectorEntry key, Iterable<WebTableRow> values,
+ Context context) throws IOException, InterruptedException {
+ for (WebTableRow row : values) {
+ if (maxPerHost > 0) {
+ String host = key.host;
+ Integer hostCount = hostCountMap.get(host);
+ if (hostCount == null) {
+ hostCountMap.put(host, 0);
+ hostCount = 0;
+ }
+ if (hostCount > maxPerHost) {
+ return;
+ }
+ hostCountMap.put(host, hostCount + 1);
+ }
+ if (count >= limit) {
+ return;
+ }
+
+ row.putMeta(Generator.GENERATOR_MARK, Bytes.toBytes(random.nextInt()));
+ row.makeRowMutation().writeToContext(key, context);
+ count++;
+ }
+ }
+
+ @Override
+ protected void setup(Context context)
+ throws IOException, InterruptedException {
+ Configuration conf = context.getConfiguration();
+ long totalLimit = conf.getLong(Generator.CRAWL_TOP_N, Long.MAX_VALUE);
+ if (totalLimit == Long.MAX_VALUE) {
+ limit = Long.MAX_VALUE;
+ } else {
+ limit = totalLimit / context.getNumReduceTasks();
+ }
+ maxPerHost = conf.getLong(Generator.GENERATE_MAX_PER_HOST, -1);
+ }
+
+}
\ No newline at end of file
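
A hypothetical helper (not in the commit) showing how a fetch-side job could test the __genmrk__ value the reducer writes; hasMeta() is the WebTableRow accessor that TableUpdateReducer uses later in this commit.

    package org.apache.nutch.crawl;

    import org.apache.nutch.util.hbase.WebTableRow;

    public class GeneratorMarkCheck {
      /** True if the last generate run selected this row for fetching. */
      public static boolean isGenerated(WebTableRow row) {
        return row.hasMeta(Generator.GENERATOR_MARK);
      }
    }
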
Modified: lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/Injector.java
URL: http://svn.apache.org/viewvc/lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/Injector.java?rev=804789&r1=804788&r2=804789&view=diff
==============================================================================
--- lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/Injector.java (original)
+++ lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/Injector.java Sun Aug 16 22:25:12 2009
@@ -1,198 +1,223 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
package org.apache.nutch.crawl;
-import java.io.*;
-import java.util.*;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
-// Commons Logging imports
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.util.*;
-
-import org.apache.nutch.net.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.ValueFilter;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.IdentityTableReducer;
+import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.mapreduce.TableMapper;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.net.URLFilters;
+import org.apache.nutch.net.URLNormalizers;
import org.apache.nutch.scoring.ScoringFilterException;
import org.apache.nutch.scoring.ScoringFilters;
+import org.apache.nutch.util.LogUtil;
import org.apache.nutch.util.NutchConfiguration;
import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.hbase.HbaseColumn;
+import org.apache.nutch.util.hbase.WebTableRow;
+import org.apache.nutch.util.hbase.WebTableColumns;
+import org.apache.nutch.util.hbase.TableUtil;
-/** This class takes a flat file of URLs and adds them to the of pages to be
- * crawled. Useful for bootstrapping the system. */
-public class Injector extends Configured implements Tool {
- public static final Log LOG = LogFactory.getLog(Injector.class);
+public class Injector
+implements Tool {
-
- /** Normalize and filter injected urls. */
- public static class InjectMapper implements Mapper<WritableComparable, Text, Text, CrawlDatum> {
+ public static final Log LOG = LogFactory.getLog(Injector.class);
+
+ private static final String INJECT_KEY_STR = "__injkey__";
+ public static final byte[] INJECT_KEY =
+ Bytes.toBytes(INJECT_KEY_STR);
+
+ private static final Set<HbaseColumn> COLUMNS = new HashSet<HbaseColumn>();
+
+ static {
+ COLUMNS.add(new HbaseColumn(WebTableColumns.METADATA, INJECT_KEY));
+ COLUMNS.add(new HbaseColumn(WebTableColumns.STATUS));
+ }
+
+ private Configuration conf;
+
+ public static class UrlMapper
+ extends Mapper<LongWritable, Text, Text, Text> {
private URLNormalizers urlNormalizers;
- private int interval;
- private float scoreInjected;
- private JobConf jobConf;
private URLFilters filters;
- private ScoringFilters scfilters;
- private long curTime;
+ private HTable table;
+ private HBaseConfiguration hbaseConf;
- public void configure(JobConf job) {
- this.jobConf = job;
- urlNormalizers = new URLNormalizers(job, URLNormalizers.SCOPE_INJECT);
- interval = jobConf.getInt("db.fetch.interval.default", 2592000);
- filters = new URLFilters(jobConf);
- scfilters = new ScoringFilters(jobConf);
- scoreInjected = jobConf.getFloat("db.score.injected", 1.0f);
- curTime = job.getLong("injector.current.time", System.currentTimeMillis());
- }
-
- public void close() {}
-
- public void map(WritableComparable key, Text value,
- OutputCollector<Text, CrawlDatum> output, Reporter reporter)
- throws IOException {
- String url = value.toString(); // value is line of text
+ @Override
+ public void map(LongWritable key, Text value, Context context)
+ throws IOException {
+ if (table == null) {
+ throw new IOException("Can not connect to hbase table");
+ }
+ String url = value.toString();
+ String reversedUrl;
try {
url = urlNormalizers.normalize(url, URLNormalizers.SCOPE_INJECT);
- url = filters.filter(url); // filter the url
- } catch (Exception e) {
- if (LOG.isWarnEnabled()) { LOG.warn("Skipping " +url+":"+e); }
- url = null;
- }
- if (url != null) { // if it passes
- value.set(url); // collect it
- CrawlDatum datum = new CrawlDatum(CrawlDatum.STATUS_INJECTED, interval);
- datum.setFetchTime(curTime);
- datum.setScore(scoreInjected);
- try {
- scfilters.injectedScore(value, datum);
- } catch (ScoringFilterException e) {
- if (LOG.isWarnEnabled()) {
- LOG.warn("Cannot filter injected score for url " + url +
- ", using default (" + e.getMessage() + ")");
- }
- datum.setScore(scoreInjected);
+ url = filters.filter(url);
+ if (url == null) {
+ return;
}
- output.collect(value, datum);
+ reversedUrl = TableUtil.reverseUrl(url);
+ } catch (Exception e) {
+ LOG.warn("Skipping " + url + ":" + e);
+ return;
}
- }
- }
- /** Combine multiple new entries for a url. */
- public static class InjectReducer implements Reducer<Text, CrawlDatum, Text, CrawlDatum> {
- public void configure(JobConf job) {}
- public void close() {}
+ Put put = new Put(Bytes.toBytes(reversedUrl));
+ put.add(WebTableColumns.METADATA, INJECT_KEY, TableUtil.YES_VAL);
- private CrawlDatum old = new CrawlDatum();
- private CrawlDatum injected = new CrawlDatum();
-
- public void reduce(Text key, Iterator<CrawlDatum> values,
- OutputCollector<Text, CrawlDatum> output, Reporter reporter)
- throws IOException {
- boolean oldSet = false;
- while (values.hasNext()) {
- CrawlDatum val = values.next();
- if (val.getStatus() == CrawlDatum.STATUS_INJECTED) {
- injected.set(val);
- injected.setStatus(CrawlDatum.STATUS_DB_UNFETCHED);
- } else {
- old.set(val);
- oldSet = true;
- }
+ table.put(put);
+ }
+
+ @Override
+ public void setup(Context context) {
+ Configuration conf = context.getConfiguration();
+ urlNormalizers = new URLNormalizers(conf, URLNormalizers.SCOPE_INJECT);
+ filters = new URLFilters(conf);
+ hbaseConf = new HBaseConfiguration();
+ try {
+ table = new HTable(hbaseConf, conf.get("input.table") );
+ } catch (IOException e) {
+ e.printStackTrace(LogUtil.getFatalStream(LOG));
}
- CrawlDatum res = null;
- if (oldSet) res = old; // don't overwrite existing value
- else res = injected;
- output.collect(key, res);
}
- }
+
+ @Override
+ public void cleanup(Context context) throws IOException {
+ table.close();
+ }
- public Injector() {}
-
- public Injector(Configuration conf) {
- setConf(conf);
}
- public void inject(Path crawlDb, Path urlDir) throws IOException {
-
- if (LOG.isInfoEnabled()) {
- LOG.info("Injector: starting");
- LOG.info("Injector: crawlDb: " + crawlDb);
- LOG.info("Injector: urlDir: " + urlDir);
+ public static class InjectorMapper
+ extends TableMapper<Text, Text> {
+ private HTable table;
+ private float scoreInjected;
+ private FetchSchedule schedule;
+ private ScoringFilters scoringFilters;
+
+ @Override
+ public void setup(Context context) throws IOException {
+ Configuration conf = context.getConfiguration();
+ schedule = FetchScheduleFactory.getFetchSchedule(conf);
+ table = new HTable(conf.get(TableInputFormat.INPUT_TABLE));
+ scoreInjected = conf.getFloat("db.score.injected", 1.0f);
+ scoringFilters = new ScoringFilters(conf);
}
-
- Path tempDir =
- new Path(getConf().get("mapred.temp.dir", ".") +
- "/inject-temp-"+
- Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
-
- // map text input file to a <url,CrawlDatum> file
- if (LOG.isInfoEnabled()) {
- LOG.info("Injector: Converting injected urls to crawl db entries.");
+
+ @Override
+ public void map(ImmutableBytesWritable key, Result result, Context context)
+ throws IOException {
+ WebTableRow row = new WebTableRow(result);
+ row.deleteMeta(INJECT_KEY);
+ if (!row.hasColumn(WebTableColumns.STATUS, null)) {
+ String url = TableUtil.unreverseUrl(Bytes.toString(key.get()));
+ // this is a new row, so add the necessary fields
+ row.setStatus(CrawlDatumHbase.STATUS_UNFETCHED);
+ schedule.initializeSchedule(url, row);
+ try {
+ scoringFilters.injectedScore(url, row);
+ } catch (ScoringFilterException e) {
+ row.setScore(scoreInjected);
+ }
+ }
+ row.makeRowMutation().commit(table);
}
- JobConf sortJob = new NutchJob(getConf());
- sortJob.setJobName("inject " + urlDir);
- FileInputFormat.addInputPath(sortJob, urlDir);
- sortJob.setMapperClass(InjectMapper.class);
-
- FileOutputFormat.setOutputPath(sortJob, tempDir);
- sortJob.setOutputFormat(SequenceFileOutputFormat.class);
- sortJob.setOutputKeyClass(Text.class);
- sortJob.setOutputValueClass(CrawlDatum.class);
- sortJob.setLong("injector.current.time", System.currentTimeMillis());
- JobClient.runJob(sortJob);
-
- // merge with existing crawl db
- if (LOG.isInfoEnabled()) {
- LOG.info("Injector: Merging injected urls into crawl db.");
+
+ @Override
+ public void cleanup(Context context) throws IOException {
+ table.close();
}
- JobConf mergeJob = CrawlDb.createJob(getConf(), crawlDb);
- FileInputFormat.addInputPath(mergeJob, tempDir);
- mergeJob.setReducerClass(InjectReducer.class);
- JobClient.runJob(mergeJob);
- CrawlDb.install(mergeJob, crawlDb);
+ }
- // clean up
- FileSystem fs = FileSystem.get(getConf());
- fs.delete(tempDir, true);
- if (LOG.isInfoEnabled()) { LOG.info("Injector: done"); }
+ public boolean inject(String table, Path urlDir)
+ throws IOException, InterruptedException, ClassNotFoundException {
+
+ LOG.info("Injector: starting");
+ LOG.info("Injector: urlDir: " + urlDir);
+
+ getConf().setLong("injector.current.time", System.currentTimeMillis());
+ getConf().set("input.table", table);
+ Job job = new NutchJob(getConf(), "inject-hbase-p1 " + urlDir);
+ FileInputFormat.addInputPath(job, urlDir);
+ job.setMapperClass(UrlMapper.class);
+ TableMapReduceUtil.initTableReducerJob(table,
+ IdentityTableReducer.class, job);
+ job.setOutputFormatClass(NullOutputFormat.class);
+
+ if (!job.waitForCompletion(true)) {
+ LOG.warn("Injecting new users failed!");
+ return false;
+ }
+ job = new NutchJob(getConf(), "inject-hbase-p2 " + urlDir);
+
+ Scan scan = TableUtil.createScanFromColumns(COLUMNS);
+ scan.setFilter(new ValueFilter(WebTableColumns.METADATA, INJECT_KEY, ValueFilter.CompareOp.EQUAL, TableUtil.YES_VAL, true));
+ TableMapReduceUtil.initTableMapperJob(table,
+ scan, InjectorMapper.class, Text.class, Text.class, job);
+ TableMapReduceUtil.initTableReducerJob(table,
+ IdentityTableReducer.class, job);
+ if (job.waitForCompletion(true)) {
+ LOG.info("Injector: done");
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
}
- public static void main(String[] args) throws Exception {
- int res = ToolRunner.run(NutchConfiguration.create(), new Injector(), args);
- System.exit(res);
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
}
-
+
public int run(String[] args) throws Exception {
if (args.length < 2) {
- System.err.println("Usage: Injector <crawldb> <url_dir>");
+ System.err.println("Usage: Injector <webtable> <url_dir>");
return -1;
}
try {
- inject(new Path(args[0]), new Path(args[1]));
- return 0;
+ if (inject(args[0], new Path(args[1]))) {
+ return 0;
+ }
+ return -1;
} catch (Exception e) {
LOG.fatal("Injector: " + StringUtils.stringifyException(e));
return -1;
}
}
+ public static void main(String[] args) throws Exception {
+ int res = ToolRunner.run(NutchConfiguration.create(),
+ new Injector(), args);
+ System.exit(res);
+ }
}
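
For reference, both inject jobs above run back to back from one Tool invocation; a minimal sketch mirroring the main() method, with "webpage" and "urls" as placeholder table and seed-directory names.

    import org.apache.hadoop.util.ToolRunner;
    import org.apache.nutch.crawl.Injector;
    import org.apache.nutch.util.NutchConfiguration;

    public class InjectDemo {
      public static void main(String[] args) throws Exception {
        // Equivalent to running: Injector webpage urls
        int res = ToolRunner.run(NutchConfiguration.create(),
            new Injector(), new String[] { "webpage", "urls" });
        System.exit(res);
      }
    }
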
Modified: lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/MD5Signature.java
URL: http://svn.apache.org/viewvc/lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/MD5Signature.java?rev=804789&r1=804788&r2=804789&view=diff
==============================================================================
--- lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/MD5Signature.java (original)
+++ lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/MD5Signature.java Sun Aug 16 22:25:12 2009
@@ -17,9 +17,14 @@
package org.apache.nutch.crawl;
+import java.util.Collection;
+import java.util.HashSet;
+
import org.apache.hadoop.io.MD5Hash;
import org.apache.nutch.parse.Parse;
-import org.apache.nutch.protocol.Content;
+import org.apache.nutch.util.hbase.HbaseColumn;
+import org.apache.nutch.util.hbase.WebTableColumns;
+import org.apache.nutch.util.hbase.WebTableRow;
/**
* Default implementation of a page signature. It calculates an MD5 hash
@@ -30,10 +35,20 @@
*/
public class MD5Signature extends Signature {
- public byte[] calculate(Content content, Parse parse) {
- byte[] data = content.getContent();
- if (data == null) data = content.getUrl().getBytes();
- StringBuilder buf = new StringBuilder().append(data).append(parse.getText());
- return MD5Hash.digest(buf.toString().getBytes()).getDigest();
+ private final static Collection<HbaseColumn> COLUMNS = new HashSet<HbaseColumn>();
+
+ static {
+ COLUMNS.add(new HbaseColumn(WebTableColumns.CONTENT));
+ }
+
+ @Override
+ public byte[] calculate(WebTableRow row, Parse parse) {
+ byte[] data = row.getContent();
+ return MD5Hash.digest(data).getDigest();
+ }
+
+ @Override
+ public Collection<HbaseColumn> getColumns() {
+ return COLUMNS;
}
}
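
A sketch of how the reworked signature API fits together, assuming WebTableRow#getSignature as used by TableUpdateReducer below; SignatureComparator.compare() treats a missing previous signature (null) as a difference.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.nutch.crawl.Signature;
    import org.apache.nutch.crawl.SignatureComparator;
    import org.apache.nutch.crawl.SignatureFactory;
    import org.apache.nutch.parse.Parse;
    import org.apache.nutch.util.hbase.WebTableRow;

    public class SignatureCheckSketch {
      /** True when the recomputed signature matches the stored one. */
      public static boolean unchanged(Configuration conf,
          WebTableRow row, Parse parse) {
        Signature sig = SignatureFactory.getSignature(conf);
        byte[] current = sig.calculate(row, parse);
        return SignatureComparator.compare(row.getSignature(), current) == 0;
      }
    }
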
Modified: lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/NutchWritable.java
URL: http://svn.apache.org/viewvc/lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/NutchWritable.java?rev=804789&r1=804788&r2=804789&view=diff
==============================================================================
--- lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/NutchWritable.java (original)
+++ lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/NutchWritable.java Sun Aug 16 22:25:12 2009
@@ -41,13 +41,15 @@
org.apache.nutch.parse.Outlink.class,
org.apache.nutch.parse.ParseText.class,
org.apache.nutch.parse.ParseData.class,
- org.apache.nutch.parse.ParseImpl.class,
org.apache.nutch.parse.ParseStatus.class,
org.apache.nutch.protocol.Content.class,
org.apache.nutch.protocol.ProtocolStatus.class,
org.apache.nutch.searcher.Hit.class,
org.apache.nutch.searcher.HitDetails.class,
- org.apache.nutch.searcher.Hits.class
+ org.apache.nutch.searcher.Hits.class,
+ org.apache.nutch.scoring.ScoreDatum.class,
+ org.apache.nutch.util.hbase.WebTableRow.class,
+ org.apache.hadoop.hbase.client.Result.class
};
}
Added: lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/PartitionSelectorByHost.java
URL: http://svn.apache.org/viewvc/lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/PartitionSelectorByHost.java?rev=804789&view=auto
==============================================================================
--- lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/PartitionSelectorByHost.java (added)
+++ lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/PartitionSelectorByHost.java Sun Aug 16 22:25:12 2009
@@ -0,0 +1,16 @@
+package org.apache.nutch.crawl;
+
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.nutch.crawl.Generator.SelectorEntry;
+import org.apache.nutch.util.hbase.WebTableRow;
+
+public class PartitionSelectorByHost extends Partitioner<SelectorEntry, WebTableRow> {
+
+ @Override
+ public int getPartition(SelectorEntry key, WebTableRow value,
+ int numPartitions) {
+ int hashCode = key.host.hashCode();
+
+ return (hashCode & Integer.MAX_VALUE) % numPartitions;
+ }
+}
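
A small self-contained demo of the sign-bit mask in getPartition() above: Java's % keeps the dividend's sign, so a negative hashCode() would otherwise yield a negative (invalid) partition index.

    public class PartitionDemo {
      public static void main(String[] args) {
        int numPartitions = 4;
        int h = -7;  // stand-in for a negative String.hashCode()
        System.out.println(h % numPartitions);                       // -3
        System.out.println((h & Integer.MAX_VALUE) % numPartitions); //  1
      }
    }
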
Modified: lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/Signature.java
URL: http://svn.apache.org/viewvc/lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/Signature.java?rev=804789&r1=804788&r2=804789&view=diff
==============================================================================
--- lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/Signature.java (original)
+++ lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/Signature.java Sun Aug 16 22:25:12 2009
@@ -17,21 +17,16 @@
package org.apache.nutch.crawl;
+import java.util.Collection;
+
import org.apache.nutch.parse.Parse;
-import org.apache.nutch.protocol.Content;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configurable;
+import org.apache.nutch.util.hbase.HbaseColumn;
+import org.apache.nutch.util.hbase.WebTableRow;
+import org.apache.hadoop.conf.Configured;
-public abstract class Signature implements Configurable {
- protected Configuration conf;
+public abstract class Signature extends Configured {
- public abstract byte[] calculate(Content content, Parse parse);
-
- public Configuration getConf() {
- return conf;
- }
-
- public void setConf(Configuration conf) {
- this.conf = conf;
- }
+ public abstract byte[] calculate(WebTableRow row, Parse parse);
+
+ public abstract Collection<HbaseColumn> getColumns();
}
Modified: lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/SignatureComparator.java
URL: http://svn.apache.org/viewvc/lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/SignatureComparator.java?rev=804789&r1=804788&r2=804789&view=diff
==============================================================================
--- lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/SignatureComparator.java (original)
+++ lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/SignatureComparator.java Sun Aug 16 22:25:12 2009
@@ -17,23 +17,13 @@
package org.apache.nutch.crawl;
-import java.util.Comparator;
-
-public class SignatureComparator implements Comparator {
- public int compare(Object o1, Object o2) {
- return _compare(o1, o2);
- }
+public class SignatureComparator {
+ public static int compare(byte[] data1, byte[] data2) {
+ if (data1 == null && data2 == null) return 0;
+ if (data1 == null) return -1;
+ if (data2 == null) return 1;
+ return _compare(data1, 0, data1.length, data2, 0, data2.length);
+ }
- public static int _compare(Object o1, Object o2) {
- if (o1 == null && o2 == null) return 0;
- if (o1 == null) return -1;
- if (o2 == null) return 1;
- if (!(o1 instanceof byte[])) return -1;
- if (!(o2 instanceof byte[])) return 1;
- byte[] data1 = (byte[])o1;
- byte[] data2 = (byte[])o2;
- return _compare(data1, 0, data1.length, data2, 0, data2.length);
- }
public static int _compare(byte[] data1, int s1, int l1, byte[] data2, int s2, int l2) {
if (l2 > l1) return -1;
Modified: lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/SignatureFactory.java
URL: http://svn.apache.org/viewvc/lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/SignatureFactory.java?rev=804789&r1=804788&r2=804789&view=diff
==============================================================================
--- lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/SignatureFactory.java (original)
+++ lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/SignatureFactory.java Sun Aug 16 22:25:12 2009
@@ -18,12 +18,15 @@
package org.apache.nutch.crawl;
// Commons Logging imports
+import java.util.Collection;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
// Hadoop imports
import org.apache.hadoop.conf.Configuration;
import org.apache.nutch.util.ObjectCache;
+import org.apache.nutch.util.hbase.HbaseColumn;
/**
* Factory class, which instantiates a Signature implementation according to the
@@ -44,10 +47,8 @@
Signature impl = (Signature)objectCache.getObject(clazz);
if (impl == null) {
try {
- if (LOG.isInfoEnabled()) {
- LOG.info("Using Signature impl: " + clazz);
- }
- Class implClass = Class.forName(clazz);
+ LOG.info("Using Signature impl: " + clazz);
+ Class<?> implClass = Class.forName(clazz);
impl = (Signature)implClass.newInstance();
impl.setConf(conf);
objectCache.setObject(clazz, impl);
@@ -57,4 +58,9 @@
}
return impl;
}
+
+ public static Collection<HbaseColumn> getColumns(Configuration conf) {
+ Signature impl = getSignature(conf);
+ return impl.getColumns();
+ }
}
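
A sketch of feeding a Signature's declared columns into a table scan, following the createScanFromColumns() pattern Generator uses above; the HashSet wrapper is only there to match the Set-typed usage seen in Generator and is an assumption about the helper's parameter type.

    import java.util.HashSet;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.nutch.crawl.SignatureFactory;
    import org.apache.nutch.util.NutchConfiguration;
    import org.apache.nutch.util.hbase.HbaseColumn;
    import org.apache.nutch.util.hbase.TableUtil;

    public class SignatureScanSketch {
      public static Scan scanForSignature() {
        Configuration conf = NutchConfiguration.create();
        // Restrict the scan to just the columns the configured Signature reads.
        return TableUtil.createScanFromColumns(
            new HashSet<HbaseColumn>(SignatureFactory.getColumns(conf)));
      }
    }
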
Added: lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/TableUpdateMapper.java
URL: http://svn.apache.org/viewvc/lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/TableUpdateMapper.java?rev=804789&view=auto
==============================================================================
--- lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/TableUpdateMapper.java (added)
+++ lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/TableUpdateMapper.java Sun Aug 16 22:25:12 2009
@@ -0,0 +1,67 @@
+package org.apache.nutch.crawl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableMapper;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.nutch.parse.Outlink;
+import org.apache.nutch.scoring.ScoreDatum;
+import org.apache.nutch.scoring.ScoringFilterException;
+import org.apache.nutch.scoring.ScoringFilters;
+import org.apache.nutch.util.hbase.WebTableRow;
+import org.apache.nutch.util.hbase.TableUtil;
+
+public class TableUpdateMapper
+extends TableMapper<ImmutableBytesWritable, NutchWritable> {
+ public static final Log LOG = TableUpdater.LOG;
+
+ private ScoringFilters scoringFilters;
+
+ private List<ScoreDatum> scoreData = new ArrayList<ScoreDatum>();
+
+ @Override
+ public void map(ImmutableBytesWritable key, Result result, Context context)
+ throws IOException, InterruptedException {
+
+ String url = TableUtil.unreverseUrl(Bytes.toString(key.get()));
+ WebTableRow row = new WebTableRow(result);
+
+ Collection<Outlink> outlinks = row.getOutlinks();
+
+ scoreData.clear();
+ for (Outlink outlink : outlinks) {
+ scoreData.add(new ScoreDatum(0.0f, outlink.getToUrl(), outlink.getAnchor()));
+ }
+
+ // TODO: Outlink filtering (i.e. "only keep the first n outlinks")
+ try {
+ scoringFilters.distributeScoreToOutlinks(url, row, scoreData, outlinks.size());
+ } catch (ScoringFilterException e) {
+ LOG.warn("Distributing score failed for URL: " + key +
+ " exception:" + StringUtils.stringifyException(e));
+ }
+
+ context.write(key, new NutchWritable(row));
+
+ for (ScoreDatum scoreDatum : scoreData) {
+ String reversedOut = TableUtil.reverseUrl(scoreDatum.getUrl());
+ ImmutableBytesWritable outKey =
+ new ImmutableBytesWritable(reversedOut.getBytes());
+ scoreDatum.setUrl(url);
+ context.write(outKey, new NutchWritable(scoreDatum));
+ }
+ }
+
+ @Override
+ public void setup(Context context) {
+ scoringFilters = new ScoringFilters(context.getConfiguration());
+ }
+
+}
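
The mapper above emits two value types under reversed-URL keys: the row itself and one ScoreDatum per outlink. A minimal sketch of the instanceof dispatch the reducer below performs to tell them apart.

    import org.apache.hadoop.io.Writable;
    import org.apache.nutch.crawl.NutchWritable;
    import org.apache.nutch.util.hbase.WebTableRow;

    public class NutchWritableDispatch {
      /** True for the row itself; false for a ScoreDatum from an inlink. */
      public static boolean isRow(NutchWritable value) {
        Writable val = value.get();
        return val instanceof WebTableRow;
      }
    }
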
Added: lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/TableUpdateReducer.java
URL: http://svn.apache.org/viewvc/lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/TableUpdateReducer.java?rev=804789&view=auto
==============================================================================
--- lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/TableUpdateReducer.java (added)
+++ lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/TableUpdateReducer.java Sun Aug 16 22:25:12 2009
@@ -0,0 +1,163 @@
+package org.apache.nutch.crawl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableReducer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.nutch.fetcher.Fetcher;
+import org.apache.nutch.parse.TableParser;
+import org.apache.nutch.scoring.ScoreDatum;
+import org.apache.nutch.scoring.ScoringFilterException;
+import org.apache.nutch.scoring.ScoringFilters;
+import org.apache.nutch.util.hbase.WebTableColumns;
+import org.apache.nutch.util.hbase.TableUtil;
+import org.apache.nutch.util.hbase.WebTableRow;
+
+public class TableUpdateReducer
+extends TableReducer<ImmutableBytesWritable, NutchWritable, ImmutableBytesWritable> {
+
+ public static final Log LOG = TableUpdater.LOG;
+
+ private int retryMax;
+ private boolean additionsAllowed;
+ private int maxInterval;
+ private FetchSchedule schedule;
+ private ScoringFilters scoringFilters;
+ private List<ScoreDatum> inlinkedScoreData = new ArrayList<ScoreDatum>();
+
+ @Override
+ protected void setup(Context context) throws IOException, InterruptedException {
+ Configuration conf = context.getConfiguration();
+ retryMax = conf.getInt("db.fetch.retry.max", 3);
+ additionsAllowed = conf.getBoolean(CrawlDb.CRAWLDB_ADDITIONS_ALLOWED, true);
+ maxInterval = conf.getInt("db.fetch.interval.max", 0);
+ schedule = FetchScheduleFactory.getFetchSchedule(conf);
+ scoringFilters = new ScoringFilters(conf);
+ }
+
+ @Override
+ protected void reduce(ImmutableBytesWritable key, Iterable<NutchWritable> values,
+ Context context) throws IOException, InterruptedException {
+
+ WebTableRow row = null;
+ inlinkedScoreData.clear();
+
+ for (NutchWritable nutchWritable : values) {
+ Writable val = nutchWritable.get();
+ if (val instanceof WebTableRow) {
+ row = (WebTableRow) val;
+ } else {
+ inlinkedScoreData.add((ScoreDatum) val);
+ }
+ }
+ String url;
+ try {
+ url = TableUtil.unreverseUrl(Bytes.toString(key.get()));
+ } catch (Exception e) {
+ // this can happen because a newly discovered malformed link
+ // may slip by url filters
+ // TODO: Find a better solution
+ LOG.warn("Skipping row with malformed key: " + Bytes.toString(key.get()));
+ return;
+ }
+
+ if (row == null) { // new row
+ if (!additionsAllowed) {
+ return;
+ }
+ row = new WebTableRow(key.get());
+ schedule.initializeSchedule(url, row);
+ row.setStatus(CrawlDatumHbase.STATUS_UNFETCHED);
+ try {
+ scoringFilters.initialScore(url, row);
+ } catch (ScoringFilterException e) {
+ row.setScore(0.0f);
+ }
+ } else {
+ if (row.hasMeta(Fetcher.REDIRECT_DISCOVERED) && !row.hasColumn(WebTableColumns.STATUS, null)) {
+ // this row is marked during fetch as the destination of a redirect
+ // but does not contain anything else, so we initialize it.
+ schedule.initializeSchedule(url, row);
+ row.setStatus(CrawlDatumHbase.STATUS_UNFETCHED);
+ try {
+ scoringFilters.initialScore(url, row);
+ } catch (ScoringFilterException e) {
+ row.setScore(0.0f);
+ }
+ } else { // update row
+ byte status = row.getStatus();
+ switch (status) {
+ case CrawlDatumHbase.STATUS_FETCHED: // successful fetch
+ case CrawlDatumHbase.STATUS_REDIR_TEMP: // successful fetch, redirected
+ case CrawlDatumHbase.STATUS_REDIR_PERM:
+ case CrawlDatumHbase.STATUS_NOTMODIFIED: // successful fetch, not modified
+ int modified = FetchSchedule.STATUS_UNKNOWN;
+ if (status == CrawlDatumHbase.STATUS_NOTMODIFIED) {
+ modified = FetchSchedule.STATUS_NOTMODIFIED;
+ }
+ byte[] prevSig = row.getPrevSignature();
+ byte[] signature = row.getSignature();
+ if (prevSig != null && signature != null) {
+ if (SignatureComparator.compare(prevSig, signature) != 0) {
+ modified = FetchSchedule.STATUS_MODIFIED;
+ } else {
+ modified = FetchSchedule.STATUS_NOTMODIFIED;
+ }
+ }
+ long fetchTime = row.getFetchTime();
+ long prevFetchTime = row.getPrevFetchTime();
+ long modifiedTime = row.getModifiedTime();
+
+ schedule.setFetchSchedule(url, row, prevFetchTime, 0L,
+ fetchTime, modifiedTime, modified);
+ if (maxInterval < row.getFetchInterval()) {
+ schedule.forceRefetch(url, row, false);
+ }
+ break;
+ case CrawlDatumHbase.STATUS_RETRY:
+ schedule.setPageRetrySchedule(url, row, 0L, 0L, row.getFetchTime());
+ if (row.getRetriesSinceFetch() < retryMax) {
+ row.setStatus(CrawlDatumHbase.STATUS_UNFETCHED);
+ } else {
+ row.setStatus(CrawlDatumHbase.STATUS_GONE);
+ }
+ break;
+ case CrawlDatumHbase.STATUS_GONE:
+ schedule.setPageGoneSchedule(url, row, 0L, 0L, row.getFetchTime());
+ break;
+ }
+ }
+ }
+
+ row.deleteAllInlinks();
+ for (ScoreDatum inlink : inlinkedScoreData) {
+ row.addInlink(new Inlink(inlink.getUrl(), inlink.getAnchor()));
+ }
+
+ try {
+ scoringFilters.updateScore(url, row, inlinkedScoreData);
+ } catch (ScoringFilterException e) {
+ LOG.warn("Scoring filters failed with exception " +
+ StringUtils.stringifyException(e));
+ }
+
+ // clear markers
+ row.deleteMeta(Fetcher.REDIRECT_DISCOVERED);
+ row.deleteMeta(Generator.GENERATOR_MARK);
+ row.deleteMeta(Fetcher.FETCH_MARK);
+ row.deleteMeta(TableParser.PARSE_MARK);
+
+ row.makeRowMutation(key.get()).writeToContext(key, context);
+ }
+
+}
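The freshness decision in the FETCHED/REDIR/NOTMODIFIED branch is easy to miss inside the switch: the modified state defaults to UNKNOWN, a not-modified fetch status pre-seeds NOTMODIFIED, and when both signatures are present their comparison wins. The helper below restates exactly those lines in isolation; the class name is ours, not part of the patch.

    import org.apache.nutch.crawl.CrawlDatumHbase;
    import org.apache.nutch.crawl.FetchSchedule;
    import org.apache.nutch.crawl.SignatureComparator;

    // Restates the reducer's modified-state logic: UNKNOWN unless proven
    // otherwise; signature comparison overrides the status-based default.
    public class ModifiedStateSketch {
      public static int resolve(byte status, byte[] prevSig, byte[] sig) {
        int modified = FetchSchedule.STATUS_UNKNOWN;
        if (status == CrawlDatumHbase.STATUS_NOTMODIFIED) {
          modified = FetchSchedule.STATUS_NOTMODIFIED;
        }
        if (prevSig != null && sig != null) {
          modified = (SignatureComparator.compare(prevSig, sig) != 0)
              ? FetchSchedule.STATUS_MODIFIED
              : FetchSchedule.STATUS_NOTMODIFIED;
        }
        return modified;
      }
    }
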
Added: lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/TableUpdater.java
URL: http://svn.apache.org/viewvc/lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/TableUpdater.java?rev=804789&view=auto
==============================================================================
--- lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/TableUpdater.java (added)
+++ lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/TableUpdater.java Sun Aug 16 22:25:12 2009
@@ -0,0 +1,81 @@
+package org.apache.nutch.crawl;
+
+import java.util.Collection;
+import java.util.HashSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.fetcher.Fetcher;
+import org.apache.nutch.parse.TableParser;
+import org.apache.nutch.scoring.ScoringFilters;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.hbase.HbaseColumn;
+import org.apache.nutch.util.hbase.WebTableColumns;
+import org.apache.nutch.util.hbase.TableUtil;
+
+public class TableUpdater extends Configured
+implements Tool {
+
+ public static final Log LOG = LogFactory.getLog(TableUpdater.class);
+
+ private static final Collection<HbaseColumn> COLUMNS = new HashSet<HbaseColumn>();
+
+ static {
+ COLUMNS.add(new HbaseColumn(WebTableColumns.OUTLINKS));
+ COLUMNS.add(new HbaseColumn(WebTableColumns.INLINKS));
+ COLUMNS.add(new HbaseColumn(WebTableColumns.STATUS));
+ COLUMNS.add(new HbaseColumn(WebTableColumns.METADATA, TableParser.PARSE_MARK));
+ COLUMNS.add(new HbaseColumn(WebTableColumns.METADATA, Fetcher.REDIRECT_DISCOVERED));
+ COLUMNS.add(new HbaseColumn(WebTableColumns.RETRIES));
+ COLUMNS.add(new HbaseColumn(WebTableColumns.FETCH_TIME));
+ COLUMNS.add(new HbaseColumn(WebTableColumns.MODIFIED_TIME));
+ COLUMNS.add(new HbaseColumn(WebTableColumns.FETCH_INTERVAL));
+ COLUMNS.add(new HbaseColumn(WebTableColumns.PREV_FETCH_TIME));
+ COLUMNS.add(new HbaseColumn(WebTableColumns.PREV_SIGNATURE));
+ }
+
+ private void updateTable(String table) throws Exception {
+ LOG.info("TableUpdater: starting");
+ LOG.info("TableUpdater: table: " + table);
+ Job job = new NutchJob(getConf(), "update-table " + table);
+ ScoringFilters scoringFilters = new ScoringFilters(getConf());
+ HashSet<HbaseColumn> columns = new HashSet<HbaseColumn>(COLUMNS);
+ columns.addAll(scoringFilters.getColumns());
+ TableMapReduceUtil.initTableMapperJob(table, TableUtil.createScanFromColumns(columns),
+ TableUpdateMapper.class, ImmutableBytesWritable.class,
+ NutchWritable.class, job);
+ TableMapReduceUtil.initTableReducerJob(table, TableUpdateReducer.class, job);
+
+ job.waitForCompletion(true);
+ LOG.info("TableUpdater: done");
+ }
+
+ public int run(String[] args) throws Exception {
+ String usage = "Usage: TableUpdater <webtable>";
+
+ if (args.length < 1) {
+ System.err.println(usage);
+ return -1;
+ }
+
+ updateTable(args[0]);
+ return 0;
+ }
+
+ public static void main(String[] args) throws Exception {
+ int res = ToolRunner.run(NutchConfiguration.create(), new TableUpdater(), args);
+ System.exit(res);
+ }
+
+}
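TableUpdater wires the mapper and reducer over a scan restricted to COLUMNS plus whatever extra columns the configured scoring filters declare. Invocation mirrors main(); the sketch below runs the job programmatically against a table named "webtable" (the table name and driver class name are placeholders).

    import org.apache.hadoop.util.ToolRunner;
    import org.apache.nutch.crawl.TableUpdater;
    import org.apache.nutch.util.NutchConfiguration;

    // Placeholder driver: same as TableUpdater.main(), but keeps the exit
    // code in-process instead of calling System.exit().
    public class RunTableUpdater {
      public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(NutchConfiguration.create(),
            new TableUpdater(), new String[] { "webtable" });
        System.out.println("TableUpdater exit code: " + res);
      }
    }
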
Modified: lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/TextProfileSignature.java
URL: http://svn.apache.org/viewvc/lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/TextProfileSignature.java?rev=804789&r1=804788&r2=804789&view=diff
==============================================================================
--- lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/TextProfileSignature.java (original)
+++ lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/TextProfileSignature.java Sun Aug 16 22:25:12 2009
@@ -17,22 +17,20 @@
package org.apache.nutch.crawl;
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.InputStreamReader;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import org.apache.hadoop.io.MD5Hash;
+
import org.apache.nutch.parse.Parse;
-import org.apache.nutch.parse.ParseImpl;
-import org.apache.nutch.protocol.Content;
-import org.apache.nutch.util.StringUtil;
-import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.hbase.HbaseColumn;
+import org.apache.nutch.util.hbase.WebTableColumns;
+import org.apache.nutch.util.hbase.WebTableRow;
/**
* <p>An implementation of a page signature. It calculates an MD5 hash
@@ -60,16 +58,22 @@
* @author Andrzej Bialecki <ab@getopt.org>
*/
public class TextProfileSignature extends Signature {
+
+ private static final Collection<HbaseColumn> COLUMNS = new HashSet<HbaseColumn>();
+
+ static {
+ COLUMNS.add(new HbaseColumn(WebTableColumns.CONTENT));
+ }
Signature fallback = new MD5Signature();
- public byte[] calculate(Content content, Parse parse) {
+ public byte[] calculate(WebTableRow row, Parse parse) {
int MIN_TOKEN_LEN = getConf().getInt("db.signature.text_profile.min_token_len", 2);
float QUANT_RATE = getConf().getFloat("db.signature.text_profile.quant_rate", 0.01f);
HashMap<String, Token> tokens = new HashMap<String, Token>();
String text = null;
if (parse != null) text = parse.getText();
- if (text == null || text.length() == 0) return fallback.calculate(content, parse);
+ if (text == null || text.length() == 0) return fallback.calculate(row, parse);
StringBuffer curToken = new StringBuffer();
int maxFreq = 0;
for (int i = 0; i < text.length(); i++) {
@@ -133,6 +137,10 @@
}
return MD5Hash.digest(newText.toString()).getDigest();
}
+
+ public Collection<HbaseColumn> getColumns() {
+ return COLUMNS;
+ }
private static class Token {
public int cnt;
@@ -153,30 +161,4 @@
return t2.cnt - t1.cnt;
}
}
-
- public static void main(String[] args) throws Exception {
- TextProfileSignature sig = new TextProfileSignature();
- sig.setConf(NutchConfiguration.create());
- HashMap<String, byte[]> res = new HashMap<String, byte[]>();
- File[] files = new File(args[0]).listFiles();
- for (int i = 0; i < files.length; i++) {
- FileInputStream fis = new FileInputStream(files[i]);
- BufferedReader br = new BufferedReader(new InputStreamReader(fis, "UTF-8"));
- StringBuffer text = new StringBuffer();
- String line = null;
- while ((line = br.readLine()) != null) {
- if (text.length() > 0) text.append("\n");
- text.append(line);
- }
- br.close();
- byte[] signature = sig.calculate(null, new ParseImpl(text.toString(), null));
- res.put(files[i].toString(), signature);
- }
- Iterator<String> it = res.keySet().iterator();
- while (it.hasNext()) {
- String name = it.next();
- byte[] signature = res.get(name);
- System.out.println(name + "\t" + StringUtil.toHexString(signature));
- }
- }
}
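With this change the signature is computed from a WebTableRow and declares the webtable column it depends on, but the profile itself is unchanged: token frequencies are quantized before hashing, so near-identical pages collapse to the same MD5. The toy below illustrates only that quantization idea; the authoritative rounding rules live in the unchanged class body, and this method is a simplification.

    import java.util.HashMap;
    import java.util.Map;

    // Toy illustration of frequency quantization: counts are rounded down
    // to multiples of a quantum derived from the most frequent token, so
    // small wording changes still produce an identical profile.
    public class QuantizeSketch {
      public static Map<String, Integer> quantize(Map<String, Integer> freqs,
          float quantRate) {
        int maxFreq = 0;
        for (int f : freqs.values()) maxFreq = Math.max(maxFreq, f);
        int quant = Math.max(1, Math.round(maxFreq * quantRate));
        Map<String, Integer> out = new HashMap<String, Integer>();
        for (Map.Entry<String, Integer> e : freqs.entrySet()) {
          int q = (e.getValue() / quant) * quant;
          if (q > 0) out.put(e.getKey(), q); // below one quantum: dropped
        }
        return out;
      }
    }
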
Added: lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/fetcher/FetchEntry.java
URL: http://svn.apache.org/viewvc/lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/fetcher/FetchEntry.java?rev=804789&view=auto
==============================================================================
--- lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/fetcher/FetchEntry.java (added)
+++ lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/fetcher/FetchEntry.java Sun Aug 16 22:25:12 2009
@@ -0,0 +1,49 @@
+package org.apache.nutch.fetcher;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.Writable;
+
+public class FetchEntry implements Writable {
+
+ private ImmutableBytesWritable key;
+ private Result row;
+
+ public FetchEntry() {
+ key = new ImmutableBytesWritable();
+ row = new Result();
+ }
+
+ public FetchEntry(FetchEntry fe) {
+ this.key = new ImmutableBytesWritable(fe.key.get().clone());
+ // Result offers no deep copy here; share the reference, treating it as read-only
+ this.row = fe.row;
+ }
+
+ public FetchEntry(ImmutableBytesWritable key, Result row) {
+ this.key = key;
+ this.row = row;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ key.readFields(in);
+ row.readFields(in);
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ key.write(out);
+ row.write(out);
+ }
+
+ public ImmutableBytesWritable getKey() {
+ return key;
+ }
+
+ public Result getRow() {
+ return row;
+ }
+}
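FetchEntry pairs a reversed-URL key with the Result scanned for that row, giving the fetcher side a single Writable unit to pass around. A round-trip sketch follows, assuming an HBase Result that serializes even when empty; the key bytes are an illustrative stand-in for a real webtable row key.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;

    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.nutch.fetcher.FetchEntry;

    // Round-trips a FetchEntry through its Writable methods. The empty
    // Result and the key bytes are stand-ins for a real webtable row.
    public class FetchEntryRoundTrip {
      public static void main(String[] args) throws Exception {
        FetchEntry in = new FetchEntry(
            new ImmutableBytesWritable("com.example:http/".getBytes("UTF-8")),
            new Result());

        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        in.write(new DataOutputStream(bos));

        FetchEntry out = new FetchEntry();
        out.readFields(new DataInputStream(
            new ByteArrayInputStream(bos.toByteArray())));

        System.out.println(new String(out.getKey().get(), "UTF-8"));
      }
    }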