You are viewing a plain text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@nutch.apache.org by do...@apache.org on 2009/08/17 00:25:17 UTC
svn commit: r804789 [5/6] - in /lucene/nutch/branches/nutchbase: ./ bin/
conf/ lib/ src/java/org/apache/nutch/analysis/
src/java/org/apache/nutch/crawl/ src/java/org/apache/nutch/fetcher/
src/java/org/apache/nutch/indexer/ src/java/org/apache/nutch/ind...
Modified: lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/scoring/ScoringFilter.java
URL: http://svn.apache.org/viewvc/lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/scoring/ScoringFilter.java?rev=804789&r1=804788&r2=804789&view=diff
==============================================================================
--- lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/scoring/ScoringFilter.java (original)
+++ lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/scoring/ScoringFilter.java Sun Aug 16 22:25:12 2009
@@ -18,17 +18,11 @@
import java.util.Collection;
import java.util.List;
-import java.util.Map.Entry;
import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.io.Text;
-import org.apache.nutch.crawl.CrawlDatum;
-import org.apache.nutch.crawl.Inlinks;
import org.apache.nutch.indexer.NutchDocument;
-import org.apache.nutch.parse.Parse;
-import org.apache.nutch.parse.ParseData;
-import org.apache.nutch.plugin.Pluggable;
-import org.apache.nutch.protocol.Content;
+import org.apache.nutch.plugin.TablePluggable;
+import org.apache.nutch.util.hbase.WebTableRow;
/**
* A contract defining behavior of scoring plugins.
@@ -39,7 +33,7 @@
*
* @author Andrzej Bialecki
*/
-public interface ScoringFilter extends Configurable, Pluggable {
+public interface ScoringFilter extends Configurable, TablePluggable {
/** The name of the extension point. */
public final static String X_POINT_ID = ScoringFilter.class.getName();
@@ -49,10 +43,10 @@
* score to a non-zero value, to give newly injected pages some initial
* credit.
* @param url url of the page
- * @param datum new datum. Filters will modify it in-place.
+ * @param row new row. Filters will modify it in-place.
* @throws ScoringFilterException
*/
- public void injectedScore(Text url, CrawlDatum datum) throws ScoringFilterException;
+ public void injectedScore(String url, WebTableRow row) throws ScoringFilterException;
/**
* Set an initial score for newly discovered pages. Note: newly discovered pages
@@ -60,102 +54,59 @@
* may choose to set initial score to zero (unknown value), and then the inlink
* score contribution will set the "real" value of the new page.
* @param url url of the page
- * @param datum new datum. Filters will modify it in-place.
+ * @param row page row. Modifications will be persisted.
* @throws ScoringFilterException
*/
- public void initialScore(Text url, CrawlDatum datum) throws ScoringFilterException;
+ public void initialScore(String url, WebTableRow row) throws ScoringFilterException;
/**
* This method prepares a sort value for the purpose of sorting and
* selecting top N scoring pages during fetchlist generation.
* @param url url of the page
- * @param datum page's datum, should not be modified
+ * @param row page row. Modifications will be persisted.
* @param initSort initial sort value, or a value from previous filters in chain
*/
- public float generatorSortValue(Text url, CrawlDatum datum, float initSort) throws ScoringFilterException;
-
- /**
- * This method takes all relevant score information from the current datum
- * (coming from a generated fetchlist) and stores it into
- * {@link org.apache.nutch.protocol.Content} metadata.
- * This is needed in order to pass this value(s) to the mechanism that distributes it
- * to outlinked pages.
- * @param url url of the page
- * @param datum source datum. NOTE: modifications to this value are not persisted.
- * @param content instance of content. Implementations may modify this
- * in-place, primarily by setting some metadata properties.
- */
- public void passScoreBeforeParsing(Text url, CrawlDatum datum, Content content) throws ScoringFilterException;
-
- /**
- * Currently a part of score distribution is performed using only data coming
- * from the parsing process. We need this method in order to ensure the
- * presence of score data in these steps.
- * @param url page url
- * @param content original content. NOTE: modifications to this value are not persisted.
- * @param parse target instance to copy the score information to. Implementations
- * may modify this in-place, primarily by setting some metadata properties.
- */
- public void passScoreAfterParsing(Text url, Content content, Parse parse) throws ScoringFilterException;
-
+ public float generatorSortValue(String url, WebTableRow row, float initSort) throws ScoringFilterException;
+
/**
* Distribute score value from the current page to all its outlinked pages.
* @param fromUrl url of the source page
- * @param parseData ParseData instance, which stores relevant score value(s)
- * in its metadata. NOTE: filters may modify this in-place, all changes will
- * be persisted.
- * @param targets <url, CrawlDatum> pairs. NOTE: filters can modify this in-place,
- * all changes will be persisted.
- * @param adjust a CrawlDatum instance, initially null, which implementations
- * may use to pass adjustment values to the original CrawlDatum. When creating
- * this instance, set its status to {@link CrawlDatum#STATUS_LINKED}.
+ * @param row page row
+ * @param scoreData A list of {@link ScoreDatum}s for every outlink.
+ * These {@link ScoreDatum}s will be passed to
+ * {@link #updateScore(String, WebTableRow, List)}
+ * for every outlinked URL.
* @param allCount number of all collected outlinks from the source page
- * @return if needed, implementations may return an instance of CrawlDatum,
- * with status {@link CrawlDatum#STATUS_LINKED}, which contains adjustments
- * to be applied to the original CrawlDatum score(s) and metadata. This can
- * be null if not needed.
* @throws ScoringFilterException
*/
- public CrawlDatum distributeScoreToOutlinks(Text fromUrl, ParseData parseData,
- Collection<Entry<Text, CrawlDatum>> targets, CrawlDatum adjust,
- int allCount) throws ScoringFilterException;
+ public void distributeScoreToOutlinks(String fromUrl,
+ WebTableRow row, Collection<ScoreDatum> scoreData,
+ int allCount) throws ScoringFilterException;
/**
- * This method calculates a new score of CrawlDatum during CrawlDb update, based on the
- * initial value of the original CrawlDatum, and also score values contributed by
- * inlinked pages.
+ * This method calculates a new score during table update, based on the values contributed
+ * by inlinked pages.
* @param url url of the page
- * @param old original datum, with original score. May be null if this is a newly
- * discovered page. If not null, filters should use score values from this parameter
- * as the starting values - the <code>datum</code> parameter may contain values that are
- * no longer valid, if other updates occured between generation and this update.
- * @param datum the new datum, with the original score saved at the time when
- * fetchlist was generated. Filters should update this in-place, and it will be saved in
- * the crawldb.
- * @param inlinked (partial) list of CrawlDatum-s (with their scores) from
- * links pointing to this page, found in the current update batch.
+ * @param row page row
+ * @param inlinkedScoreData list of {@link ScoreDatum}s for all inlinks pointing to this URL.
* @throws ScoringFilterException
*/
- public void updateDbScore(Text url, CrawlDatum old, CrawlDatum datum, List<CrawlDatum> inlinked) throws ScoringFilterException;
+ public void updateScore(String url, WebTableRow row, List<ScoreDatum> inlinkedScoreData)
+ throws ScoringFilterException;
/**
* This method calculates a Lucene document boost.
* @param url url of the page
- * @param doc Lucene document. NOTE: this already contains all information collected
+ * @param doc document. NOTE: this already contains all information collected
* by indexing filters. Implementations may modify this instance, in order to store/remove
* some information.
- * @param dbDatum current page from CrawlDb. NOTE: changes made to this instance
- * are not persisted.
- * @param fetchDatum datum from FetcherOutput (containing among others the fetching status)
- * @param parse parsing result. NOTE: changes made to this instance are not persisted.
- * @param inlinks current inlinks from LinkDb. NOTE: changes made to this instance are
- * not persisted.
+ * @param row page row
* @param initScore initial boost value for the Lucene document.
* @return boost value for the Lucene document. This value is passed as an argument
* to the next scoring filter in chain. NOTE: implementations may also express
* other scoring strategies by modifying Lucene document directly.
* @throws ScoringFilterException
*/
- public float indexerScore(Text url, NutchDocument doc, CrawlDatum dbDatum,
- CrawlDatum fetchDatum, Parse parse, Inlinks inlinks, float initScore) throws ScoringFilterException;
+ public float indexerScore(String url, NutchDocument doc, WebTableRow row, float initScore)
+ throws ScoringFilterException;
}
Modified: lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/scoring/ScoringFilters.java
URL: http://svn.apache.org/viewvc/lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/scoring/ScoringFilters.java?rev=804789&r1=804788&r2=804789&view=diff
==============================================================================
--- lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/scoring/ScoringFilters.java (original)
+++ lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/scoring/ScoringFilters.java Sun Aug 16 22:25:12 2009
@@ -19,24 +19,20 @@
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
-import java.util.Map.Entry;
+import java.util.Set;
-import org.apache.nutch.crawl.CrawlDatum;
-import org.apache.nutch.crawl.Inlinks;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
import org.apache.nutch.indexer.NutchDocument;
-import org.apache.nutch.parse.Parse;
-import org.apache.nutch.parse.ParseData;
import org.apache.nutch.plugin.Extension;
import org.apache.nutch.plugin.ExtensionPoint;
-import org.apache.nutch.plugin.PluginRuntimeException;
import org.apache.nutch.plugin.PluginRepository;
-import org.apache.nutch.protocol.Content;
+import org.apache.nutch.plugin.PluginRuntimeException;
import org.apache.nutch.util.ObjectCache;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.io.Text;
+import org.apache.nutch.util.hbase.HbaseColumn;
+import org.apache.nutch.util.hbase.WebTableRow;
/**
* Creates and caches {@link ScoringFilter} implementing plugins.
@@ -89,58 +85,63 @@
}
/** Calculate a sort value for Generate. */
- public float generatorSortValue(Text url, CrawlDatum datum, float initSort) throws ScoringFilterException {
- for (int i = 0; i < this.filters.length; i++) {
- initSort = this.filters[i].generatorSortValue(url, datum, initSort);
+ @Override
+ public float generatorSortValue(String url, WebTableRow row, float initSort)
+ throws ScoringFilterException {
+ for (ScoringFilter filter : filters) {
+ initSort = filter.generatorSortValue(url, row, initSort); // each filter receives the previous filter's result
}
return initSort;
}
/** Calculate a new initial score, used when adding newly discovered pages. */
- public void initialScore(Text url, CrawlDatum datum) throws ScoringFilterException {
- for (int i = 0; i < this.filters.length; i++) {
- this.filters[i].initialScore(url, datum);
+ @Override
+ public void initialScore(String url, WebTableRow row) throws ScoringFilterException {
+ for (ScoringFilter filter : filters) {
+ filter.initialScore(url, row);
}
}
/** Calculate a new initial score, used when injecting new pages. */
- public void injectedScore(Text url, CrawlDatum datum) throws ScoringFilterException {
- for (int i = 0; i < this.filters.length; i++) {
- this.filters[i].injectedScore(url, datum);
+ @Override
+ public void injectedScore(String url, WebTableRow row) throws ScoringFilterException {
+ for (ScoringFilter filter : filters) {
+ filter.injectedScore(url, row);
}
}
- /** Calculate updated page score during CrawlDb.update(). */
- public void updateDbScore(Text url, CrawlDatum old, CrawlDatum datum, List<CrawlDatum> inlinked) throws ScoringFilterException {
- for (int i = 0; i < this.filters.length; i++) {
- this.filters[i].updateDbScore(url, old, datum, inlinked);
+ @Override
+ public void distributeScoreToOutlinks(String fromUrl, WebTableRow row,
+ Collection<ScoreDatum> scoreData, int allCount)
+ throws ScoringFilterException {
+ for (ScoringFilter filter : filters) {
+ filter.distributeScoreToOutlinks(fromUrl, row, scoreData, allCount); // all filters share the same row and scoreData instances
+ }
+ }
+
+ @Override
+ public void updateScore(String url, WebTableRow row,
+ List<ScoreDatum> inlinkedScoreData) throws ScoringFilterException {
+ for (ScoringFilter filter : filters) {
+ filter.updateScore(url, row, inlinkedScoreData); // filters are applied in array order
+ }
+ }
+
+ @Override
+ public float indexerScore(String url, NutchDocument doc, WebTableRow row,
+ float initScore) throws ScoringFilterException {
+ for (ScoringFilter filter : filters) {
+ initScore = filter.indexerScore(url, doc, row, initScore); // the boost is threaded through the filter chain
}
+ return initScore;
}
- public void passScoreBeforeParsing(Text url, CrawlDatum datum, Content content) throws ScoringFilterException {
- for (int i = 0; i < this.filters.length; i++) {
- this.filters[i].passScoreBeforeParsing(url, datum, content);
- }
- }
-
- public void passScoreAfterParsing(Text url, Content content, Parse parse) throws ScoringFilterException {
- for (int i = 0; i < this.filters.length; i++) {
- this.filters[i].passScoreAfterParsing(url, content, parse);
- }
- }
-
- public CrawlDatum distributeScoreToOutlinks(Text fromUrl, ParseData parseData, Collection<Entry<Text, CrawlDatum>> targets, CrawlDatum adjust, int allCount) throws ScoringFilterException {
- for (int i = 0; i < this.filters.length; i++) {
- adjust = this.filters[i].distributeScoreToOutlinks(fromUrl, parseData, targets, adjust, allCount);
- }
- return adjust;
- }
-
- public float indexerScore(Text url, NutchDocument doc, CrawlDatum dbDatum, CrawlDatum fetchDatum, Parse parse, Inlinks inlinks, float initScore) throws ScoringFilterException {
- for (int i = 0; i < this.filters.length; i++) {
- initScore = this.filters[i].indexerScore(url, doc, dbDatum, fetchDatum, parse, inlinks, initScore);
+ @Override
+ public Collection<HbaseColumn> getColumns() {
+ Set<HbaseColumn> columns = new HashSet<HbaseColumn>(); // union of every filter's columns, duplicates removed
+ for (ScoringFilter filter : filters) {
+ columns.addAll(filter.getColumns());
}
- return initScore;
+ return columns;
}
-
}
Modified: lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/searcher/Hit.java
URL: http://svn.apache.org/viewvc/lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/searcher/Hit.java?rev=804789&r1=804788&r2=804789&view=diff
==============================================================================
--- lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/searcher/Hit.java (original)
+++ lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/searcher/Hit.java Sun Aug 16 22:25:12 2009
@@ -89,6 +89,44 @@
}
}
+ @Override
+ public int hashCode() { // must hash exactly the fields that equals() compares
+ final int prime = 31;
+ int result = 1;
+ result = prime * result
+ + ((dedupValue == null) ? 0 : dedupValue.hashCode());
+ result = prime * result + indexNo;
+ result = prime * result + (moreFromDupExcluded ? 1231 : 1237);
+ result = prime * result + ((sortValue == null) ? 0 : sortValue.hashCode());
+ result = prime * result + uniqueKey.hashCode();
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) return true;
+ if (obj == null || getClass() != obj.getClass()) return false; // Object.equals contract: no NPE/ClassCastException
+ Hit other = (Hit) obj;
+ if (!uniqueKey.equals(other.uniqueKey))
+ return false;
+ if (dedupValue == null) {
+ if (other.dedupValue != null)
+ return false;
+ } else if (!dedupValue.equals(other.dedupValue))
+ return false;
+ if (indexNo != other.indexNo)
+ return false;
+ if (moreFromDupExcluded != other.moreFromDupExcluded)
+ return false;
+ if (sortValue == null) {
+ if (other.sortValue != null)
+ return false;
+ } else if (!sortValue.equals(other.sortValue))
+ return false;
+ return true;
+ }
+
+
public void write(DataOutput out) throws IOException {
Text.writeString(out, uniqueKey);
}
Modified: lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/searcher/NutchBean.java
URL: http://svn.apache.org/viewvc/lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/searcher/NutchBean.java?rev=804789&r1=804788&r2=804789&view=diff
==============================================================================
--- lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/searcher/NutchBean.java (original)
+++ lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/searcher/NutchBean.java Sun Aug 16 22:25:12 2009
@@ -28,11 +28,15 @@
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.util.StringUtils;
import org.apache.nutch.parse.*;
import org.apache.nutch.crawl.Inlinks;
import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.hbase.WebTableColumns;
/**
* One stop shopping for search-related functionality.
@@ -49,8 +53,10 @@
// }
private SearchBean searchBean;
- private SegmentBean segmentBean;
- private final HitInlinks linkDb;
+
+ private HTable table;
+
+ private SummarizerFactory summarizerFactory;
/** BooleanQuery won't permit more than 32 required/prohibited clauses. We
* don't want to use too many of those. */
@@ -92,7 +98,6 @@
}
final Path luceneConfig = new Path(dir, "search-servers.txt");
final Path solrConfig = new Path(dir, "solr-servers.txt");
- final Path segmentConfig = new Path(dir, "segment-servers.txt");
if (fs.exists(luceneConfig) || fs.exists(solrConfig)) {
searchBean = new DistributedSearchBean(conf, luceneConfig, solrConfig);
@@ -101,16 +106,11 @@
final Path indexesDir = new Path(dir, "indexes");
searchBean = new LuceneSearchBean(conf, indexDir, indexesDir);
}
-
- if (fs.exists(segmentConfig)) {
- segmentBean = new DistributedSegmentBean(conf, segmentConfig);
- } else if (fs.exists(luceneConfig)) {
- segmentBean = new DistributedSegmentBean(conf, luceneConfig);
- } else {
- segmentBean = new FetchedSegments(conf, new Path(dir, "segments"));
- }
-
- linkDb = new LinkDbInlinks(fs, new Path(dir, "linkdb"), conf);
+
+ String tableName = conf.get("table.name", "webtable");
+ table = new HTable(tableName);
+
+ summarizerFactory = new SummarizerFactory(conf);
}
public static List<InetSocketAddress> readAddresses(Path path,
@@ -146,10 +146,6 @@
}
}
- public String[] getSegmentNames() throws IOException {
- return segmentBean.getSegmentNames();
- }
-
public Hits search(Query query, int numHits) throws IOException {
return search(query, numHits, null, null, false);
}
@@ -317,42 +313,57 @@
}
public Summary getSummary(HitDetails hit, Query query) throws IOException {
- return segmentBean.getSummary(hit, query);
+ return summarizerFactory.getSummarizer().getSummary(getText(hit), query); // NOTE(review): getText() may yield null for rows without TEXT — confirm summarizer tolerates it
}
public Summary[] getSummary(HitDetails[] hits, Query query)
- throws IOException {
- return segmentBean.getSummary(hits, query);
+ throws IOException {
+ Summary[] summaries = new Summary[hits.length];
+ for (int i = 0; i < hits.length; i++) {
+ summaries[i] = getSummary(hits[i], query);
+ }
+ return summaries;
+ }
+
+ private byte[] getRowId(HitDetails hitDetails) {
+ return Bytes.toBytes(hitDetails.getValue("id"));
}
- public byte[] getContent(HitDetails hit) throws IOException {
- return segmentBean.getContent(hit);
+ public byte[] getContent(HitDetails hitDetails) throws IOException {
+ Get get = new Get(getRowId(hitDetails));
+ get.addColumn(WebTableColumns.CONTENT, null);
+ return table.get(get).getValue(WebTableColumns.CONTENT, null);
}
public ParseData getParseData(HitDetails hit) throws IOException {
- return segmentBean.getParseData(hit);
+ return null; // TODO
}
public ParseText getParseText(HitDetails hit) throws IOException {
- return segmentBean.getParseText(hit);
+ return null; // TODO
+ }
+
+ public String getText(HitDetails hitDetails) throws IOException {
+ Get get = new Get(getRowId(hitDetails));
+ get.addColumn(WebTableColumns.TEXT, null);
+ return Bytes.toString(table.get(get).getValue(WebTableColumns.TEXT, null));
}
public String[] getAnchors(HitDetails hit) throws IOException {
- return linkDb.getAnchors(hit);
+ return new String[0]; // TODO: read anchors from the table; empty (not null) so callers can iterate safely
}
public Inlinks getInlinks(HitDetails hit) throws IOException {
- return linkDb.getInlinks(hit);
+ return null; // TODO
}
public long getFetchDate(HitDetails hit) throws IOException {
- return segmentBean.getFetchDate(hit);
+ return -1L; // TODO
}
public void close() throws IOException {
if (searchBean != null) { searchBean.close(); }
- if (segmentBean != null) { segmentBean.close(); }
- if (linkDb != null) { linkDb.close(); }
+ if (table != null) { table.close(); } // release the HTable opened in the constructor
if (fs != null) { fs.close(); }
}
@@ -397,11 +408,6 @@
final RPCSearchBean rpcBean = (RPCSearchBean)searchBean;
return rpcBean.getProtocolVersion(className, clientVersion);
- } else if (RPCSegmentBean.class.getName().equals(className) &&
- segmentBean instanceof RPCSegmentBean) {
-
- final RPCSegmentBean rpcBean = (RPCSegmentBean)segmentBean;
- return rpcBean.getProtocolVersion(className, clientVersion);
} else {
throw new IOException("Unknown Protocol classname:" + className);
}
Modified: lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/searcher/SegmentBean.java
URL: http://svn.apache.org/viewvc/lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/searcher/SegmentBean.java?rev=804789&r1=804788&r2=804789&view=diff
==============================================================================
--- lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/searcher/SegmentBean.java (original)
+++ lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/searcher/SegmentBean.java Sun Aug 16 22:25:12 2009
@@ -16,9 +16,6 @@
*/
package org.apache.nutch.searcher;
-import java.io.IOException;
-
+/** Combines {@link HitContent} and {@link HitSummarizer}; declares no methods of its own. */
public interface SegmentBean extends HitContent, HitSummarizer {
-
- public String[] getSegmentNames() throws IOException;
}
Modified: lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/searcher/SolrSearchBean.java
URL: http://svn.apache.org/viewvc/lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/searcher/SolrSearchBean.java?rev=804789&r1=804788&r2=804789&view=diff
==============================================================================
--- lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/searcher/SolrSearchBean.java (original)
+++ lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/searcher/SolrSearchBean.java Sun Aug 16 22:25:12 2009
@@ -35,7 +35,6 @@
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.util.ToStringUtils;
-import org.apache.nutch.indexer.solr.SolrWriter;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.SolrServerException;
@@ -87,7 +86,7 @@
try {
response = solr.query(solrQuery);
} catch (final SolrServerException e) {
- throw SolrWriter.makeIOException(e);
+ throw new IOException(e);
}
final SolrDocumentList docList = response.getResults();
@@ -115,7 +114,7 @@
final String uniqueKey = (String )solrDoc.getFirstValue("id");
- hitArr[i] = new Hit(uniqueKey, sortValue, dedupValue);
+ hitArr[i] = new Hit(0, uniqueKey, sortValue, dedupValue);
}
return new Hits(docList.getNumFound(), hitArr);
@@ -126,7 +125,7 @@
try {
response = solr.query(new SolrQuery("id:\"" + hit.getUniqueKey() + "\""));
} catch (final SolrServerException e) {
- throw SolrWriter.makeIOException(e);
+ throw new IOException(e);
}
final SolrDocumentList docList = response.getResults();
@@ -146,15 +145,16 @@
buf.append("\"");
}
buf.append(")");
-
+
QueryResponse response;
try {
response = solr.query(new SolrQuery(buf.toString()));
} catch (final SolrServerException e) {
- throw SolrWriter.makeIOException(e);
+ throw new IOException(e);
}
final SolrDocumentList docList = response.getResults();
+
if (docList.size() < hits.length) {
throw new RuntimeException("Missing hit details! Found: " +
docList.size() + ", expecting: " +
@@ -184,7 +184,7 @@
try {
return solr.ping().getStatus() == 0;
} catch (final SolrServerException e) {
- throw SolrWriter.makeIOException(e);
+ throw new IOException(e);
}
}
Modified: lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/segment/SegmentReader.java
URL: http://svn.apache.org/viewvc/lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/segment/SegmentReader.java?rev=804789&r1=804788&r2=804789&view=diff
==============================================================================
--- lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/segment/SegmentReader.java (original)
+++ lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/segment/SegmentReader.java Sun Aug 16 22:25:12 2009
@@ -68,7 +68,7 @@
import org.apache.nutch.util.HadoopFSUtil;
import org.apache.nutch.util.LogUtil;
import org.apache.nutch.util.NutchConfiguration;
-import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.NutchJobConf;
/** Dump the content of a segment. */
public class SegmentReader extends Configured implements
@@ -158,7 +158,7 @@
}
private JobConf createJobConf() {
- JobConf job = new NutchJob(getConf());
+ JobConf job = new NutchJobConf(getConf());
job.setBoolean("segment.reader.co", this.co);
job.setBoolean("segment.reader.fe", this.fe);
job.setBoolean("segment.reader.ge", this.ge);
Modified: lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/util/EncodingDetector.java
URL: http://svn.apache.org/viewvc/lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/util/EncodingDetector.java?rev=804789&r1=804788&r2=804789&view=diff
==============================================================================
--- lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/util/EncodingDetector.java (original)
+++ lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/util/EncodingDetector.java Sun Aug 16 22:25:12 2009
@@ -34,6 +34,7 @@
import org.apache.nutch.protocol.Content;
import org.apache.nutch.util.LogUtil;
import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.hbase.WebTableRow;
import com.ibm.icu.text.CharsetDetector;
import com.ibm.icu.text.CharsetMatch;
@@ -81,10 +82,12 @@
this.confidence = confidence;
}
+ @SuppressWarnings("unused") // silences IDE "unused" warnings for this accessor
public String getSource() {
return source;
}
+ @SuppressWarnings("unused") // silences IDE "unused" warnings for this accessor
public String getValue() {
return value;
}
@@ -163,9 +166,20 @@
}
public void autoDetectClues(Content content, boolean filter) {
- byte[] data = content.getContent();
+ autoDetectClues(content.getContent(), content.getContentType(), // delegate to the shared byte[]-based implementation
+ parseCharacterEncoding(content.getMetadata().get(Response.CONTENT_TYPE)), // charset from the Content-Type header, later added as the "header" clue
+ filter);
+ }
+
+ public void autoDetectClues(WebTableRow row, boolean filter) { // WebTableRow counterpart of autoDetectClues(Content, boolean)
+ autoDetectClues(row.getContent(), row.getContentType(),
+ parseCharacterEncoding(row.getHeader(Response.CONTENT_TYPE)), filter);
+ }
+
+ private void autoDetectClues(byte[] data, String type,
+ String encoding, boolean filter) {
- if (minConfidence >= 0 && DETECTABLES.contains(content.getContentType())
+ if (minConfidence >= 0 && DETECTABLES.contains(type)
&& data.length > MIN_LENGTH) {
CharsetMatch[] matches = null;
@@ -190,8 +204,7 @@
}
// add character encoding coming from HTTP response header
- addClue(parseCharacterEncoding(
- content.getMetadata().get(Response.CONTENT_TYPE)), "header");
+ addClue(encoding, "header");
}
public void addClue(String value, String source, int confidence) {
@@ -207,7 +220,7 @@
public void addClue(String value, String source) {
addClue(value, source, NO_THRESHOLD);
}
-
+
/**
* Guess the encoding with the previously specified list of clues.
*
@@ -219,6 +232,34 @@
* @return Guessed encoding or defaultValue
*/
public String guessEncoding(Content content, String defaultValue) {
+ return guessEncoding(content.getBaseUrl(), defaultValue); // delegate with the content's base URL
+ }
+
+ /**
+ * Guess the encoding with the previously specified list of clues.
+ *
+ * @param row the page's row; its base URL is passed to the shared implementation
+ * @param defaultValue Default encoding to return if no encoding can be
+ * detected with enough confidence. Note that this will <b>not</b> be
+ * normalized with {@link EncodingDetector#resolveEncodingAlias}
+ *
+ * @return Guessed encoding or defaultValue
+ */
+ public String guessEncoding(WebTableRow row, String defaultValue) {
+ return guessEncoding(row.getBaseUrl(), defaultValue); // delegate with the row's base URL
+ }
+
+ /**
+ * Guess the encoding with the previously specified list of clues.
+ *
+ * @param baseUrl Base URL
+ * @param defaultValue Default encoding to return if no encoding can be
+ * detected with enough confidence. Note that this will <b>not</b> be
+ * normalized with {@link EncodingDetector#resolveEncodingAlias}
+ *
+ * @return Guessed encoding or defaultValue
+ */
+ private String guessEncoding(String baseUrl, String defaultValue) {
/*
* This algorithm could be replaced by something more sophisticated;
* ideally we would gather a bunch of data on where various clues
@@ -227,10 +268,9 @@
* to generate a better heuristic.
*/
- String base = content.getBaseUrl();
if (LOG.isTraceEnabled()) {
- findDisagreements(base, clues);
+ findDisagreements(baseUrl, clues);
}
/*
@@ -244,12 +284,12 @@
for (EncodingClue clue : clues) {
if (LOG.isTraceEnabled()) {
- LOG.trace(base + ": charset " + clue);
+ LOG.trace(baseUrl + ": charset " + clue);
}
String charset = clue.value;
if (minConfidence >= 0 && clue.confidence >= minConfidence) {
if (LOG.isTraceEnabled()) {
- LOG.trace(base + ": Choosing encoding: " + charset +
+ LOG.trace(baseUrl + ": Choosing encoding: " + charset +
" with confidence " + clue.confidence);
}
return resolveEncodingAlias(charset).toLowerCase();
@@ -259,7 +299,7 @@
}
if (LOG.isTraceEnabled()) {
- LOG.trace(base + ": Choosing encoding: " + bestClue);
+ LOG.trace(baseUrl + ": Choosing encoding: " + bestClue);
}
return bestClue.value.toLowerCase();
}
@@ -371,10 +411,11 @@
}
byte[] data = ostr.toByteArray();
+ MimeUtil mimeTypes = new MimeUtil(conf);
// make a fake Content
Content content =
- new Content("", "", data, "text/html", new Metadata(), conf);
+ new Content("", "", data, "text/html", new Metadata(), mimeTypes);
detector.autoDetectClues(content, true);
String encoding = detector.guessEncoding(content,
Modified: lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/util/NutchJob.java
URL: http://svn.apache.org/viewvc/lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/util/NutchJob.java?rev=804789&r1=804788&r2=804789&view=diff
==============================================================================
--- lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/util/NutchJob.java (original)
+++ lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/util/NutchJob.java Sun Aug 16 22:25:12 2009
@@ -17,15 +17,21 @@
package org.apache.nutch.util;
+import java.io.IOException;
+
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
-/** A {@link JobConf} for Nutch jobs. */
-public class NutchJob extends JobConf {
+/** A {@link Job} (new {@code mapreduce} API) for Nutch jobs; the job jar is located via this object's runtime class. */
+public class NutchJob extends Job {
- public NutchJob(Configuration conf) {
- super(conf, NutchJob.class);
+ public NutchJob(Configuration conf) throws IOException {
+ super(conf);
+ setJarByClass(this.getClass()); // use the jar containing the class actually instantiated
}
-
-}
-
+
+ public NutchJob(Configuration conf, String jobName) throws IOException { // same as above, with an explicit job name
+ super(conf, jobName);
+ setJarByClass(this.getClass()); // use the jar containing the class actually instantiated
+ }
+}
\ No newline at end of file
Added: lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/util/NutchJobConf.java
URL: http://svn.apache.org/viewvc/lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/util/NutchJobConf.java?rev=804789&view=auto
==============================================================================
--- lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/util/NutchJobConf.java (added)
+++ lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/util/NutchJobConf.java Sun Aug 16 22:25:12 2009
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+
+/**
+ * A {@link JobConf} for Nutch jobs that still use the old
+ * {@code org.apache.hadoop.mapred} API.
+ */
+public class NutchJobConf extends JobConf {
+
+  /**
+   * Wraps the given configuration, passing {@code NutchJobConf.class} to
+   * {@link JobConf} as its example class (which Hadoop uses to locate the
+   * job jar).
+   *
+   * @param conf base configuration
+   */
+  public NutchJobConf(Configuration conf) {
+    super(conf, NutchJobConf.class);
+  }
+}
+
Modified: lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/util/domain/DomainStatistics.java
URL: http://svn.apache.org/viewvc/lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/util/domain/DomainStatistics.java?rev=804789&r1=804788&r2=804789&view=diff
==============================================================================
--- lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/util/domain/DomainStatistics.java (original)
+++ lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/util/domain/DomainStatistics.java Sun Aug 16 22:25:12 2009
@@ -42,7 +42,7 @@
import org.apache.hadoop.util.ToolRunner;
import org.apache.nutch.crawl.CrawlDatum;
import org.apache.nutch.util.NutchConfiguration;
-import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.NutchJobConf;
import org.apache.nutch.util.URLUtil;
/**
@@ -81,7 +81,7 @@
numOfReducers = Integer.parseInt(args[3]);
}
- JobConf job = new NutchJob(getConf());
+ JobConf job = new NutchJobConf(getConf());
job.setJobName("Domain statistics");
int mode = 0;
Added: lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/util/hbase/ColumnData.java
URL: http://svn.apache.org/viewvc/lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/util/hbase/ColumnData.java?rev=804789&view=auto
==============================================================================
--- lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/util/hbase/ColumnData.java (added)
+++ lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/util/hbase/ColumnData.java Sun Aug 16 22:25:12 2009
@@ -0,0 +1,51 @@
+package org.apache.nutch.util.hbase;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * The value of a single cell version together with a flag telling whether it
+ * was changed locally (and so must be written back).
+ * <p>
+ * A {@code null} value is used by callers as a deletion marker, so it is
+ * preserved across serialization.
+ */
+public class ColumnData implements Writable {
+
+  private byte[] data;
+  private boolean isModified;
+
+  /** No-arg constructor for Writable deserialization only; do not use! */
+  public ColumnData() {
+  }
+
+  /**
+   * Creates an unmodified cell.
+   * @param data cell value, may be {@code null}
+   */
+  public ColumnData(byte[] data) {
+    this(data, false);
+  }
+
+  /**
+   * @param data cell value, may be {@code null}
+   * @param isModified whether the value differs from the stored one
+   */
+  public ColumnData(byte[] data, boolean isModified) {
+    this.data = data;
+    this.isModified = isModified;
+  }
+
+  /** Replaces the value (null allowed) and marks the cell as modified. */
+  public void set(byte[] data) {
+    this.data = data;
+    isModified = true;
+  }
+
+  /** @return the cell value, possibly {@code null} */
+  public byte[] getData() {
+    return data;
+  }
+
+  /** @return true if the value was set locally since it was loaded */
+  public boolean isModified() {
+    return isModified;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    // A leading presence flag distinguishes a null value from an empty one.
+    data = in.readBoolean() ? Bytes.readByteArray(in) : null;
+    isModified = in.readBoolean();
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    // Bytes.writeByteArray cannot round-trip null, so write a presence flag
+    // first and the array only when there is one.
+    out.writeBoolean(data != null);
+    if (data != null) {
+      Bytes.writeByteArray(out, data);
+    }
+    out.writeBoolean(isModified);
+  }
+}
Added: lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/util/hbase/ColumnDescriptor.java
URL: http://svn.apache.org/viewvc/lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/util/hbase/ColumnDescriptor.java?rev=804789&view=auto
==============================================================================
--- lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/util/hbase/ColumnDescriptor.java (added)
+++ lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/util/hbase/ColumnDescriptor.java Sun Aug 16 22:25:12 2009
@@ -0,0 +1,21 @@
+package org.apache.nutch.util.hbase;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.io.hfile.Compression;
+
+/**
+ * Declares the HBase column-family settings for a byte[] field of
+ * {@link WebTableColumns}. It is read reflectively at runtime when the table
+ * is created. Every element defaults to the matching
+ * {@link HColumnDescriptor} default.
+ */
+@Target(ElementType.FIELD)
+@Retention(RetentionPolicy.RUNTIME)
+public @interface ColumnDescriptor {
+
+  /** Maximum number of versions handed to {@link HColumnDescriptor}. */
+  int versions() default HColumnDescriptor.DEFAULT_VERSIONS;
+
+  /** Compression algorithm for the family. */
+  Compression.Algorithm compression() default Compression.Algorithm.NONE;
+
+  /** In-memory flag handed to {@link HColumnDescriptor}. */
+  boolean inMemory() default HColumnDescriptor.DEFAULT_IN_MEMORY;
+
+  /** Block-cache flag handed to {@link HColumnDescriptor}. */
+  boolean blockCacheEnabled() default HColumnDescriptor.DEFAULT_BLOCKCACHE;
+
+  /** Block size handed to {@link HColumnDescriptor}. */
+  int blockSize() default HColumnDescriptor.DEFAULT_BLOCKSIZE;
+
+  /** Time-to-live handed to {@link HColumnDescriptor}. */
+  int timeToLive() default HColumnDescriptor.DEFAULT_TTL;
+
+  /** Bloom-filter flag handed to {@link HColumnDescriptor}. */
+  boolean bloomFilter() default HColumnDescriptor.DEFAULT_BLOOMFILTER;
+}
Added: lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/util/hbase/HbaseColumn.java
URL: http://svn.apache.org/viewvc/lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/util/hbase/HbaseColumn.java?rev=804789&view=auto
==============================================================================
--- lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/util/hbase/HbaseColumn.java (added)
+++ lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/util/hbase/HbaseColumn.java Sun Aug 16 22:25:12 2009
@@ -0,0 +1,34 @@
+package org.apache.nutch.util.hbase;
+
+/**
+ * Immutable description of a column to read: a family, an optional
+ * qualifier ({@code null} means the whole family) and the number of
+ * versions to fetch.
+ */
+public class HbaseColumn {
+  private final byte[] family;
+  private final byte[] qualifier;
+  private final int versions;
+
+  /** Selects a whole family, one version. */
+  public HbaseColumn(byte[] family) {
+    this(family, null, 1);
+  }
+
+  /** Selects family:qualifier, one version. */
+  public HbaseColumn(byte[] family, byte[] qualifier) {
+    this(family, qualifier, 1);
+  }
+
+  /**
+   * @param family column family
+   * @param qualifier column qualifier, or {@code null} for the whole family
+   * @param versions number of versions to fetch
+   */
+  public HbaseColumn(byte[] family, byte[] qualifier, int versions) {
+    this.family = family;
+    this.qualifier = qualifier;
+    this.versions = versions;
+  }
+
+  /** @return the column family (shared array; do not modify) */
+  public byte[] getFamily() {
+    return family;
+  }
+
+  /** @return the qualifier, or {@code null} when the whole family is meant */
+  public byte[] getQualifier() {
+    return qualifier;
+  }
+
+  /** @return the number of versions to fetch */
+  public int getVersions() {
+    return versions;
+  }
+
+}
Added: lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/util/hbase/RowMutation.java
URL: http://svn.apache.org/viewvc/lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/util/hbase/RowMutation.java?rev=804789&view=auto
==============================================================================
--- lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/util/hbase/RowMutation.java (added)
+++ lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/util/hbase/RowMutation.java Sun Aug 16 22:25:12 2009
@@ -0,0 +1,149 @@
+package org.apache.nutch.util.hbase;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Reducer;
+
+/**
+ * A serializable batch of cell puts and deletes against a single row.
+ * Operations with a {@code null} value are deletes; all others are puts.
+ * The batch is applied directly with {@link #commit(HTable)} or emitted from
+ * a reducer with {@link #writeToContext}.
+ */
+public class RowMutation implements Writable {
+
+  /** One put (non-null value) or delete (null value) of a single cell. */
+  private static class RowOperation implements Writable {
+    private byte[] family, qualifier, value;
+    private long ts;
+
+    /** For Writable deserialization only. */
+    RowOperation() { }
+
+    public RowOperation(byte[] family, byte[] qualifier, byte[] value) {
+      this(family, qualifier, value, HConstants.LATEST_TIMESTAMP);
+    }
+
+    public RowOperation(byte[] family, byte[] qualifier, byte[] value, long ts) {
+      this.family = family;
+      this.qualifier = qualifier;
+      this.value = value;
+      this.ts = ts;
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      family = Bytes.readByteArray(in);
+      qualifier = readNullable(in);
+      value = readNullable(in);
+      ts = in.readLong();
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      Bytes.writeByteArray(out, family);
+      // value == null marks a delete, so it must survive the round trip.
+      writeNullable(out, qualifier);
+      writeNullable(out, value);
+      out.writeLong(ts);
+    }
+  }
+
+  private byte[] row;
+
+  private final List<RowOperation> ops = new ArrayList<RowOperation>();
+
+  /** For Writable deserialization only. */
+  public RowMutation() { }
+
+  /** @param row key of the row every operation applies to */
+  public RowMutation(byte[] row) {
+    this.row = row;
+  }
+
+  /** Writes a presence flag and then (if non-null) the array. */
+  private static void writeNullable(DataOutput out, byte[] bytes)
+      throws IOException {
+    out.writeBoolean(bytes != null);
+    if (bytes != null) {
+      Bytes.writeByteArray(out, bytes);
+    }
+  }
+
+  /** Reads an array written by {@link #writeNullable}. */
+  private static byte[] readNullable(DataInput in) throws IOException {
+    return in.readBoolean() ? Bytes.readByteArray(in) : null;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    row = Bytes.readByteArray(in);
+    // Hadoop may reuse Writable instances; drop operations of a previous record.
+    ops.clear();
+    int size = in.readInt();
+    for (int i = 0; i < size; i++) {
+      RowOperation rowOp = new RowOperation();
+      rowOp.readFields(in);
+      ops.add(rowOp);
+    }
+  }
+
+  /** Queues a put of value into family:qualifier at the latest timestamp. */
+  public void add(byte[] family, byte[] qualifier, byte[] value) {
+    ops.add(new RowOperation(family, qualifier, value));
+  }
+
+  /** Queues a delete of family:qualifier at the latest timestamp. */
+  public void delete(byte[] family, byte[] qualifier) {
+    ops.add(new RowOperation(family, qualifier, null));
+  }
+
+  /** Queues a put of value into family:qualifier at timestamp ts. */
+  public void add(byte[] family, byte[] qualifier, long ts, byte[] value) {
+    ops.add(new RowOperation(family, qualifier, value, ts));
+  }
+
+  /** Queues a delete of family:qualifier at timestamp ts. */
+  public void delete(byte[] family, byte[] qualifier, long ts) {
+    ops.add(new RowOperation(family, qualifier, null, ts));
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    Bytes.writeByteArray(out, row);
+    // readFields() reads the operation count before the operations.
+    out.writeInt(ops.size());
+    for (RowOperation rowOp : ops) {
+      rowOp.write(out);
+    }
+  }
+
+  /**
+   * Applies the queued operations to table as at most one Put and one
+   * Delete, preserving each operation's timestamp.
+   */
+  public void commit(HTable table) throws IOException {
+    Put put = new Put(row);
+    // A Delete without columns removes the entire row; it is only sent
+    // when at least one column delete was added below.
+    Delete delete = new Delete(row);
+    boolean hasPuts = false;
+    boolean hasDeletes = false;
+
+    for (RowOperation rowOp : ops) {
+      byte[] value = rowOp.value;
+      if (value == null) {
+        hasDeletes = true;
+        delete.deleteColumn(rowOp.family, rowOp.qualifier, rowOp.ts);
+      } else {
+        hasPuts = true;
+        put.add(rowOp.family, rowOp.qualifier, rowOp.ts, rowOp.value);
+      }
+    }
+    if (hasPuts) {
+      table.put(put);
+    }
+    if (hasDeletes) {
+      table.delete(delete);
+    }
+  }
+
+  /**
+   * Emits the queued operations from a reducer as at most one Put and one
+   * Delete, both written under key.
+   * NOTE(review): unlike {@link #commit(HTable)}, per-operation timestamps
+   * are not passed to HBase here; confirm this is intended.
+   */
+  public <K2 extends WritableComparable<? super K2>>
+  void writeToContext(K2 key, Reducer<?, ?, K2, Writable>.Context context)
+  throws IOException, InterruptedException {
+    Put put = new Put(row);
+    // A Delete without columns removes the entire row; it is only written
+    // when at least one column delete was added below.
+    Delete delete = new Delete(row);
+    boolean hasPuts = false;
+    boolean hasDeletes = false;
+
+    for (RowOperation rowOp : ops) {
+      byte[] value = rowOp.value;
+      if (value == null) {
+        hasDeletes = true;
+        delete.deleteColumn(rowOp.family, rowOp.qualifier);
+      } else {
+        hasPuts = true;
+        put.add(rowOp.family, rowOp.qualifier, rowOp.value);
+      }
+    }
+    if (hasPuts) {
+      context.write(key, put);
+    }
+    if (hasDeletes) {
+      context.write(key, delete);
+    }
+  }
+
+}
Added: lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/util/hbase/TableRow.java
URL: http://svn.apache.org/viewvc/lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/util/hbase/TableRow.java?rev=804789&view=auto
==============================================================================
--- lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/util/hbase/TableRow.java (added)
+++ lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/util/hbase/TableRow.java Sun Aug 16 22:25:12 2009
@@ -0,0 +1,240 @@
+package org.apache.nutch.util.hbase;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * In-memory view of one HBase row, organised as
+ * family -> qualifier -> timestamp -> {@link ColumnData}, with timestamps
+ * ordered newest first.
+ * <p>
+ * Local changes are tracked per cell via {@link ColumnData#isModified()} and
+ * turned into a {@link RowMutation} by {@link #makeRowMutation()}. A
+ * {@code null} cell value marks a deletion. A {@code null} qualifier is
+ * stored as the empty qualifier.
+ */
+class TableRow implements Writable {
+
+  /** Orders timestamps descending, so the newest version comes first. */
+  private static final Comparator<Long> TS_COMPARATOR = new Comparator<Long>() {
+    public int compare(Long l1, Long l2) {
+      return l2.compareTo(l1);
+    }
+  };
+
+  /** Qualifier used in place of {@code null}; never modify its contents. */
+  protected static final byte[] EMPTY = new byte[0];
+
+  protected byte[] row;
+
+  /**
+   * Timestamp for versions written through the timestamp-less put().
+   * Captured at construction and restored by readFields().
+   */
+  protected long now;
+
+  // used for family:qualifier columns
+  protected final NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, ColumnData>>> map =
+      new TreeMap<byte[], NavigableMap<byte[], NavigableMap<Long, ColumnData>>>(Bytes.BYTES_COMPARATOR);
+
+  /** No-arg constructor for Writable deserialization only; do not use! */
+  public TableRow() {
+    now = System.currentTimeMillis();
+  }
+
+  /** Creates an empty view of the given row key. */
+  public TableRow(byte[] row) {
+    this();
+    this.row = row;
+  }
+
+  /** Loads every cell version of result as unmodified data. */
+  public TableRow(Result result) {
+    this();
+    row = result.getRow();
+    for (Entry<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> e1 : result.getMap().entrySet()) {
+      NavigableMap<byte[], NavigableMap<Long, ColumnData>> familyMap =
+          new TreeMap<byte[], NavigableMap<Long, ColumnData>>(Bytes.BYTES_COMPARATOR);
+      for (Entry<byte[], NavigableMap<Long, byte[]>> e2 : e1.getValue().entrySet()) {
+        NavigableMap<Long, ColumnData> qualMap = new TreeMap<Long, ColumnData>(TS_COMPARATOR);
+        for (Entry<Long, byte[]> e3 : e2.getValue().entrySet()) {
+          qualMap.put(e3.getKey(), new ColumnData(e3.getValue()));
+        }
+        familyMap.put(e2.getKey(), qualMap);
+      }
+      map.put(e1.getKey(), familyMap);
+    }
+  }
+
+  /** Returns the version map of family:qualifier, or null if absent. */
+  private NavigableMap<Long, ColumnData> findVersionMap(byte[] family, byte[] qualifier) {
+    NavigableMap<byte[], NavigableMap<Long, ColumnData>> familyMap = map.get(family);
+    return familyMap == null ? null : familyMap.get(qualifier);
+  }
+
+  /** Returns the version map of family:qualifier, creating it if absent. */
+  private NavigableMap<Long, ColumnData> versionMapFor(byte[] family, byte[] qualifier) {
+    NavigableMap<byte[], NavigableMap<Long, ColumnData>> familyMap = map.get(family);
+    if (familyMap == null) {
+      familyMap = new TreeMap<byte[], NavigableMap<Long, ColumnData>>(Bytes.BYTES_COMPARATOR);
+      map.put(family, familyMap);
+    }
+    NavigableMap<Long, ColumnData> versionMap = familyMap.get(qualifier);
+    if (versionMap == null) {
+      versionMap = new TreeMap<Long, ColumnData>(TS_COMPARATOR);
+      familyMap.put(qualifier, versionMap);
+    }
+    return versionMap;
+  }
+
+  /** @return true if the newest held version of family:qualifier is non-null */
+  public boolean hasColumn(byte[] family, byte[] qualifier) {
+    if (qualifier == null) { qualifier = EMPTY; }
+    NavigableMap<Long, ColumnData> versionMap = findVersionMap(family, qualifier);
+    if (versionMap == null || versionMap.isEmpty()) {
+      return false;
+    }
+    return versionMap.firstEntry().getValue().getData() != null;
+  }
+
+  /** Marks the newest version of family:qualifier as deleted. */
+  public void delete(byte[] family, byte[] qualifier) {
+    delete(family, qualifier, HConstants.LATEST_TIMESTAMP);
+  }
+
+  /**
+   * Marks the newest version at or before ts as deleted.
+   * <p>
+   * When no such version is held (including when family:qualifier was never
+   * loaded), a deletion marker is recorded at {@code now} so the delete
+   * still ends up in {@link #makeRowMutation()}.
+   */
+  public void delete(byte[] family, byte[] qualifier, long ts) {
+    if (qualifier == null) { qualifier = EMPTY; }
+    NavigableMap<Long, ColumnData> versionMap = versionMapFor(family, qualifier);
+    Entry<Long, ColumnData> e = versionMap.ceilingEntry(ts);
+    if (e != null) {
+      e.getValue().set(null);
+    } else {
+      versionMap.put(now, new ColumnData(null, true));
+    }
+  }
+
+  /** Marks the newest held version of every qualifier in family as deleted. */
+  public void deleteFamily(byte[] family) {
+    NavigableMap<byte[], NavigableMap<Long, ColumnData>> familyMap = map.get(family);
+    if (familyMap == null) {
+      return;
+    }
+
+    for (Entry<byte[], NavigableMap<Long, ColumnData>> e : familyMap.entrySet()) {
+      Entry<Long, ColumnData> latest = e.getValue().firstEntry();
+      if (latest != null) {
+        latest.getValue().set(null);
+      }
+    }
+  }
+
+  /** @return the newest held value of family:qualifier, or null */
+  public byte[] get(byte[] family, byte[] qualifier) {
+    return get(family, qualifier, HConstants.LATEST_TIMESTAMP);
+  }
+
+  /** @return the newest held value at or before ts, or null */
+  public byte[] get(byte[] family, byte[] qualifier, long ts) {
+    if (qualifier == null) { qualifier = EMPTY; }
+    NavigableMap<Long, ColumnData> versionMap = findVersionMap(family, qualifier);
+    if (versionMap != null) {
+      Entry<Long, ColumnData> e = versionMap.ceilingEntry(ts);
+      if (e != null) {
+        return e.getValue().getData();
+      }
+    }
+    return null;
+  }
+
+  /** Stores value as a modified version at this row's {@code now} timestamp. */
+  public void put(byte[] family, byte[] qualifier, byte[] value) {
+    put(family, qualifier, value, now);
+  }
+
+  /**
+   * Stores value as a modified version of family:qualifier at timestamp ts,
+   * replacing any version already held at ts.
+   */
+  public void put(byte[] family, byte[] qualifier, byte[] value, long ts) {
+    if (qualifier == null) { qualifier = EMPTY; }
+    versionMapFor(family, qualifier).put(ts, new ColumnData(value, true));
+  }
+
+  /** Stores value encoded with {@code Bytes.toBytes(float)}. */
+  public void put(byte[] family, byte[] qualifier, float value) {
+    put(family, qualifier, Bytes.toBytes(value));
+  }
+
+  /** Stores value encoded with {@code Bytes.toBytes(int)}. */
+  public void put(byte[] family, byte[] qualifier, int value) {
+    put(family, qualifier, Bytes.toBytes(value));
+  }
+
+  /** Stores value encoded with {@code Bytes.toBytes(long)}. */
+  public void put(byte[] family, byte[] qualifier, long value) {
+    put(family, qualifier, Bytes.toBytes(value));
+  }
+
+  /** Stores value encoded with {@code Bytes.toBytes(String)}. */
+  public void put(byte[] family, byte[] qualifier, String value) {
+    put(family, qualifier, Bytes.toBytes(value));
+  }
+
+  /** @return a mutation holding every modified cell of this row */
+  public RowMutation makeRowMutation() {
+    return makeRowMutation(row);
+  }
+
+  /** Adds a delete (null value) or a put for one cell to mut. */
+  private void addEntry(byte[] family, byte[] qualifier, long ts, byte[] value,
+      RowMutation mut) {
+    if (value == null) {
+      mut.delete(family, qualifier, ts);
+    } else {
+      mut.add(family, qualifier, ts, value);
+    }
+  }
+
+  /** @return a mutation of the given row key holding every modified cell */
+  public RowMutation makeRowMutation(byte[] row) {
+    RowMutation mut = new RowMutation(row);
+    for (Entry<byte[], NavigableMap<byte[], NavigableMap<Long, ColumnData>>> e1 : map.entrySet()) {
+      for (Entry<byte[], NavigableMap<Long, ColumnData>> e2 : e1.getValue().entrySet()) {
+        for (Entry<Long, ColumnData> e3 : e2.getValue().entrySet()) {
+          ColumnData rawDatum = e3.getValue();
+          if (rawDatum.isModified()) {
+            addEntry(e1.getKey(), e2.getKey(), e3.getKey(), rawDatum.getData(), mut);
+          }
+        }
+      }
+    }
+
+    return mut;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    now = in.readLong();
+    row = Bytes.readByteArray(in);
+
+    map.clear();
+    int mapSize = in.readInt();
+    for (int i = 0; i < mapSize; i++) {
+      byte[] family = Bytes.readByteArray(in);
+      int familyMapSize = in.readInt();
+      TreeMap<byte[], NavigableMap<Long, ColumnData>> familyMap =
+          new TreeMap<byte[], NavigableMap<Long, ColumnData>>(Bytes.BYTES_COMPARATOR);
+      for (int j = 0; j < familyMapSize; j++) {
+        byte[] qual = Bytes.readByteArray(in);
+        int versionMapSize = in.readInt();
+        TreeMap<Long, ColumnData> versionMap = new TreeMap<Long, ColumnData>(TS_COMPARATOR);
+        for (int k = 0; k < versionMapSize; k++) {
+          long ts = in.readLong();
+          ColumnData columnData = new ColumnData();
+          columnData.readFields(in);
+          versionMap.put(ts, columnData);
+        }
+        familyMap.put(qual, versionMap);
+      }
+      map.put(family, familyMap);
+    }
+
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeLong(now);
+    Bytes.writeByteArray(out, row);
+
+    out.writeInt(map.size());
+    for (Entry<byte[], NavigableMap<byte[], NavigableMap<Long, ColumnData>>> e : map.entrySet()) {
+      Bytes.writeByteArray(out, e.getKey());
+      NavigableMap<byte[], NavigableMap<Long, ColumnData>> familyMap = e.getValue();
+      out.writeInt(familyMap.size());
+      for (Entry<byte[], NavigableMap<Long, ColumnData>> e2 : familyMap.entrySet()) {
+        Bytes.writeByteArray(out, e2.getKey());
+        NavigableMap<Long, ColumnData> versionMap = e2.getValue();
+        out.writeInt(versionMap.size());
+        for (Entry<Long, ColumnData> e3 : versionMap.entrySet()) {
+          out.writeLong(e3.getKey());
+          e3.getValue().write(out);
+        }
+      }
+    }
+  }
+
+}
Added: lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/util/hbase/TableUtil.java
URL: http://svn.apache.org/viewvc/lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/util/hbase/TableUtil.java?rev=804789&view=auto
==============================================================================
--- lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/util/hbase/TableUtil.java (added)
+++ lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/util/hbase/TableUtil.java Sun Aug 16 22:25:12 2009
@@ -0,0 +1,152 @@
+package org.apache.nutch.util.hbase;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.nio.ByteBuffer;
+import java.util.Set;
+
+import org.apache.hadoop.hbase.client.Scan;
+
+/** Static helpers for encoding values, row keys and scans of HBase tables. */
+public class TableUtil {
+
+  /** The single byte {@code 'y'}; presumably used as a "yes" flag value. */
+  public static final byte[] YES_VAL = new byte[] { 'y' };
+
+  private static final int SIZEOF_FLOAT = Float.SIZE / Byte.SIZE;
+
+  /**
+   * Converts a float value to a 4-byte big-endian array.
+   * @param val value to convert
+   * @return the byte array
+   */
+  public static byte[] toBytes(final float val) {
+    ByteBuffer bb = ByteBuffer.allocate(SIZEOF_FLOAT);
+    bb.putFloat(val);
+    return bb.array();
+  }
+
+  /**
+   * Converts bytes written by {@link #toBytes(float)} back to a float.
+   * @param bytes big-endian float bytes, may be null or empty
+   * @return the float value, or -1 if bytes is null or empty
+   */
+  public static float toFloat(byte[] bytes) {
+    if (bytes == null || bytes.length == 0) {
+      return -1f;
+    }
+    return ByteBuffer.wrap(bytes).getFloat();
+  }
+
+  /**
+   * Reverses a url's host. Rows of the same domain then sort together in
+   * HBase, which makes scans within a domain faster. <p>
+   * E.g. "http://bar.foo.com:8983/to/index.html?a=b" becomes
+   * "com.foo.bar:http:8983/to/index.html?a=b".
+   * @param urlString url to be reversed
+   * @return Reversed url
+   * @throws MalformedURLException if urlString cannot be parsed as a URL
+   */
+  public static String reverseUrl(String urlString)
+  throws MalformedURLException {
+    return reverseUrl(new URL(urlString));
+  }
+
+  /**
+   * Reverses a url's host. Rows of the same domain then sort together in
+   * HBase, which makes scans within a domain faster. <p>
+   * E.g. "http://bar.foo.com:8983/to/index.html?a=b" becomes
+   * "com.foo.bar:http:8983/to/index.html?a=b".
+   * The port is only included when the url names one explicitly. The path
+   * and query come from {@link URL#getFile()}, so any #fragment is dropped.
+   * @param url url to be reversed
+   * @return Reversed url
+   */
+  public static String reverseUrl(URL url) {
+    String host = url.getHost();
+    String file = url.getFile();
+    String protocol = url.getProtocol();
+    int port = url.getPort();
+
+    StringBuilder buf = new StringBuilder();
+
+    /* reverse host */
+    reverseAppendSplits(host.split("\\."), buf);
+
+    /* add protocol */
+    buf.append(':');
+    buf.append(protocol);
+
+    /* add port if necessary */
+    if (port != -1) {
+      buf.append(':');
+      buf.append(port);
+    }
+
+    /* add path, making sure it starts with '/' */
+    if (file.length() == 0 || '/' != file.charAt(0)) {
+      buf.append('/');
+    }
+    buf.append(file);
+
+    return buf.toString();
+  }
+
+  /**
+   * Inverse of {@link #reverseUrl(URL)}: turns
+   * "com.foo.bar:http:8983/to/index.html?a=b" back into
+   * "http://bar.foo.com:8983/to/index.html?a=b".
+   * @param reversedUrl a url produced by reverseUrl (must contain a '/')
+   * @return the un-reversed url
+   */
+  public static String unreverseUrl(String reversedUrl) {
+    StringBuilder buf = new StringBuilder(reversedUrl.length() + 2);
+
+    int pathBegin = reversedUrl.indexOf('/');
+    String sub = reversedUrl.substring(0, pathBegin);
+
+    String[] splits = sub.split(":"); // {<reversed host>, <protocol>[, <port>]}
+
+    buf.append(splits[1]); // add protocol
+    buf.append("://");
+    reverseAppendSplits(splits[0].split("\\."), buf); // splits[0] is reversed host
+    if (splits.length == 3) { // has a port
+      buf.append(':');
+      buf.append(splits[2]);
+    }
+    buf.append(reversedUrl.substring(pathBegin));
+    return buf.toString();
+  }
+
+  /** Given a reversed url, returns the reversed host
+   * E.g "com.foo.bar:http:8983/to/index.html?a=b" -> "com.foo.bar"
+   * @param reversedUrl Reversed url
+   * @return Reversed host
+   */
+  public static String getReversedHost(String reversedUrl) {
+    return reversedUrl.substring(0, reversedUrl.indexOf(':'));
+  }
+
+  /** Appends splits to buf in reverse order, joined by '.'. */
+  private static void reverseAppendSplits(String[] splits, StringBuilder buf) {
+    for (int i = splits.length - 1; i > 0; i--) {
+      buf.append(splits[i]);
+      buf.append('.');
+    }
+    buf.append(splits[0]);
+  }
+
+  /** Given a set of columns, returns a space-separated string of columns
+   * (each column, including the last, is followed by a space).
+   *
+   * @param columnSet Column set
+   * @return Space-separated string
+   */
+  public static String getColumns(Set<String> columnSet) {
+    StringBuilder buf = new StringBuilder();
+    for (String column : columnSet) {
+      buf.append(column);
+      buf.append(' ');
+    }
+    return buf.toString();
+  }
+
+  /**
+   * Builds a Scan selecting the given columns. A column with a qualifier
+   * selects only that qualifier, and a column without one selects its whole
+   * family. The scan's max versions is the largest number requested by any
+   * column, and at least 1.
+   * NOTE(review): some family constants end in ':'; confirm the
+   * single-argument addColumn handles that form as intended.
+   */
+  public static Scan createScanFromColumns(Iterable<HbaseColumn> columns) {
+    Scan scan = new Scan();
+    // Start from 1 (one version) so an empty input yields a sane scan.
+    int maxVersions = 1;
+    for (HbaseColumn col : columns) {
+      if (col.getQualifier() != null) {
+        scan.addColumn(col.getFamily(), col.getQualifier());
+      } else {
+        // Only select the whole family when no qualifier was requested;
+        // a family-wide selection would override the qualifier selection.
+        scan.addColumn(col.getFamily());
+      }
+      maxVersions = Math.max(maxVersions, col.getVersions());
+    }
+    scan.setMaxVersions(maxVersions);
+    return scan;
+  }
+}
Added: lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/util/hbase/WebTableColumns.java
URL: http://svn.apache.org/viewvc/lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/util/hbase/WebTableColumns.java?rev=804789&view=auto
==============================================================================
--- lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/util/hbase/WebTableColumns.java (added)
+++ lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/util/hbase/WebTableColumns.java Sun Aug 16 22:25:12 2009
@@ -0,0 +1,75 @@
+package org.apache.nutch.util.hbase;
+
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Names of the column families of the web table.
+ * <p>
+ * Each family is available as a String ({@code *_STR}) and as the byte array
+ * produced by {@code Bytes.toBytes} from it. Every byte-array constant
+ * annotated with {@link ColumnDescriptor} becomes a column family when
+ * {@link WebTableCreator} creates the table. The byte arrays are shared,
+ * mutable instances: callers must never modify them.
+ * <p>
+ * NOTE(review): some names end in ':' (e.g. "cnttyp:") while others do not
+ * (e.g. "bas"); confirm every consumer treats both forms consistently.
+ */
+public interface WebTableColumns {
+  String BASE_URL_STR = "bas";
+  String STATUS_STR = "stt";
+  String FETCH_TIME_STR = "fcht";
+  String RETRIES_STR = "rtrs";
+  String FETCH_INTERVAL_STR = "fchi";
+  String SCORE_STR = "scr";
+  String MODIFIED_TIME_STR = "modt";
+  String SIGNATURE_STR = "sig";
+  String CONTENT_STR = "cnt";
+  String CONTENT_TYPE_STR = "cnttyp:";
+  String TITLE_STR = "ttl:";
+  String OUTLINKS_STR = "olnk:";
+  String INLINKS_STR = "ilnk:";
+  String PARSE_STATUS_STR = "prsstt:";
+  String PROTOCOL_STATUS_STR = "prtstt:";
+  String TEXT_STR = "txt:";
+  String REPR_URL_STR = "repr:";
+  String HEADERS_STR = "hdrs:";
+  String METADATA_STR = "mtdt:";
+
+  // Hackish solution to access previous versions of some columns
+  String PREV_SIGNATURE_STR = "prvsig";
+  String PREV_FETCH_TIME_STR = "prvfch";
+
+  // Interface fields are implicitly public static final.
+  @ColumnDescriptor byte[] BASE_URL = Bytes.toBytes(BASE_URL_STR);
+  @ColumnDescriptor byte[] STATUS = Bytes.toBytes(STATUS_STR);
+  @ColumnDescriptor byte[] FETCH_TIME = Bytes.toBytes(FETCH_TIME_STR);
+  @ColumnDescriptor byte[] RETRIES = Bytes.toBytes(RETRIES_STR);
+  @ColumnDescriptor byte[] FETCH_INTERVAL = Bytes.toBytes(FETCH_INTERVAL_STR);
+  @ColumnDescriptor byte[] SCORE = Bytes.toBytes(SCORE_STR);
+  @ColumnDescriptor byte[] MODIFIED_TIME = Bytes.toBytes(MODIFIED_TIME_STR);
+  @ColumnDescriptor byte[] SIGNATURE = Bytes.toBytes(SIGNATURE_STR);
+  @ColumnDescriptor byte[] CONTENT = Bytes.toBytes(CONTENT_STR);
+  @ColumnDescriptor byte[] CONTENT_TYPE = Bytes.toBytes(CONTENT_TYPE_STR);
+  @ColumnDescriptor byte[] TITLE = Bytes.toBytes(TITLE_STR);
+  @ColumnDescriptor byte[] OUTLINKS = Bytes.toBytes(OUTLINKS_STR);
+  @ColumnDescriptor byte[] INLINKS = Bytes.toBytes(INLINKS_STR);
+  @ColumnDescriptor byte[] PARSE_STATUS = Bytes.toBytes(PARSE_STATUS_STR);
+  @ColumnDescriptor byte[] PROTOCOL_STATUS = Bytes.toBytes(PROTOCOL_STATUS_STR);
+  @ColumnDescriptor byte[] TEXT = Bytes.toBytes(TEXT_STR);
+  @ColumnDescriptor byte[] REPR_URL = Bytes.toBytes(REPR_URL_STR);
+  @ColumnDescriptor byte[] HEADERS = Bytes.toBytes(HEADERS_STR);
+  @ColumnDescriptor byte[] METADATA = Bytes.toBytes(METADATA_STR);
+
+  // Hackish solution to access previous versions of some columns
+  @ColumnDescriptor byte[] PREV_SIGNATURE = Bytes.toBytes(PREV_SIGNATURE_STR);
+  @ColumnDescriptor byte[] PREV_FETCH_TIME = Bytes.toBytes(PREV_FETCH_TIME_STR);
+
+}
Added: lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/util/hbase/WebTableCreator.java
URL: http://svn.apache.org/viewvc/lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/util/hbase/WebTableCreator.java?rev=804789&view=auto
==============================================================================
--- lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/util/hbase/WebTableCreator.java (added)
+++ lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/util/hbase/WebTableCreator.java Sun Aug 16 22:25:12 2009
@@ -0,0 +1,68 @@
+package org.apache.nutch.util.hbase;
+
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Field;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import org.apache.hadoop.conf.*;
+
+import org.apache.nutch.util.NutchConfiguration;
+
+/**
+ * Command-line tool that creates the web table in HBase. It adds one column
+ * family for every {@link ColumnDescriptor}-annotated public field of
+ * {@link WebTableColumns}.
+ * <p>
+ * Usage: {@code WebTableCreator <webtable>}
+ */
+public class WebTableCreator extends Configured implements Tool {
+
+  public static final Log LOG = LogFactory.getLog(WebTableCreator.class);
+
+  public static void main(String[] args) throws Exception {
+    Configuration conf = NutchConfiguration.create();
+    int exitCode = ToolRunner.run(conf, new WebTableCreator(), args);
+    System.exit(exitCode);
+  }
+
+  /** Builds the family descriptor for one annotated WebTableColumns field. */
+  private static HColumnDescriptor toFamilyDescriptor(Field field,
+      ColumnDescriptor settings) throws IllegalAccessException {
+    byte[] familyName = (byte[]) field.get(null);
+    return new HColumnDescriptor(familyName,
+        settings.versions(), settings.compression().getName(),
+        settings.inMemory(), settings.blockCacheEnabled(),
+        settings.blockSize(), settings.timeToLive(), settings.bloomFilter());
+  }
+
+  /**
+   * Creates the table named by the single argument.
+   * @return 0 on success, -1 on bad usage or any failure (which is logged)
+   */
+  public int run(String[] args) throws Exception {
+    if (args.length != 1) {
+      System.err.println("Usage: WebTableCreator <webtable>");
+      return -1;
+    }
+
+    String tableName = args[0];
+    try {
+      HBaseConfiguration hbaseConf = new HBaseConfiguration();
+
+      LOG.info("WebTableCreator: Creating table: " + tableName);
+      HTableDescriptor tableDesc = new HTableDescriptor(tableName);
+
+      for (Field field : WebTableColumns.class.getFields()) {
+        ColumnDescriptor settings = field.getAnnotation(ColumnDescriptor.class);
+        if (settings != null) {
+          tableDesc.addFamily(toFamilyDescriptor(field, settings));
+        }
+      }
+
+      new HBaseAdmin(hbaseConf).createTable(tableDesc);
+      LOG.info("WebTableCreator: Done.");
+      return 0;
+    } catch (Exception e) {
+      LOG.fatal("WebTableCreator: " + StringUtils.stringifyException(e));
+      return -1;
+    }
+  }
+}
Added: lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/util/hbase/WebTableRow.java
URL: http://svn.apache.org/viewvc/lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/util/hbase/WebTableRow.java?rev=804789&view=auto
==============================================================================
--- lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/util/hbase/WebTableRow.java (added)
+++ lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/util/hbase/WebTableRow.java Sun Aug 16 22:25:12 2009
@@ -0,0 +1,318 @@
+package org.apache.nutch.util.hbase;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Writables;
+import static org.apache.nutch.util.hbase.WebTableColumns.*;
+
+import org.apache.nutch.crawl.Inlink;
+import org.apache.nutch.parse.Outlink;
+import org.apache.nutch.parse.ParseStatus;
+import org.apache.nutch.protocol.ProtocolStatus;
+
+/**
+ * Typed view of one row of the Nutch web table. Each accessor maps to a
+ * column family of {@link WebTableColumns} and delegates storage to the
+ * get/put/delete/deleteFamily/hasColumn helpers inherited from
+ * {@link TableRow}.
+ */
+public class WebTableRow extends TableRow {
+
+  /**
+   * Creates an empty row. The original author marked this "do not use!".
+   * NOTE(review): presumably kept only for reflective instantiation — confirm.
+   */
+  public WebTableRow() { // do not use!
+    super();
+  }
+
+  /** Creates a row for the given row key. */
+  public WebTableRow(byte[] row) {
+    super(row);
+  }
+
+  /** Creates a row from an HBase {@link Result}. */
+  public WebTableRow(Result result) {
+    super(result);
+  }
+
+  /** Decodes {@code val} with {@link Bytes#toString}; {@code null} stays {@code null}. */
+  private static String stringify(byte[] val) {
+    return val == null ? null : Bytes.toString(val);
+  }
+
+  /** @return the base URL, or null if the column is absent */
+  public String getBaseUrl() {
+    return stringify(get(BASE_URL, null));
+  }
+
+  /**
+   * @return the first byte of the STATUS column
+   * @throws NullPointerException if the STATUS column is absent
+   */
+  public byte getStatus() {
+    return get(STATUS, null)[0];
+  }
+
+  /** @return the raw signature bytes, or null if absent */
+  public byte[] getSignature() {
+    return get(SIGNATURE, null);
+  }
+
+  /** @return the raw previous-signature bytes, or null if absent */
+  public byte[] getPrevSignature() {
+    return get(PREV_SIGNATURE, null);
+  }
+
+  /**
+   * @return the fetch time decoded with {@link Bytes#toLong}.
+   * NOTE(review): assumes the column is present; the result for a missing
+   * column depends on Bytes.toLong's null handling.
+   */
+  public long getFetchTime() {
+    return Bytes.toLong(get(FETCH_TIME, null));
+  }
+
+  /** @return the previous fetch time, or 0 if the column is absent */
+  public long getPrevFetchTime() {
+    final byte[] val = get(PREV_FETCH_TIME, null);
+    return val == null ? 0L : Bytes.toLong(val);
+  }
+
+  /** @return the modified time; assumes the column is present */
+  public long getModifiedTime() {
+    return Bytes.toLong(get(MODIFIED_TIME, null));
+  }
+
+  /** @return the fetch interval; assumes the column is present */
+  public int getFetchInterval() {
+    return Bytes.toInt(get(FETCH_INTERVAL, null));
+  }
+
+  /** @return the retry counter; assumes the column is present */
+  public int getRetriesSinceFetch() {
+    return Bytes.toInt(get(RETRIES, null));
+  }
+
+  /**
+   * Deserializes the PROTOCOL_STATUS column.
+   *
+   * @return the protocol status, or null if the column is absent
+   * @throws IOException if the stored bytes cannot be deserialized
+   */
+  public ProtocolStatus getProtocolStatus() throws IOException {
+    final byte[] val = get(PROTOCOL_STATUS, null);
+    if (val == null) {
+      // Writables.getWritable cannot decode a null array; report absence.
+      return null;
+    }
+    return (ProtocolStatus) Writables.getWritable(val, new ProtocolStatus());
+  }
+
+  /** @return the score; assumes the column is present */
+  public float getScore() {
+    return Bytes.toFloat(get(SCORE, null));
+  }
+
+  /** @return the raw content bytes, or null if absent */
+  public byte[] getContent() {
+    return get(CONTENT, null);
+  }
+
+  /** @return the content type, or null if absent */
+  public String getContentType() {
+    return stringify(get(CONTENT_TYPE, null));
+  }
+
+  /** @return the extracted text, or null if absent */
+  public String getText() {
+    return stringify(get(TEXT, null));
+  }
+
+  /** @return the title, or null if absent */
+  public String getTitle() {
+    return stringify(get(TITLE, null));
+  }
+
+  /**
+   * Deserializes the PARSE_STATUS column.
+   *
+   * @return the parse status, or null if the column is absent or cannot be
+   *         deserialized
+   */
+  public ParseStatus getParseStatus() {
+    final byte[] val = get(PARSE_STATUS, null);
+    if (val == null) {
+      // Previously this fell through to Writables.getWritable(null, ...),
+      // which fails with a runtime exception that the catch below misses.
+      return null;
+    }
+    try {
+      return (ParseStatus) Writables.getWritable(val, new ParseStatus());
+    } catch (final IOException e) {
+      return null;
+    }
+  }
+
+  /** @return the representative URL, or null if absent */
+  public String getReprUrl() {
+    return stringify(get(REPR_URL, null));
+  }
+
+  /** Stores the base URL. */
+  public void setBaseUrl(String baseUrl) {
+    put(BASE_URL, null, baseUrl);
+  }
+
+  /** Stores the raw content bytes. */
+  public void setContent(byte[] content) {
+    put(CONTENT, null, content);
+  }
+
+  /** Stores the content type. */
+  public void setContentType(String contentType) {
+    put(CONTENT_TYPE, null, contentType);
+  }
+
+  /** Stores the fetch interval. */
+  public void setFetchInterval(int fetchInterval) {
+    put(FETCH_INTERVAL, null, fetchInterval);
+  }
+
+  /** Stores the fetch time. */
+  public void setFetchTime(long fetchTime) {
+    put(FETCH_TIME, null, fetchTime);
+  }
+
+  /** Stores the modified time. */
+  public void setModifiedTime(long modifiedTime) {
+    put(MODIFIED_TIME, null, modifiedTime);
+  }
+
+  /**
+   * Serializes and stores the parse status.
+   *
+   * @throws IOException if serialization fails
+   */
+  public void setParseStatus(ParseStatus parseStatus) throws IOException {
+    put(PARSE_STATUS, null, Writables.getBytes(parseStatus));
+  }
+
+  /** Stores the previous fetch time. */
+  public void setPrevFetchTime(long prevFetchTime) {
+    put(PREV_FETCH_TIME, null, prevFetchTime);
+  }
+
+  /** Stores the previous signature. */
+  public void setPrevSignature(byte[] prevSig) {
+    put(PREV_SIGNATURE, null, prevSig);
+  }
+
+  /**
+   * Serializes and stores the protocol status.
+   *
+   * @throws IOException if serialization fails
+   */
+  public void setProtocolStatus(ProtocolStatus protocolStatus)
+      throws IOException {
+    put(PROTOCOL_STATUS, null, Writables.getBytes(protocolStatus));
+  }
+
+  /** Stores the representative URL. */
+  public void setReprUrl(String reprUrl) {
+    put(REPR_URL, null, reprUrl);
+  }
+
+  /** Stores the retry counter. */
+  public void setRetriesSinceFetch(int retries) {
+    put(RETRIES, null, retries);
+  }
+
+  /** Stores the score. */
+  public void setScore(float score) {
+    put(SCORE, null, score);
+  }
+
+  /** Stores the signature. */
+  public void setSignature(byte[] signature) {
+    put(SIGNATURE, null, signature);
+  }
+
+  /** Stores the status as a single byte. */
+  public void setStatus(byte status) {
+    put(STATUS, null, new byte[] { status });
+  }
+
+  /** Stores the extracted text. */
+  public void setText(String text) {
+    put(TEXT, null, text);
+  }
+
+  /** Stores the title. */
+  public void setTitle(String title) {
+    put(TITLE, null, title);
+  }
+
+  /**
+   * Builds one {@link Outlink} per qualifier of the OUTLINKS family. The
+   * qualifier is the target URL and the cell value is the anchor text.
+   *
+   * @return a new mutable list; empty if the family is absent
+   */
+  public Collection<Outlink> getOutlinks() {
+    final List<Outlink> outlinks = new ArrayList<Outlink>();
+    final NavigableMap<byte[], NavigableMap<Long, ColumnData>> outlinkMap =
+        map.get(OUTLINKS);
+    if (outlinkMap == null) {
+      return outlinks;
+    }
+    for (final Entry<byte[], NavigableMap<Long, ColumnData>> e : outlinkMap.entrySet()) {
+      final String toUrl = Bytes.toString(e.getKey());
+      // NOTE(review): firstEntry() is taken to be the latest version; this
+      // relies on TableRow's timestamp ordering — confirm.
+      final ColumnData latestColumnData = e.getValue().firstEntry().getValue();
+      final String anchor = Bytes.toString(latestColumnData.getData());
+      outlinks.add(new Outlink(toUrl, anchor));
+    }
+    return outlinks;
+  }
+
+  /** Stores an outlink: qualifier = target URL, value = anchor text. */
+  public void addOutlink(Outlink outlink) {
+    put(OUTLINKS, Bytes.toBytes(outlink.getToUrl()),
+        Bytes.toBytes(outlink.getAnchor()));
+  }
+
+  /** Deletes the whole OUTLINKS family. */
+  public void deleteAllOutlinks() {
+    deleteFamily(OUTLINKS);
+  }
+
+  /**
+   * Builds one {@link Inlink} per qualifier of the INLINKS family. The
+   * qualifier is the source URL and the cell value is the anchor text.
+   *
+   * @return a new mutable list; empty if the family is absent
+   */
+  public Collection<Inlink> getInlinks() {
+    final List<Inlink> inlinks = new ArrayList<Inlink>();
+    final NavigableMap<byte[], NavigableMap<Long, ColumnData>> inlinkMap =
+        map.get(INLINKS);
+    if (inlinkMap == null) {
+      return inlinks;
+    }
+    for (final Entry<byte[], NavigableMap<Long, ColumnData>> e : inlinkMap.entrySet()) {
+      final String fromUrl = Bytes.toString(e.getKey());
+      // NOTE(review): same "first entry is latest" assumption as getOutlinks.
+      final ColumnData latestColumnData = e.getValue().firstEntry().getValue();
+      final String anchor = Bytes.toString(latestColumnData.getData());
+      inlinks.add(new Inlink(fromUrl, anchor));
+    }
+    return inlinks;
+  }
+
+  /** Stores an inlink: qualifier = source URL, value = anchor text. */
+  public void addInlink(Inlink inlink) {
+    put(INLINKS, Bytes.toBytes(inlink.getFromUrl()),
+        Bytes.toBytes(inlink.getAnchor()));
+  }
+
+  /** Deletes the whole INLINKS family. */
+  public void deleteAllInlinks() {
+    deleteFamily(INLINKS);
+  }
+
+  /**
+   * Returns a header value.
+   *
+   * @param key header name
+   * @return the header value, or null if it does not exist
+   */
+  public String getHeader(String key) {
+    return getHeader(Bytes.toBytes(key));
+  }
+
+  /**
+   * Returns a header value.
+   *
+   * @param key header name as bytes
+   * @return the header value, or null if it does not exist
+   */
+  public String getHeader(byte[] key) {
+    return stringify(get(HEADERS, key));
+  }
+
+  /** Stores a header under qualifier {@code key} in the HEADERS family. */
+  public void addHeader(String key, String value) {
+    put(HEADERS, Bytes.toBytes(key), Bytes.toBytes(value));
+  }
+
+  /** Deletes the whole HEADERS family. */
+  public void deleteHeaders() {
+    deleteFamily(HEADERS);
+  }
+
+  /**
+   * Checks whether a metadata key exists in the METADATA family.
+   *
+   * @param metaKey key to look up
+   * @return true if the key exists
+   */
+  public boolean hasMeta(String metaKey) {
+    return hasMeta(Bytes.toBytes(metaKey));
+  }
+
+  /**
+   * Checks whether a metadata key exists in the METADATA family.
+   *
+   * @param metaKey key to look up, as bytes
+   * @return true if the key exists
+   */
+  public boolean hasMeta(byte[] metaKey) {
+    return hasColumn(METADATA, metaKey);
+  }
+
+  /**
+   * Reads a metadata value from the METADATA family.
+   *
+   * @param metaKey key to look up
+   * @return the raw value, or null if the key does not exist
+   */
+  public byte[] getMeta(String metaKey) {
+    return getMeta(Bytes.toBytes(metaKey));
+  }
+
+  /**
+   * Reads a metadata value from the METADATA family.
+   *
+   * @param metaKey key to look up, as bytes
+   * @return the raw value, or null if the key does not exist
+   */
+  public byte[] getMeta(byte[] metaKey) {
+    return get(METADATA, metaKey);
+  }
+
+  /** @return the metadata value decoded as a string, or null if absent */
+  public String getMetaAsString(String metaKey) {
+    return stringify(getMeta(metaKey));
+  }
+
+  /** Stores a metadata value under {@code metaKey}. */
+  public void putMeta(String metaKey, byte[] val) {
+    putMeta(Bytes.toBytes(metaKey), val);
+  }
+
+  /** Stores a metadata value under {@code metaKey}. */
+  public void putMeta(byte[] metaKey, byte[] val) {
+    put(METADATA, metaKey, val);
+  }
+
+  /** Deletes the metadata entry for {@code metaKey}. */
+  public void deleteMeta(String metaKey) {
+    deleteMeta(Bytes.toBytes(metaKey));
+  }
+
+  /** Deletes the metadata entry for {@code metaKey}. */
+  public void deleteMeta(byte[] metaKey) {
+    delete(METADATA, metaKey);
+  }
+}
Modified: lucene/nutch/branches/nutchbase/src/plugin/build.xml
URL: http://svn.apache.org/viewvc/lucene/nutch/branches/nutchbase/src/plugin/build.xml?rev=804789&r1=804788&r2=804789&view=diff
==============================================================================
--- lucene/nutch/branches/nutchbase/src/plugin/build.xml (original)
+++ lucene/nutch/branches/nutchbase/src/plugin/build.xml Sun Aug 16 22:25:12 2009
@@ -26,17 +26,17 @@
<!-- Build & deploy all the plugin jars. -->
<!-- ====================================================== -->
<target name="deploy">
- <ant dir="clustering-carrot2" target="deploy"/>
+ <!--<ant dir="clustering-carrot2" target="deploy"/>
<ant dir="creativecommons" target="deploy"/>
- <ant dir="feed" target="deploy"/>
+ <ant dir="feed" target="deploy"/>-->
<ant dir="index-basic" target="deploy"/>
- <ant dir="index-anchor" target="deploy"/>
+ <!--<ant dir="index-anchor" target="deploy"/>
<ant dir="index-more" target="deploy"/>
<ant dir="field-basic" target="deploy"/>
<ant dir="field-boost" target="deploy"/>
- <ant dir="languageidentifier" target="deploy"/>
+ <ant dir="languageidentifier" target="deploy"/>-->
<ant dir="lib-http" target="deploy"/>
- <ant dir="lib-jakarta-poi" target="deploy"/>
+ <!--<ant dir="lib-jakarta-poi" target="deploy"/>
<ant dir="lib-lucene-analyzers" target="deploy"/>
<ant dir="lib-nekohtml" target="deploy"/>
<ant dir="lib-parsems" target="deploy"/>
@@ -46,36 +46,42 @@
<ant dir="nutch-extensionpoints" target="deploy"/>
<ant dir="ontology" target="deploy"/>
<ant dir="protocol-file" target="deploy"/>
- <ant dir="protocol-ftp" target="deploy"/>
+ <ant dir="protocol-ftp" target="deploy"/>-->
<ant dir="protocol-http" target="deploy"/>
- <ant dir="protocol-httpclient" target="deploy"/>
- <ant dir="parse-ext" target="deploy"/>
+ <!--<ant dir="protocol-httpclient" target="deploy"/>
+ <ant dir="parse-ext" target="deploy"/>-->
<ant dir="parse-html" target="deploy"/>
<ant dir="parse-js" target="deploy"/>
<!-- <ant dir="parse-mp3" target="deploy"/> -->
- <ant dir="parse-msexcel" target="deploy"/>
+ <!--<ant dir="parse-msexcel" target="deploy"/>
<ant dir="parse-mspowerpoint" target="deploy"/>
<ant dir="parse-msword" target="deploy"/>
<ant dir="parse-oo" target="deploy"/>
<ant dir="parse-pdf" target="deploy"/>
- <ant dir="parse-rss" target="deploy"/>
+ <ant dir="parse-rss" target="deploy"/>-->
<!-- <ant dir="parse-rtf" target="deploy"/> -->
- <ant dir="parse-swf" target="deploy"/>
+ <!--<ant dir="parse-swf" target="deploy"/>
<ant dir="parse-text" target="deploy"/>
<ant dir="parse-zip" target="deploy"/>
+ -->
<ant dir="query-basic" target="deploy"/>
<ant dir="query-more" target="deploy"/>
<ant dir="query-site" target="deploy"/>
- <ant dir="query-custom" target="deploy"/>
+ <!--<ant dir="query-custom" target="deploy"/>-->
<ant dir="query-url" target="deploy"/>
+ <!--
<ant dir="response-json" target="deploy"/>
<ant dir="response-xml" target="deploy"/>
+ -->
<ant dir="scoring-opic" target="deploy"/>
+ <!--
<ant dir="scoring-link" target="deploy"/>
+ -->
<ant dir="summary-basic" target="deploy"/>
- <ant dir="subcollection" target="deploy"/>
<ant dir="summary-lucene" target="deploy"/>
- <ant dir="tld" target="deploy"/>
+ <!--
+ <ant dir="subcollection" target="deploy"/>
+ <ant dir="tld" target="deploy"/>-->
<ant dir="urlfilter-automaton" target="deploy"/>
<ant dir="urlfilter-domain" target="deploy" />
<ant dir="urlfilter-prefix" target="deploy"/>
@@ -133,6 +139,7 @@
<ant dir="creativecommons" target="clean"/>
<ant dir="feed" target="clean"/>
<ant dir="index-basic" target="clean"/>
+ <ant dir="index-basichbase" target="clean"/>
<ant dir="index-anchor" target="clean"/>
<ant dir="index-more" target="clean"/>
<ant dir="field-basic" target="clean"/>
@@ -140,6 +147,7 @@
<ant dir="languageidentifier" target="clean"/>
<ant dir="lib-commons-httpclient" target="clean"/>
<ant dir="lib-http" target="clean"/>
+ <ant dir="lib-httphbase" target="clean"/>
<ant dir="lib-jakarta-poi" target="clean"/>
<ant dir="lib-lucene-analyzers" target="clean"/>
<ant dir="lib-nekohtml" target="clean"/>
@@ -152,10 +160,13 @@
<ant dir="protocol-file" target="clean"/>
<ant dir="protocol-ftp" target="clean"/>
<ant dir="protocol-http" target="clean"/>
+ <ant dir="protocol-httphbase" target="clean"/>
<ant dir="protocol-httpclient" target="clean"/>
<ant dir="parse-ext" target="clean"/>
<ant dir="parse-html" target="clean"/>
+ <ant dir="parse-htmlhbase" target="clean"/>
<ant dir="parse-js" target="clean"/>
+ <ant dir="parse-jshbase" target="clean"/>
<ant dir="parse-mp3" target="clean"/>
<ant dir="parse-msexcel" target="clean"/>
<ant dir="parse-mspowerpoint" target="clean"/>
Modified: lucene/nutch/branches/nutchbase/src/plugin/index-basic/src/java/org/apache/nutch/indexer/basic/BasicIndexingFilter.java
URL: http://svn.apache.org/viewvc/lucene/nutch/branches/nutchbase/src/plugin/index-basic/src/java/org/apache/nutch/indexer/basic/BasicIndexingFilter.java?rev=804789&r1=804788&r2=804789&view=diff
==============================================================================
--- lucene/nutch/branches/nutchbase/src/plugin/index-basic/src/java/org/apache/nutch/indexer/basic/BasicIndexingFilter.java (original)
+++ lucene/nutch/branches/nutchbase/src/plugin/index-basic/src/java/org/apache/nutch/indexer/basic/BasicIndexingFilter.java Sun Aug 16 22:25:12 2009
@@ -22,20 +22,21 @@
import org.apache.lucene.document.DateTools;
-import org.apache.nutch.metadata.Nutch;
-import org.apache.nutch.parse.Parse;
-
-import org.apache.nutch.indexer.IndexingFilter;
+import org.apache.nutch.crawl.Inlink;
import org.apache.nutch.indexer.IndexingException;
+import org.apache.nutch.indexer.IndexingFilter;
import org.apache.nutch.indexer.NutchDocument;
import org.apache.nutch.indexer.lucene.LuceneWriter;
-import org.apache.hadoop.io.Text;
-
-import org.apache.nutch.crawl.CrawlDatum;
-import org.apache.nutch.crawl.Inlinks;
+import org.apache.nutch.metadata.Nutch;
+import org.apache.nutch.util.hbase.HbaseColumn;
+import org.apache.nutch.util.hbase.WebTableColumns;
+import org.apache.nutch.util.hbase.WebTableRow;
import java.net.MalformedURLException;
import java.net.URL;
+import java.util.Collection;
+import java.util.HashSet;
+
import org.apache.hadoop.conf.Configuration;
/** Adds basic searchable fields to a document. */
@@ -44,21 +45,30 @@
private int MAX_TITLE_LENGTH;
private Configuration conf;
+
+  /**
+   * Web table columns declared by this filter. It is shared by every instance
+   * and returned from getColumns(), so it is wrapped read-only.
+   *
+   * filter() also reads REPR_URL (hasColumn/getReprUrl) and METADATA
+   * (getMetaAsString for the caching policy), so both are declared here.
+   * NOTE(review): this assumes callers load only the declared columns.
+   * Without them, "orig" and "cache" would never be indexed — confirm against
+   * the indexer.
+   */
+  private static final Collection<HbaseColumn> COLUMNS;
+
+  static {
+    final HashSet<HbaseColumn> columns = new HashSet<HbaseColumn>();
+    columns.add(new HbaseColumn(WebTableColumns.TITLE));
+    columns.add(new HbaseColumn(WebTableColumns.TEXT));
+    columns.add(new HbaseColumn(WebTableColumns.FETCH_TIME));
+    columns.add(new HbaseColumn(WebTableColumns.INLINKS));
+    columns.add(new HbaseColumn(WebTableColumns.REPR_URL));
+    columns.add(new HbaseColumn(WebTableColumns.METADATA));
+    COLUMNS = java.util.Collections.unmodifiableSet(columns);
+  }
- public NutchDocument filter(NutchDocument doc, Parse parse, Text url, CrawlDatum datum, Inlinks inlinks)
+ public NutchDocument filter(NutchDocument doc, String url, WebTableRow row)
throws IndexingException {
- Text reprUrl = (Text) datum.getMetaData().get(Nutch.WRITABLE_REPR_URL_KEY);
- String reprUrlString = reprUrl != null ? reprUrl.toString() : null;
- String urlString = url.toString();
+ String reprUrl = null;
+ if (row.hasColumn(WebTableColumns.REPR_URL, null))
+ reprUrl = row.getReprUrl();
String host = null;
try {
URL u;
- if (reprUrlString != null) {
- u = new URL(reprUrlString);
+ if (reprUrl!= null) {
+ u = new URL(reprUrl);
} else {
- u = new URL(urlString);
+ u = new URL(url);
}
host = u.getHost();
} catch (MalformedURLException e) {
@@ -66,34 +76,48 @@
}
if (host != null) {
+ // add host as un-stored, indexed and tokenized
doc.add("host", host);
+ // add site as un-stored, indexed and un-tokenized
doc.add("site", host);
}
- doc.add("url", reprUrlString == null ? urlString : reprUrlString);
- doc.add("content", parse.getText());
+ // url is both stored and indexed, so it's both searchable and returned
+ doc.add("url", reprUrl == null ? url : reprUrl);
+
+ if (reprUrl != null) {
+ // also store original url as both stored and indexes
+ doc.add("orig", url);
+ }
+
+ // content is indexed, so that it's searchable, but not stored in index
+ doc.add("content", row.getText());
// title
- String title = parse.getData().getTitle();
+ String title = row.getTitle();
if (title.length() > MAX_TITLE_LENGTH) { // truncate title if needed
title = title.substring(0, MAX_TITLE_LENGTH);
}
+ // add title indexed and stored so that it can be displayed
doc.add("title", title);
-
// add cached content/summary display policy, if available
- String caching = parse.getData().getMeta(Nutch.CACHING_FORBIDDEN_KEY);
- if (caching != null && !caching.equals(Nutch.CACHING_FORBIDDEN_NONE)) {
+ String caching = row.getMetaAsString(Nutch.CACHING_FORBIDDEN_KEY);
+ if (caching != null && !caching.equals(Nutch.CACHING_FORBIDDEN_NONE)) {
doc.add("cache", caching);
}
// add timestamp when fetched, for deduplication
doc.add("tstamp",
- DateTools.timeToString(datum.getFetchTime(),
- DateTools.Resolution.MILLISECOND));
-
+ DateTools.timeToString(row.getFetchTime(), DateTools.Resolution.MILLISECOND));
+
+ // TODO: move anchors to its own plugin
+ for (Inlink inlink : row.getInlinks()) {
+ doc.add("anchor", inlink.getAnchor());
+ }
+
return doc;
}
-
+
public void addIndexBackendOptions(Configuration conf) {
///////////////////////////
@@ -137,4 +161,8 @@
return this.conf;
}
+ /**
+  * Returns COLUMNS, the static column collection shared by all instances of
+  * this filter. NOTE(review): presumably callers use it to decide which web
+  * table columns to load for rows passed to filter(...) — confirm at the
+  * call site.
+  */
+ public Collection<HbaseColumn> getColumns() {
+ return COLUMNS;
+ }
+
}
Modified: lucene/nutch/branches/nutchbase/src/plugin/lib-http/src/java/org/apache/nutch/protocol/http/api/BlockedException.java
URL: http://svn.apache.org/viewvc/lucene/nutch/branches/nutchbase/src/plugin/lib-http/src/java/org/apache/nutch/protocol/http/api/BlockedException.java?rev=804789&r1=804788&r2=804789&view=diff
==============================================================================
--- lucene/nutch/branches/nutchbase/src/plugin/lib-http/src/java/org/apache/nutch/protocol/http/api/BlockedException.java (original)
+++ lucene/nutch/branches/nutchbase/src/plugin/lib-http/src/java/org/apache/nutch/protocol/http/api/BlockedException.java Sun Aug 16 22:25:12 2009
@@ -17,6 +17,7 @@
package org.apache.nutch.protocol.http.api;
+@SuppressWarnings("serial")
public class BlockedException extends HttpException {
public BlockedException(String msg) {