You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by jn...@apache.org on 2010/06/30 12:36:29 UTC
svn commit: r959259 [6/12] - in /nutch/branches/nutchbase: ./ bin/ conf/
contrib/ docs/ ivy/ lib/ lib/jetty-ext/ src/engines/ src/gora/ src/java/
src/java/org/apache/nutch/analysis/ src/java/org/apache/nutch/clustering/
src/java/org/apache/nutch/crawl/...
Added: nutch/branches/nutchbase/src/java/org/apache/nutch/parse/ParserJob.java
URL: http://svn.apache.org/viewvc/nutch/branches/nutchbase/src/java/org/apache/nutch/parse/ParserJob.java?rev=959259&view=auto
==============================================================================
--- nutch/branches/nutchbase/src/java/org/apache/nutch/parse/ParserJob.java (added)
+++ nutch/branches/nutchbase/src/java/org/apache/nutch/parse/ParserJob.java Wed Jun 30 10:36:20 2010
@@ -0,0 +1,185 @@
+package org.apache.nutch.parse;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+
+import org.apache.avro.util.Utf8;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.crawl.GeneratorJob;
+import org.apache.nutch.crawl.SignatureFactory;
+import org.apache.nutch.crawl.URLWebPage;
+import org.apache.nutch.metadata.Nutch;
+import org.apache.nutch.storage.Mark;
+import org.apache.nutch.storage.ParseStatus;
+import org.apache.nutch.storage.StorageUtils;
+import org.apache.nutch.storage.WebPage;
+import org.apache.nutch.util.IdentityPageReducer;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.TableUtil;
+import org.gora.mapreduce.GoraMapper;
+
+/**
+ * Map-only job that parses pages stored in the web table.
+ *
+ * <p>For each page the mapper:
+ * <ul>
+ *   <li>checks the page's fetch mark against the configured crawl id via
+ *       {@link NutchJob#shouldProcess} and skips non-matching pages;</li>
+ *   <li>when run with {@code -continue}, skips pages that already carry a
+ *       parse mark;</li>
+ *   <li>hands the page to {@link ParseUtil#process};</li>
+ *   <li>increments a {@code ParserStatus} counter for the resulting parse
+ *       status;</li>
+ *   <li>emits the page, plus any redirect target page returned by
+ *       {@code process}.</li>
+ * </ul>
+ */
+public class ParserJob extends GoraMapper<String, WebPage, String, WebPage>
+    implements Tool {
+
+  public static final Log LOG = LogFactory.getLog(ParserJob.class);
+
+  /** Page fields every parse reads, before plugin-requested fields are added. */
+  private static final Collection<WebPage.Field> FIELDS = new HashSet<WebPage.Field>();
+
+  private Configuration conf;
+
+  static {
+    FIELDS.add(WebPage.Field.STATUS);
+    FIELDS.add(WebPage.Field.CONTENT);
+    FIELDS.add(WebPage.Field.CONTENT_TYPE);
+    FIELDS.add(WebPage.Field.SIGNATURE);
+    FIELDS.add(WebPage.Field.MARKERS);
+    FIELDS.add(WebPage.Field.OUTLINKS);
+    FIELDS.add(WebPage.Field.METADATA);
+  }
+
+  private ParseUtil parseUtil;
+
+  /** When true, pages that already carry a parse mark are skipped. */
+  private boolean shouldContinue;
+
+  /** Crawl id that pages' fetch marks are checked against in {@link #map}. */
+  private Utf8 crawlId;
+
+  /**
+   * Reads the job settings written by {@link #parse(String, boolean)} and
+   * creates the per-task {@link ParseUtil}.
+   */
+  @Override
+  public void setup(Context context) throws IOException {
+    Configuration conf = context.getConfiguration();
+    parseUtil = new ParseUtil(conf);
+    shouldContinue = conf.getBoolean("job.continue", false);
+    crawlId = new Utf8(conf.get(GeneratorJob.CRAWL_ID, Nutch.ALL_CRAWL_ID_STR));
+  }
+
+  /**
+   * Parses a single page and emits it.
+   *
+   * @param key row key; treated as a reversed url (it is passed through
+   *          {@link TableUtil#unreverseUrl} for logging)
+   * @param page the stored page, loaded with the fields from
+   *          {@link #getFields(Job)}
+   * @param context map context used for counters and output
+   */
+  @Override
+  public void map(String key, WebPage page, Context context)
+      throws IOException, InterruptedException {
+    Utf8 mark = Mark.FETCH_MARK.checkMark(page);
+    if (!NutchJob.shouldProcess(mark, crawlId)) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Skipping " + TableUtil.unreverseUrl(key) + "; different crawl id");
+      }
+      return;
+    }
+    if (shouldContinue && Mark.PARSE_MARK.checkMark(page) != null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Skipping " + TableUtil.unreverseUrl(key) + "; already parsed");
+      }
+      return;
+    }
+
+    URLWebPage redirectedPage = parseUtil.process(key, page);
+    ParseStatus pstatus = page.getParseStatus();
+    if (pstatus != null) {
+      context.getCounter("ParserStatus",
+          ParseStatusCodes.majorCodes[pstatus.getMajorCode()]).increment(1);
+    }
+
+    // A redirect target is emitted as its own row, keyed by its reversed url.
+    if (redirectedPage != null) {
+      context.write(TableUtil.reverseUrl(redirectedPage.getUrl()),
+          redirectedPage.getDatum());
+    }
+    context.write(key, page);
+  }
+
+  /**
+   * Returns the page fields this job must load.
+   *
+   * <p>This is the base {@link #FIELDS} set plus any fields requested by the
+   * parser, signature, and HTML parse filter plugins. Plugins returning
+   * {@code null} contribute nothing.
+   *
+   * @param job job whose configuration is used to instantiate the plugins
+   * @return a new, mutable set of fields
+   */
+  public Collection<WebPage.Field> getFields(Job job) {
+    Configuration conf = job.getConfiguration();
+    Collection<WebPage.Field> fields = new HashSet<WebPage.Field>(FIELDS);
+    ParserFactory parserFactory = new ParserFactory(conf);
+    HtmlParseFilters parseFilters = new HtmlParseFilters(conf);
+
+    Collection<WebPage.Field> parsePluginFields = parserFactory.getFields();
+    Collection<WebPage.Field> signaturePluginFields =
+        SignatureFactory.getFields(conf);
+    Collection<WebPage.Field> htmlParsePluginFields = parseFilters.getFields();
+
+    if (parsePluginFields != null) {
+      fields.addAll(parsePluginFields);
+    }
+    if (signaturePluginFields != null) {
+      fields.addAll(signaturePluginFields);
+    }
+    if (htmlParsePluginFields != null) {
+      fields.addAll(htmlParsePluginFields);
+    }
+
+    return fields;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Configures and runs the parse job, blocking until it finishes.
+   *
+   * @param crawlId crawl id to parse, or {@link Nutch#ALL_CRAWL_ID_STR} for all
+   * @param shouldContinue skip pages that already carry a parse mark
+   * @return 0 on success, -1 if the job failed
+   */
+  public int parse(String crawlId, boolean shouldContinue) throws Exception {
+    LOG.info("ParserJob: starting");
+
+    getConf().set(GeneratorJob.CRAWL_ID, crawlId);
+    getConf().setBoolean("job.continue", shouldContinue);
+
+    LOG.info("ParserJob: continuing: " + getConf().getBoolean("job.continue", false));
+    if (crawlId.equals(Nutch.ALL_CRAWL_ID_STR)) {
+      LOG.info("ParserJob: parsing all");
+    } else {
+      LOG.info("ParserJob: crawlId: " + crawlId);
+    }
+
+    final Job job = new NutchJob(getConf(), "parse");
+
+    Collection<WebPage.Field> fields = getFields(job);
+    StorageUtils.initMapperJob(job, fields, String.class, WebPage.class,
+        ParserJob.class);
+    StorageUtils.initReducerJob(job, IdentityPageReducer.class);
+    job.setNumReduceTasks(0);
+    boolean success = job.waitForCompletion(true);
+    if (!success) {
+      LOG.info("ParserJob: failed");
+      return -1;
+    }
+    LOG.info("ParserJob: success");
+    return 0;
+  }
+
+  /**
+   * Command-line entry point.
+   *
+   * <p>Usage: {@code ParserJob (<crawl id> | -all) [-continue]}
+   *
+   * @return 1 on a usage error, otherwise the result of
+   *         {@link #parse(String, boolean)}
+   */
+  public int run(String[] args) throws Exception {
+    boolean shouldContinue = false;
+    String crawlId;
+
+    String usage = "Usage: ParserJob (<crawl id> | -all) [-continue]";
+
+    if (args.length < 1) {
+      System.err.println(usage);
+      return 1;
+    }
+
+    crawlId = args[0];
+    if (crawlId.equals("-continue")) {
+      System.err.println(usage);
+      return 1;
+    }
+
+    // The optional flag follows the crawl id. args[0] was rejected above if it
+    // was "-continue", so the flag can only appear at args[1].
+    if (args.length > 1 && "-continue".equals(args[1])) {
+      shouldContinue = true;
+    }
+
+    return parse(crawlId, shouldContinue);
+  }
+
+  public static void main(String[] args) throws Exception {
+    final int res = ToolRunner.run(NutchConfiguration.create(),
+        new ParserJob(), args);
+    System.exit(res);
+  }
+
+}
Modified: nutch/branches/nutchbase/src/java/org/apache/nutch/plugin/ExtensionPoint.java
URL: http://svn.apache.org/viewvc/nutch/branches/nutchbase/src/java/org/apache/nutch/plugin/ExtensionPoint.java?rev=959259&r1=959258&r2=959259&view=diff
==============================================================================
--- nutch/branches/nutchbase/src/java/org/apache/nutch/plugin/ExtensionPoint.java (original)
+++ nutch/branches/nutchbase/src/java/org/apache/nutch/plugin/ExtensionPoint.java Wed Jun 30 10:36:20 2010
@@ -101,7 +101,7 @@ public class ExtensionPoint {
}
/**
- * Install a coresponding extension to this extension point.
+ * Install a corresponding extension to this extension point.
*
* @param extension
*/
@@ -110,7 +110,7 @@ public class ExtensionPoint {
}
/**
- * Returns a array of extensions that lsiten to this extension point
+ * Returns an array of extensions that listen to this extension point
*
* @return Extension[]
*/
Added: nutch/branches/nutchbase/src/java/org/apache/nutch/plugin/FieldPluggable.java
URL: http://svn.apache.org/viewvc/nutch/branches/nutchbase/src/java/org/apache/nutch/plugin/FieldPluggable.java?rev=959259&view=auto
==============================================================================
--- nutch/branches/nutchbase/src/java/org/apache/nutch/plugin/FieldPluggable.java (added)
+++ nutch/branches/nutchbase/src/java/org/apache/nutch/plugin/FieldPluggable.java Wed Jun 30 10:36:20 2010
@@ -0,0 +1,10 @@
+package org.apache.nutch.plugin;
+
+import java.util.Collection;
+
+import org.apache.nutch.storage.WebPage;
+
+/**
+ * A {@link Pluggable} extension that declares which {@link WebPage} fields
+ * it needs.
+ *
+ * <p>Job setup code collects these fields from every active plugin and merges
+ * them into the set of fields loaded from storage.
+ */
+public interface FieldPluggable extends Pluggable {
+
+  /**
+   * Returns the page fields this plugin requires.
+   *
+   * <p>Callers such as {@code ProtocolFactory.getFields()} and
+   * {@code ScoringFilters.getFields()} skip a {@code null} result. A plugin
+   * that needs no extra fields may therefore return {@code null} or an empty
+   * collection.
+   *
+   * @return the required fields, possibly {@code null}
+   */
+  Collection<WebPage.Field> getFields();
+}
Modified: nutch/branches/nutchbase/src/java/org/apache/nutch/plugin/Pluggable.java
URL: http://svn.apache.org/viewvc/nutch/branches/nutchbase/src/java/org/apache/nutch/plugin/Pluggable.java?rev=959259&r1=959258&r2=959259&view=diff
==============================================================================
--- nutch/branches/nutchbase/src/java/org/apache/nutch/plugin/Pluggable.java (original)
+++ nutch/branches/nutchbase/src/java/org/apache/nutch/plugin/Pluggable.java Wed Jun 30 10:36:20 2010
@@ -1,16 +1,4 @@
package org.apache.nutch.plugin;
-
-/**
- * Defines the capability of a class to be plugged into Nutch.
- * This is a common interface that must be implemented by all
- * Nutch Extension Points.
- *
- * @author Jérôme Charron
- *
- * @see <a href="http://wiki.apache.org/nutch/AboutPlugins">About Plugins</a>
- * @see <a href="package-summary.html#package_description">
- * plugin package description</a>
- */
public interface Pluggable {
}
Modified: nutch/branches/nutchbase/src/java/org/apache/nutch/protocol/Protocol.java
URL: http://svn.apache.org/viewvc/nutch/branches/nutchbase/src/java/org/apache/nutch/protocol/Protocol.java?rev=959259&r1=959258&r2=959259&view=diff
==============================================================================
--- nutch/branches/nutchbase/src/java/org/apache/nutch/protocol/Protocol.java (original)
+++ nutch/branches/nutchbase/src/java/org/apache/nutch/protocol/Protocol.java Wed Jun 30 10:36:20 2010
@@ -20,19 +20,14 @@ package org.apache.nutch.protocol;
// Hadoop imports
import org.apache.hadoop.conf.Configurable;
-
-// Nutch imports
-import org.apache.nutch.plugin.TablePluggable;
-import org.apache.nutch.protocol.ProtocolOutput;
-import org.apache.nutch.protocol.RobotRules;
-import org.apache.nutch.util.hbase.WebTableRow;
-import org.apache.nutch.util.hbase.WebTableRow;
+import org.apache.nutch.plugin.FieldPluggable;
+import org.apache.nutch.storage.WebPage;
/** A retriever of url content. Implemented by protocol extensions. */
-public interface Protocol extends TablePluggable, Configurable {
+public interface Protocol extends FieldPluggable, Configurable {
/** The name of the extension point. */
public final static String X_POINT_ID = Protocol.class.getName();
-
+
/**
* Property name. If in the current configuration this property is set to
* true, protocol implementations should handle "politeness" limits
@@ -53,13 +48,13 @@ public interface Protocol extends TableP
/** Returns the {@link Content} for a fetchlist entry.
*/
- ProtocolOutput getProtocolOutput(String url, WebTableRow row);
+ ProtocolOutput getProtocolOutput(String url, WebPage page);
/**
* Retrieve robot rules applicable for this url.
* @param url url to check
- * @param row Row
+ * @param page
* @return robot rules (specific for this url or default), never null
*/
- RobotRules getRobotRules(String url, WebTableRow row);
+ RobotRules getRobotRules(String url, WebPage page);
}
Modified: nutch/branches/nutchbase/src/java/org/apache/nutch/protocol/ProtocolFactory.java
URL: http://svn.apache.org/viewvc/nutch/branches/nutchbase/src/java/org/apache/nutch/protocol/ProtocolFactory.java?rev=959259&r1=959258&r2=959259&view=diff
==============================================================================
--- nutch/branches/nutchbase/src/java/org/apache/nutch/protocol/ProtocolFactory.java (original)
+++ nutch/branches/nutchbase/src/java/org/apache/nutch/protocol/ProtocolFactory.java Wed Jun 30 10:36:20 2010
@@ -17,21 +17,20 @@
package org.apache.nutch.protocol;
-import java.net.URL;
import java.net.MalformedURLException;
+import java.net.URL;
import java.util.Collection;
import java.util.HashSet;
-// Commons Logging imports
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
-import org.apache.nutch.plugin.*;
-import org.apache.nutch.protocol.ProtocolNotFound;
-import org.apache.nutch.util.ObjectCache;
-import org.apache.nutch.util.hbase.HbaseColumn;
-
import org.apache.hadoop.conf.Configuration;
+import org.apache.nutch.plugin.Extension;
+import org.apache.nutch.plugin.ExtensionPoint;
+import org.apache.nutch.plugin.PluginRepository;
+import org.apache.nutch.plugin.PluginRuntimeException;
+import org.apache.nutch.storage.WebPage;
+import org.apache.nutch.util.ObjectCache;
/**
* Creates and caches {@link Protocol} plugins. Protocol plugins should define
@@ -44,9 +43,9 @@ public class ProtocolFactory {
public static final Log LOG = LogFactory.getLog(ProtocolFactory.class);
- private ExtensionPoint extensionPoint;
+ private final ExtensionPoint extensionPoint;
- private Configuration conf;
+ private final Configuration conf;
public ProtocolFactory(Configuration conf) {
this.conf = conf;
@@ -60,7 +59,7 @@ public class ProtocolFactory {
/**
* Returns the appropriate {@link Protocol} implementation for a url.
- *
+ *
* @param urlString
* Url String
* @return The appropriate {@link Protocol} implementation for a given {@link URL}.
@@ -109,7 +108,7 @@ public class ProtocolFactory {
}
return null;
}
-
+
boolean contains(String what, String where){
String parts[]=where.split("[, ]");
for(int i=0;i<parts.length;i++) {
@@ -117,19 +116,22 @@ public class ProtocolFactory {
}
return false;
}
-
- public Collection<HbaseColumn> getColumnSet() {
- Collection<HbaseColumn> columns = new HashSet<HbaseColumn>();
+
+ public Collection<WebPage.Field> getFields() {
+ Collection<WebPage.Field> fields = new HashSet<WebPage.Field>();
for (Extension extension : this.extensionPoint.getExtensions()) {
Protocol protocol;
try {
protocol = (Protocol) extension.getExtensionInstance();
- columns.addAll(protocol.getColumns());
+ Collection<WebPage.Field> pluginFields = protocol.getFields();
+ if (pluginFields != null) {
+ fields.addAll(pluginFields);
+ }
} catch (PluginRuntimeException e) {
// ignore
}
}
- return columns;
+ return fields;
}
-
+
}
Modified: nutch/branches/nutchbase/src/java/org/apache/nutch/protocol/ProtocolOutput.java
URL: http://svn.apache.org/viewvc/nutch/branches/nutchbase/src/java/org/apache/nutch/protocol/ProtocolOutput.java?rev=959259&r1=959258&r2=959259&view=diff
==============================================================================
--- nutch/branches/nutchbase/src/java/org/apache/nutch/protocol/ProtocolOutput.java (original)
+++ nutch/branches/nutchbase/src/java/org/apache/nutch/protocol/ProtocolOutput.java Wed Jun 30 10:36:20 2010
@@ -17,6 +17,8 @@
package org.apache.nutch.protocol;
+import org.apache.nutch.storage.ProtocolStatusUtils;
+
/**
* Simple aggregate to pass from protocol plugins both content and
* protocol status.
@@ -24,18 +26,19 @@ package org.apache.nutch.protocol;
*/
public class ProtocolOutput {
private Content content;
- private ProtocolStatus status;
+ private org.apache.nutch.storage.ProtocolStatus status;
- public ProtocolOutput(Content content, ProtocolStatus status) {
+ public ProtocolOutput(Content content,
+ org.apache.nutch.storage.ProtocolStatus status) {
this.content = content;
this.status = status;
}
-
+
public ProtocolOutput(Content content) {
this.content = content;
- this.status = ProtocolStatus.STATUS_SUCCESS;
+ this.status = ProtocolStatusUtils.STATUS_SUCCESS;
}
-
+
public Content getContent() {
return content;
}
@@ -44,11 +47,11 @@ public class ProtocolOutput {
this.content = content;
}
- public ProtocolStatus getStatus() {
+ public org.apache.nutch.storage.ProtocolStatus getStatus() {
return status;
}
- public void setStatus(ProtocolStatus status) {
+ public void setStatus(org.apache.nutch.storage.ProtocolStatus status) {
this.status = status;
}
}
Modified: nutch/branches/nutchbase/src/java/org/apache/nutch/protocol/ProtocolStatus.java
URL: http://svn.apache.org/viewvc/nutch/branches/nutchbase/src/java/org/apache/nutch/protocol/ProtocolStatus.java?rev=959259&r1=959258&r2=959259&view=diff
==============================================================================
--- nutch/branches/nutchbase/src/java/org/apache/nutch/protocol/ProtocolStatus.java (original)
+++ nutch/branches/nutchbase/src/java/org/apache/nutch/protocol/ProtocolStatus.java Wed Jun 30 10:36:20 2010
@@ -29,46 +29,11 @@ import org.apache.hadoop.io.WritableUtil
/**
* @author Andrzej Bialecki
*/
-public class ProtocolStatus implements Writable {
-
+@Deprecated
+public class ProtocolStatus implements Writable, ProtocolStatusCodes {
+
private final static byte VERSION = 2;
-
- /** Content was retrieved without errors. */
- public static final int SUCCESS = 1;
- /** Content was not retrieved. Any further errors may be indicated in args. */
- public static final int FAILED = 2;
-
- /** This protocol was not found. Application may attempt to retry later. */
- public static final int PROTO_NOT_FOUND = 10;
- /** Resource is gone. */
- public static final int GONE = 11;
- /** Resource has moved permanently. New url should be found in args. */
- public static final int MOVED = 12;
- /** Resource has moved temporarily. New url should be found in args. */
- public static final int TEMP_MOVED = 13;
- /** Resource was not found. */
- public static final int NOTFOUND = 14;
- /** Temporary failure. Application may retry immediately. */
- public static final int RETRY = 15;
- /** Unspecified exception occured. Further information may be provided in args. */
- public static final int EXCEPTION = 16;
- /** Access denied - authorization required, but missing/incorrect. */
- public static final int ACCESS_DENIED = 17;
- /** Access denied by robots.txt rules. */
- public static final int ROBOTS_DENIED = 18;
- /** Too many redirects. */
- public static final int REDIR_EXCEEDED = 19;
- /** Not fetching. */
- public static final int NOTFETCHING = 20;
- /** Unchanged since the last fetch. */
- public static final int NOTMODIFIED = 21;
- /** Request was refused by protocol plugins, because it would block.
- * The expected number of milliseconds to wait before retry may be provided
- * in args. */
- public static final int WOULDBLOCK = 22;
- /** Thread was blocked http.max.delays times during fetching. */
- public static final int BLOCKED = 23;
-
+
// Useful static instances for status codes that don't usually require any
// additional arguments.
public static final ProtocolStatus STATUS_SUCCESS = new ProtocolStatus(SUCCESS);
@@ -82,11 +47,11 @@ public class ProtocolStatus implements W
public static final ProtocolStatus STATUS_NOTMODIFIED = new ProtocolStatus(NOTMODIFIED);
public static final ProtocolStatus STATUS_WOULDBLOCK = new ProtocolStatus(WOULDBLOCK);
public static final ProtocolStatus STATUS_BLOCKED = new ProtocolStatus(BLOCKED);
-
+
private int code;
private long lastModified;
private String[] args;
-
+
private static final HashMap<Integer, String> codeToName =
new HashMap<Integer, String>();
static {
@@ -107,40 +72,40 @@ public class ProtocolStatus implements W
codeToName.put(new Integer(WOULDBLOCK), "wouldblock");
codeToName.put(new Integer(BLOCKED), "blocked");
}
-
+
public ProtocolStatus() {
-
+
}
public ProtocolStatus(int code, String[] args) {
this.code = code;
this.args = args;
}
-
+
public ProtocolStatus(int code, String[] args, long lastModified) {
this.code = code;
this.args = args;
this.lastModified = lastModified;
}
-
+
public ProtocolStatus(int code) {
this(code, null);
}
-
+
public ProtocolStatus(int code, long lastModified) {
this(code, null, lastModified);
}
-
+
public ProtocolStatus(int code, Object message) {
this(code, message, 0L);
}
-
+
public ProtocolStatus(int code, Object message, long lastModified) {
this.code = code;
this.lastModified = lastModified;
if (message != null) this.args = new String[]{String.valueOf(message)};
}
-
+
public ProtocolStatus(Throwable t) {
this(EXCEPTION, t);
}
@@ -150,7 +115,7 @@ public class ProtocolStatus implements W
res.readFields(in);
return res;
}
-
+
public void readFields(DataInput in) throws IOException {
byte version = in.readByte();
switch(version) {
@@ -168,7 +133,7 @@ public class ProtocolStatus implements W
throw new VersionMismatchException(VERSION, version);
}
}
-
+
public void write(DataOutput out) throws IOException {
out.writeByte(VERSION);
out.writeByte((byte)code);
@@ -183,7 +148,7 @@ public class ProtocolStatus implements W
public void setArgs(String[] args) {
this.args = args;
}
-
+
public String[] getArgs() {
return args;
}
@@ -191,53 +156,54 @@ public class ProtocolStatus implements W
public int getCode() {
return code;
}
-
+
public void setCode(int code) {
this.code = code;
}
-
+
public boolean isSuccess() {
- return code == SUCCESS;
+ return code == SUCCESS;
}
-
+
public boolean isTransientFailure() {
return
- code == ACCESS_DENIED ||
- code == EXCEPTION ||
- code == REDIR_EXCEEDED ||
- code == RETRY ||
- code == TEMP_MOVED ||
- code == WOULDBLOCK ||
- code == PROTO_NOT_FOUND;
+ code == ACCESS_DENIED ||
+ code == EXCEPTION ||
+ code == REDIR_EXCEEDED ||
+ code == RETRY ||
+ code == TEMP_MOVED ||
+ code == WOULDBLOCK ||
+ code == PROTO_NOT_FOUND;
}
-
+
public boolean isPermanentFailure() {
return
- code == FAILED ||
- code == GONE ||
- code == MOVED ||
- code == NOTFOUND ||
- code == ROBOTS_DENIED;
+ code == FAILED ||
+ code == GONE ||
+ code == MOVED ||
+ code == NOTFOUND ||
+ code == ROBOTS_DENIED;
}
-
+
public String getMessage() {
if (args != null && args.length > 0) return args[0];
return null;
}
-
+
public void setMessage(String msg) {
if (args != null && args.length > 0) args[0] = msg;
else args = new String[] {msg};
}
-
+
public long getLastModified() {
return lastModified;
}
-
+
public void setLastModified(long lastModified) {
this.lastModified = lastModified;
}
-
+
+ @Override
public boolean equals(Object o) {
if (o == null) return false;
if (!(o instanceof ProtocolStatus)) return false;
@@ -255,7 +221,8 @@ public class ProtocolStatus implements W
}
return true;
}
-
+
+ @Override
public String toString() {
StringBuffer res = new StringBuffer();
res.append(codeToName.get(new Integer(code)) + "(" + code + "), lastModified=" + lastModified);
Added: nutch/branches/nutchbase/src/java/org/apache/nutch/protocol/ProtocolStatusCodes.java
URL: http://svn.apache.org/viewvc/nutch/branches/nutchbase/src/java/org/apache/nutch/protocol/ProtocolStatusCodes.java?rev=959259&view=auto
==============================================================================
--- nutch/branches/nutchbase/src/java/org/apache/nutch/protocol/ProtocolStatusCodes.java (added)
+++ nutch/branches/nutchbase/src/java/org/apache/nutch/protocol/ProtocolStatusCodes.java Wed Jun 30 10:36:20 2010
@@ -0,0 +1,40 @@
+package org.apache.nutch.protocol;
+
+public interface ProtocolStatusCodes {
+
+ /** Content was retrieved without errors. */
+ public static final int SUCCESS = 1;
+ /** Content was not retrieved. Any further errors may be indicated in args. */
+ public static final int FAILED = 2;
+
+ /** This protocol was not found. Application may attempt to retry later. */
+ public static final int PROTO_NOT_FOUND = 10;
+ /** Resource is gone. */
+ public static final int GONE = 11;
+ /** Resource has moved permanently. New url should be found in args. */
+ public static final int MOVED = 12;
+ /** Resource has moved temporarily. New url should be found in args. */
+ public static final int TEMP_MOVED = 13;
+ /** Resource was not found. */
+ public static final int NOTFOUND = 14;
+ /** Temporary failure. Application may retry immediately. */
+ public static final int RETRY = 15;
+ /** Unspecified exception occured. Further information may be provided in args. */
+ public static final int EXCEPTION = 16;
+ /** Access denied - authorization required, but missing/incorrect. */
+ public static final int ACCESS_DENIED = 17;
+ /** Access denied by robots.txt rules. */
+ public static final int ROBOTS_DENIED = 18;
+ /** Too many redirects. */
+ public static final int REDIR_EXCEEDED = 19;
+ /** Not fetching. */
+ public static final int NOTFETCHING = 20;
+ /** Unchanged since the last fetch. */
+ public static final int NOTMODIFIED = 21;
+ /** Request was refused by protocol plugins, because it would block.
+ * The expected number of milliseconds to wait before retry may be provided
+ * in args. */
+ public static final int WOULDBLOCK = 22;
+ /** Thread was blocked http.max.delays times during fetching. */
+ public static final int BLOCKED = 23;
+}
Modified: nutch/branches/nutchbase/src/java/org/apache/nutch/scoring/ScoringFilter.java
URL: http://svn.apache.org/viewvc/nutch/branches/nutchbase/src/java/org/apache/nutch/scoring/ScoringFilter.java?rev=959259&r1=959258&r2=959259&view=diff
==============================================================================
--- nutch/branches/nutchbase/src/java/org/apache/nutch/scoring/ScoringFilter.java (original)
+++ nutch/branches/nutchbase/src/java/org/apache/nutch/scoring/ScoringFilter.java Wed Jun 30 10:36:20 2010
@@ -21,44 +21,44 @@ import java.util.List;
import org.apache.hadoop.conf.Configurable;
import org.apache.nutch.indexer.NutchDocument;
-import org.apache.nutch.plugin.TablePluggable;
-import org.apache.nutch.util.hbase.WebTableRow;
+import org.apache.nutch.plugin.FieldPluggable;
+import org.apache.nutch.storage.WebPage;
/**
* A contract defining behavior of scoring plugins.
- *
+ *
* A scoring filter will manipulate scoring variables in CrawlDatum and
* in resulting search indexes. Filters can be chained in a specific order,
* to provide multi-stage scoring adjustments.
- *
+ *
* @author Andrzej Bialecki
*/
-public interface ScoringFilter extends Configurable, TablePluggable {
+public interface ScoringFilter extends Configurable, FieldPluggable {
/** The name of the extension point. */
public final static String X_POINT_ID = ScoringFilter.class.getName();
-
+
/**
* Set an initial score for newly injected pages. Note: newly injected pages
- * may have no inlinks, so filter implementations may wish to set this
+ * may have no inlinks, so filter implementations may wish to set this
* score to a non-zero value, to give newly injected pages some initial
* credit.
* @param url url of the page
- * @param row new row. Filters will modify it in-place.
+ * @param page new page. Filters will modify it in-place.
* @throws ScoringFilterException
*/
- public void injectedScore(String url, WebTableRow row) throws ScoringFilterException;
-
+ public void injectedScore(String url, WebPage page) throws ScoringFilterException;
+
/**
* Set an initial score for newly discovered pages. Note: newly discovered pages
* have at least one inlink with its score contribution, so filter implementations
* may choose to set initial score to zero (unknown value), and then the inlink
* score contribution will set the "real" value of the new page.
* @param url url of the page
- * @param row page row. Modifications will be persisted.
+ * @param page
* @throws ScoringFilterException
*/
- public void initialScore(String url, WebTableRow row) throws ScoringFilterException;
-
+ public void initialScore(String url, WebPage page) throws ScoringFilterException;
+
/**
* This method prepares a sort value for the purpose of sorting and
* selecting top N scoring pages during fetchlist generation.
@@ -66,7 +66,7 @@ public interface ScoringFilter extends C
* @param datum page row. Modifications will be persisted.
* @param initSort initial sort value, or a value from previous filters in chain
*/
- public float generatorSortValue(String url, WebTableRow row, float initSort) throws ScoringFilterException;
+ public float generatorSortValue(String url, WebPage page, float initSort) throws ScoringFilterException;
/**
* Distribute score value from the current page to all its outlinked pages.
@@ -74,26 +74,26 @@ public interface ScoringFilter extends C
* @param row page row
* @param scoreData A list of {@link OutlinkedScoreDatum}s for every outlink.
* These {@link OutlinkedScoreDatum}s will be passed to
- * {@link #updateScore(String, WebTableRow, List)}
+ * {@link #updateScore(String, WebPage, List)}
* for every outlinked URL.
* @param allCount number of all collected outlinks from the source page
* @throws ScoringFilterException
*/
public void distributeScoreToOutlinks(String fromUrl,
- WebTableRow row, Collection<ScoreDatum> scoreData,
+ WebPage page, Collection<ScoreDatum> scoreData,
int allCount) throws ScoringFilterException;
/**
* This method calculates a new score during table update, based on the values contributed
* by inlinked pages.
* @param url url of the page
- * @param row page row
+ * @param page
* @param inlinked list of {@link OutlinkedScoreDatum}s for all inlinks pointing to this URL.
* @throws ScoringFilterException
*/
- public void updateScore(String url, WebTableRow row, List<ScoreDatum> inlinkedScoreData)
+ public void updateScore(String url, WebPage page, List<ScoreDatum> inlinkedScoreData)
throws ScoringFilterException;
-
+
/**
* This method calculates a Lucene document boost.
* @param url url of the page
@@ -107,6 +107,6 @@ public interface ScoringFilter extends C
* other scoring strategies by modifying Lucene document directly.
* @throws ScoringFilterException
*/
- public float indexerScore(String url, NutchDocument doc, WebTableRow row, float initScore)
+ public float indexerScore(String url, NutchDocument doc, WebPage page, float initScore)
throws ScoringFilterException;
}
Modified: nutch/branches/nutchbase/src/java/org/apache/nutch/scoring/ScoringFilters.java
URL: http://svn.apache.org/viewvc/nutch/branches/nutchbase/src/java/org/apache/nutch/scoring/ScoringFilters.java?rev=959259&r1=959258&r2=959259&view=diff
==============================================================================
--- nutch/branches/nutchbase/src/java/org/apache/nutch/scoring/ScoringFilters.java (original)
+++ nutch/branches/nutchbase/src/java/org/apache/nutch/scoring/ScoringFilters.java Wed Jun 30 10:36:20 2010
@@ -30,13 +30,12 @@ import org.apache.nutch.plugin.Extension
import org.apache.nutch.plugin.ExtensionPoint;
import org.apache.nutch.plugin.PluginRepository;
import org.apache.nutch.plugin.PluginRuntimeException;
+import org.apache.nutch.storage.WebPage;
import org.apache.nutch.util.ObjectCache;
-import org.apache.nutch.util.hbase.HbaseColumn;
-import org.apache.nutch.util.hbase.WebTableRow;
/**
* Creates and caches {@link ScoringFilter} implementing plugins.
- *
+ *
* @author Andrzej Bialecki
*/
public class ScoringFilters extends Configured implements ScoringFilter {
@@ -86,7 +85,7 @@ public class ScoringFilters extends Conf
/** Calculate a sort value for Generate. */
@Override
- public float generatorSortValue(String url, WebTableRow row, float initSort)
+ public float generatorSortValue(String url, WebPage row, float initSort)
throws ScoringFilterException {
for (ScoringFilter filter : filters) {
initSort = filter.generatorSortValue(url, row, initSort);
@@ -96,7 +95,7 @@ public class ScoringFilters extends Conf
/** Calculate a new initial score, used when adding newly discovered pages. */
@Override
- public void initialScore(String url, WebTableRow row) throws ScoringFilterException {
+ public void initialScore(String url, WebPage row) throws ScoringFilterException {
for (ScoringFilter filter : filters) {
filter.initialScore(url, row);
}
@@ -104,31 +103,31 @@ public class ScoringFilters extends Conf
/** Calculate a new initial score, used when injecting new pages. */
@Override
- public void injectedScore(String url, WebTableRow row) throws ScoringFilterException {
+ public void injectedScore(String url, WebPage row) throws ScoringFilterException {
for (ScoringFilter filter : filters) {
filter.injectedScore(url, row);
}
}
@Override
- public void distributeScoreToOutlinks(String fromUrl, WebTableRow row,
+ public void distributeScoreToOutlinks(String fromUrl, WebPage row,
Collection<ScoreDatum> scoreData, int allCount)
throws ScoringFilterException {
for (ScoringFilter filter : filters) {
filter.distributeScoreToOutlinks(fromUrl, row, scoreData, allCount);
}
}
-
+
@Override
- public void updateScore(String url, WebTableRow row,
+ public void updateScore(String url, WebPage row,
List<ScoreDatum> inlinkedScoreData) throws ScoringFilterException {
for (ScoringFilter filter : filters) {
filter.updateScore(url, row, inlinkedScoreData);
- }
+ }
}
@Override
- public float indexerScore(String url, NutchDocument doc, WebTableRow row,
+ public float indexerScore(String url, NutchDocument doc, WebPage row,
float initScore) throws ScoringFilterException {
for (ScoringFilter filter : filters) {
initScore = filter.indexerScore(url, doc, row, initScore);
@@ -137,11 +136,14 @@ public class ScoringFilters extends Conf
}
@Override
- public Collection<HbaseColumn> getColumns() {
- Set<HbaseColumn> columns = new HashSet<HbaseColumn>();
+ public Collection<WebPage.Field> getFields() {
+ Set<WebPage.Field> fields = new HashSet<WebPage.Field>();
for (ScoringFilter filter : filters) {
- columns.addAll(filter.getColumns());
+ Collection<WebPage.Field> pluginFields = filter.getFields();
+ if (pluginFields != null) {
+ fields.addAll(pluginFields);
+ }
}
- return columns;
+ return fields;
}
}
Added: nutch/branches/nutchbase/src/java/org/apache/nutch/storage/Mark.java
URL: http://svn.apache.org/viewvc/nutch/branches/nutchbase/src/java/org/apache/nutch/storage/Mark.java?rev=959259&view=auto
==============================================================================
--- nutch/branches/nutchbase/src/java/org/apache/nutch/storage/Mark.java (added)
+++ nutch/branches/nutchbase/src/java/org/apache/nutch/storage/Mark.java Wed Jun 30 10:36:20 2010
@@ -0,0 +1,50 @@
+package org.apache.nutch.storage;
+
+import org.apache.avro.util.Utf8;
+
+/**
+ * Named keys into the {@link WebPage} markers map. Judging by the constant
+ * names there is one mark per crawl step (inject, generate, fetch, parse,
+ * updatedb, index).
+ */
+public enum Mark {
+  INJECT_MARK("_injmrk_"), GENERATE_MARK("_gnmrk_"), FETCH_MARK("_ftcmrk_"),
+  // NOTE(review): PARSE_MARK uses double underscores unlike its siblings;
+  // normalizing it would orphan marks already stored under the old key.
+  PARSE_MARK("__prsmrk__"), UPDATEDB_MARK("_updmrk_"), INDEX_MARK("_idxmrk_");
+
+  /** Key of this mark in the markers map; final so a constant cannot be re-pointed. */
+  private final Utf8 name;
+
+  Mark(String name) {
+    this.name = new Utf8(name);
+  }
+
+  /** Stores {@code markValue} under this mark's key, flagging the markers field dirty. */
+  public void putMark(WebPage page, Utf8 markValue) {
+    page.putToMarkers(name, markValue);
+  }
+
+  /** Same as {@link #putMark(WebPage, Utf8)}, wrapping {@code markValue} in a {@link Utf8}. */
+  public void putMark(WebPage page, String markValue) {
+    putMark(page, new Utf8(markValue));
+  }
+
+  /**
+   * Removes this mark from {@code page}.
+   *
+   * @return the value previously stored under this mark, or null if there was none
+   */
+  public Utf8 removeMark(WebPage page) {
+    return page.removeFromMarkers(name);
+  }
+
+  /**
+   * Looks up this mark on {@code page} without modifying it.
+   *
+   * @return the stored value, or null if the mark is absent
+   */
+  public Utf8 checkMark(WebPage page) {
+    return page.getFromMarkers(name);
+  }
+}
Added: nutch/branches/nutchbase/src/java/org/apache/nutch/storage/ParseStatus.java
URL: http://svn.apache.org/viewvc/nutch/branches/nutchbase/src/java/org/apache/nutch/storage/ParseStatus.java?rev=959259&view=auto
==============================================================================
--- nutch/branches/nutchbase/src/java/org/apache/nutch/storage/ParseStatus.java (added)
+++ nutch/branches/nutchbase/src/java/org/apache/nutch/storage/ParseStatus.java Wed Jun 30 10:36:20 2010
@@ -0,0 +1,93 @@
+package org.apache.nutch.storage;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.HashMap;
+import org.apache.avro.Protocol;
+import org.apache.avro.Schema;
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.Protocol;
+import org.apache.avro.util.Utf8;
+import org.apache.avro.ipc.AvroRemoteException;
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.specific.SpecificExceptionBase;
+import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.specific.SpecificFixed;
+import org.gora.persistency.StateManager;
+import org.gora.persistency.impl.PersistentBase;
+import org.gora.persistency.impl.StateManagerImpl;
+import org.gora.persistency.StatefulHashMap;
+import org.gora.persistency.ListGenericArray;
+
+@SuppressWarnings("all") // NOTE(review): looks machine-generated from _SCHEMA; regenerate rather than hand-edit
+public class ParseStatus extends PersistentBase { // Persistent record: int majorCode, int minorCode and string args
+ public static final Schema _SCHEMA = Schema.parse("{\"type\":\"record\",\"name\":\"ParseStatus\",\"namespace\":\"org.apache.nutch.storage\",\"fields\":[{\"name\":\"majorCode\",\"type\":\"int\"},{\"name\":\"minorCode\",\"type\":\"int\"},{\"name\":\"args\",\"type\":{\"type\":\"array\",\"items\":\"string\"}}]}");
+ public static enum Field { // index/name pairs; indices must match _SCHEMA field order and the get/put switches
+ MAJOR_CODE(0,"majorCode"),
+ MINOR_CODE(1,"minorCode"),
+ ARGS(2,"args"),
+ ;
+ private int index;
+ private String name;
+ Field(int index, String name) {this.index=index;this.name=name;}
+ public int getIndex() {return index;}
+ public String getName() {return name;}
+ public String toString() {return name;}
+ };
+ public static final String[] _ALL_FIELDS = {"majorCode","minorCode","args",}; // same names in index order, registered below
+ static {
+ PersistentBase.registerFields(ParseStatus.class, _ALL_FIELDS);
+ }
+ private int majorCode;
+ private int minorCode;
+ private GenericArray<Utf8> args;
+ public ParseStatus() { // default: a fresh StateManagerImpl
+ this(new StateManagerImpl());
+ }
+ public ParseStatus(StateManager stateManager) {
+ super(stateManager);
+ args = new ListGenericArray<Utf8>(getSchema().getField("args").schema()); // args starts as an empty array typed from the schema
+ }
+ public ParseStatus newInstance(StateManager stateManager) {
+ return new ParseStatus(stateManager);
+ }
+ public Schema getSchema() { return _SCHEMA; }
+ public Object get(int _field) { // positional read used by the getters; primitives come back boxed
+ switch (_field) {
+ case 0: return majorCode;
+ case 1: return minorCode;
+ case 2: return args;
+ default: throw new AvroRuntimeException("Bad index");
+ }
+ }
+ @SuppressWarnings(value="unchecked")
+ public void put(int _field, Object _value) {
+ getStateManager().setDirty(this, _field); // NOTE: marked dirty before the index is validated; the default branch throws afterwards
+ switch (_field) {
+ case 0:majorCode = (Integer)_value; break; // unboxing: a null _value throws NullPointerException
+ case 1:minorCode = (Integer)_value; break;
+ case 2:args = (GenericArray<Utf8>)_value; break;
+ default: throw new AvroRuntimeException("Bad index");
+ }
+ }
+ public int getMajorCode() {
+ return (Integer) get(0);
+ }
+ public void setMajorCode(int value) {
+ put(0, value);
+ }
+ public int getMinorCode() {
+ return (Integer) get(1);
+ }
+ public void setMinorCode(int value) {
+ put(1, value);
+ }
+ public GenericArray<Utf8> getArgs() {
+ return (GenericArray<Utf8>) get(2);
+ }
+ public void addToArgs(Utf8 element) { // appends in place and marks args (index 2) dirty
+ getStateManager().setDirty(this, 2);
+ args.add(element);
+ }
+}
Added: nutch/branches/nutchbase/src/java/org/apache/nutch/storage/ProtocolStatus.java
URL: http://svn.apache.org/viewvc/nutch/branches/nutchbase/src/java/org/apache/nutch/storage/ProtocolStatus.java?rev=959259&view=auto
==============================================================================
--- nutch/branches/nutchbase/src/java/org/apache/nutch/storage/ProtocolStatus.java (added)
+++ nutch/branches/nutchbase/src/java/org/apache/nutch/storage/ProtocolStatus.java Wed Jun 30 10:36:20 2010
@@ -0,0 +1,93 @@
+package org.apache.nutch.storage;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.HashMap;
+import org.apache.avro.Protocol;
+import org.apache.avro.Schema;
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.Protocol;
+import org.apache.avro.util.Utf8;
+import org.apache.avro.ipc.AvroRemoteException;
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.specific.SpecificExceptionBase;
+import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.specific.SpecificFixed;
+import org.gora.persistency.StateManager;
+import org.gora.persistency.impl.PersistentBase;
+import org.gora.persistency.impl.StateManagerImpl;
+import org.gora.persistency.StatefulHashMap;
+import org.gora.persistency.ListGenericArray;
+
+@SuppressWarnings("all") // NOTE(review): looks machine-generated from _SCHEMA; regenerate rather than hand-edit
+public class ProtocolStatus extends PersistentBase { // Persistent record: int code, string args and a long lastModified
+ public static final Schema _SCHEMA = Schema.parse("{\"type\":\"record\",\"name\":\"ProtocolStatus\",\"namespace\":\"org.apache.nutch.storage\",\"fields\":[{\"name\":\"code\",\"type\":\"int\"},{\"name\":\"args\",\"type\":{\"type\":\"array\",\"items\":\"string\"}},{\"name\":\"lastModified\",\"type\":\"long\"}]}");
+ public static enum Field { // index/name pairs; indices must match _SCHEMA field order and the get/put switches
+ CODE(0,"code"),
+ ARGS(1,"args"),
+ LAST_MODIFIED(2,"lastModified"),
+ ;
+ private int index;
+ private String name;
+ Field(int index, String name) {this.index=index;this.name=name;}
+ public int getIndex() {return index;}
+ public String getName() {return name;}
+ public String toString() {return name;}
+ };
+ public static final String[] _ALL_FIELDS = {"code","args","lastModified",}; // same names in index order, registered below
+ static {
+ PersistentBase.registerFields(ProtocolStatus.class, _ALL_FIELDS);
+ }
+ private int code;
+ private GenericArray<Utf8> args;
+ private long lastModified;
+ public ProtocolStatus() { // default: a fresh StateManagerImpl
+ this(new StateManagerImpl());
+ }
+ public ProtocolStatus(StateManager stateManager) {
+ super(stateManager);
+ args = new ListGenericArray<Utf8>(getSchema().getField("args").schema()); // args starts as an empty array typed from the schema
+ }
+ public ProtocolStatus newInstance(StateManager stateManager) {
+ return new ProtocolStatus(stateManager);
+ }
+ public Schema getSchema() { return _SCHEMA; }
+ public Object get(int _field) { // positional read used by the getters; primitives come back boxed
+ switch (_field) {
+ case 0: return code;
+ case 1: return args;
+ case 2: return lastModified;
+ default: throw new AvroRuntimeException("Bad index");
+ }
+ }
+ @SuppressWarnings(value="unchecked")
+ public void put(int _field, Object _value) {
+ getStateManager().setDirty(this, _field); // NOTE: marked dirty before the index is validated; the default branch throws afterwards
+ switch (_field) {
+ case 0:code = (Integer)_value; break; // unboxing: a null _value throws NullPointerException
+ case 1:args = (GenericArray<Utf8>)_value; break;
+ case 2:lastModified = (Long)_value; break;
+ default: throw new AvroRuntimeException("Bad index");
+ }
+ }
+ public int getCode() {
+ return (Integer) get(0);
+ }
+ public void setCode(int value) {
+ put(0, value);
+ }
+ public GenericArray<Utf8> getArgs() {
+ return (GenericArray<Utf8>) get(1);
+ }
+ public void addToArgs(Utf8 element) { // appends in place and marks args (index 1) dirty
+ getStateManager().setDirty(this, 1);
+ args.add(element);
+ }
+ public long getLastModified() {
+ return (Long) get(2);
+ }
+ public void setLastModified(long value) {
+ put(2, value);
+ }
+}
Added: nutch/branches/nutchbase/src/java/org/apache/nutch/storage/ProtocolStatusUtils.java
URL: http://svn.apache.org/viewvc/nutch/branches/nutchbase/src/java/org/apache/nutch/storage/ProtocolStatusUtils.java?rev=959259&view=auto
==============================================================================
--- nutch/branches/nutchbase/src/java/org/apache/nutch/storage/ProtocolStatusUtils.java (added)
+++ nutch/branches/nutchbase/src/java/org/apache/nutch/storage/ProtocolStatusUtils.java Wed Jun 30 10:36:20 2010
@@ -0,0 +1,82 @@
+package org.apache.nutch.storage;
+
+import java.net.URL;
+
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.util.Utf8;
+import org.apache.nutch.protocol.ProtocolStatusCodes;
+import org.apache.nutch.util.TableUtil;
+
+/**
+ * Factory and accessor helpers for {@link ProtocolStatus} records, using the
+ * code constants inherited from {@link ProtocolStatusCodes}.
+ */
+public class ProtocolStatusUtils implements ProtocolStatusCodes {
+  // Shared instances for status codes that don't usually require any
+  // additional arguments. NOTE: ProtocolStatus is mutable and each of these
+  // objects is shared by every caller, so never modify them; call
+  // makeStatus(...) to get a private instance instead.
+  public static final ProtocolStatus STATUS_SUCCESS = makeStatus(SUCCESS);
+  public static final ProtocolStatus STATUS_FAILED = makeStatus(FAILED);
+  public static final ProtocolStatus STATUS_GONE = makeStatus(GONE);
+  public static final ProtocolStatus STATUS_NOTFOUND = makeStatus(NOTFOUND);
+  public static final ProtocolStatus STATUS_RETRY = makeStatus(RETRY);
+  public static final ProtocolStatus STATUS_ROBOTS_DENIED = makeStatus(ROBOTS_DENIED);
+  public static final ProtocolStatus STATUS_REDIR_EXCEEDED = makeStatus(REDIR_EXCEEDED);
+  public static final ProtocolStatus STATUS_NOTFETCHING = makeStatus(NOTFETCHING);
+  public static final ProtocolStatus STATUS_NOTMODIFIED = makeStatus(NOTMODIFIED);
+  public static final ProtocolStatus STATUS_WOULDBLOCK = makeStatus(WOULDBLOCK);
+  public static final ProtocolStatus STATUS_BLOCKED = makeStatus(BLOCKED);
+
+  /**
+   * Creates a new status with the given code, no arguments and a
+   * lastModified of 0.
+   *
+   * @param code one of the {@link ProtocolStatusCodes} constants
+   * @return a freshly allocated, unshared status
+   */
+  public static ProtocolStatus makeStatus(int code) {
+    ProtocolStatus pstatus = new ProtocolStatus();
+    pstatus.setCode(code);
+    pstatus.setLastModified(0);
+    return pstatus;
+  }
+
+  /**
+   * Creates a new status carrying {@code message} as its single argument.
+   *
+   * @param code one of the {@link ProtocolStatusCodes} constants
+   * @param message human-readable detail; if null the status gets no
+   *     arguments (so {@link #getMessage} returns null) instead of passing
+   *     null to the {@link Utf8} constructor
+   * @return a freshly allocated, unshared status
+   */
+  public static ProtocolStatus makeStatus(int code, String message) {
+    ProtocolStatus pstatus = makeStatus(code);
+    if (message != null) {
+      pstatus.addToArgs(new Utf8(message));
+    }
+    return pstatus;
+  }
+
+  /**
+   * Creates a new status whose single argument is {@code url} as a string.
+   * A null URL yields a status with no arguments.
+   */
+  public static ProtocolStatus makeStatus(int code, URL url) {
+    return makeStatus(code, url == null ? null : url.toString());
+  }
+
+  /**
+   * Returns the first argument of {@code pstatus} as a String, or null when
+   * the status has no arguments.
+   */
+  public static String getMessage(ProtocolStatus pstatus) {
+    GenericArray<Utf8> args = pstatus.getArgs();
+    if (args == null || args.size() == 0) {
+      return null;
+    }
+    return TableUtil.toString(args.iterator().next());
+  }
+}
+
Added: nutch/branches/nutchbase/src/java/org/apache/nutch/storage/StorageUtils.java
URL: http://svn.apache.org/viewvc/nutch/branches/nutchbase/src/java/org/apache/nutch/storage/StorageUtils.java?rev=959259&view=auto
==============================================================================
--- nutch/branches/nutchbase/src/java/org/apache/nutch/storage/StorageUtils.java (added)
+++ nutch/branches/nutchbase/src/java/org/apache/nutch/storage/StorageUtils.java Wed Jun 30 10:36:20 2010
@@ -0,0 +1,116 @@
+package org.apache.nutch.storage;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.gora.mapreduce.GoraMapper;
+import org.gora.mapreduce.GoraOutputFormat;
+import org.gora.mapreduce.GoraReducer;
+import org.gora.persistency.Persistent;
+import org.gora.query.Query;
+import org.gora.store.DataStore;
+import org.gora.store.DataStoreFactory;
+
+/**
+ * Static helpers that wire the configured Gora {@link DataStore} of
+ * {@link WebPage}s into Hadoop MapReduce jobs.
+ */
+public class StorageUtils {
+
+  /**
+   * Instantiates the store class resolved by {@link #getDataStoreClass}.
+   */
+  @SuppressWarnings("unchecked")
+  public static <K, V extends Persistent> DataStore<K, V> createDataStore(Configuration conf,
+      Class<K> keyClass, Class<V> persistentClass) throws ClassNotFoundException {
+    Class<? extends DataStore<K, V>> storeClass =
+        (Class<? extends DataStore<K, V>>) getDataStoreClass(conf);
+    return DataStoreFactory.createDataStore(storeClass, keyClass, persistentClass);
+  }
+
+  /**
+   * Loads the class named by {@code storage.data.store.class}, falling back
+   * to {@code org.gora.hbase.store.HBaseStore} when the key is unset.
+   */
+  @SuppressWarnings("unchecked")
+  public static <K, V extends Persistent> Class<? extends DataStore<K, V>>
+      getDataStoreClass(Configuration conf) throws ClassNotFoundException {
+    String className = conf.get("storage.data.store.class",
+        "org.gora.hbase.store.HBaseStore");
+    return (Class<? extends DataStore<K, V>>) Class.forName(className);
+  }
+
+  /** Same as the full overload with no partitioner. */
+  public static <K, V> void initMapperJob(Job job,
+      Collection<WebPage.Field> fields,
+      Class<K> outKeyClass, Class<V> outValueClass,
+      Class<? extends GoraMapper<String, WebPage, K, V>> mapperClass, boolean reuseObjects)
+      throws ClassNotFoundException, IOException {
+    initMapperJob(job, fields, outKeyClass, outValueClass, mapperClass, null, reuseObjects);
+  }
+
+  /** Same as the full overload with no partitioner and object reuse enabled. */
+  public static <K, V> void initMapperJob(Job job,
+      Collection<WebPage.Field> fields,
+      Class<K> outKeyClass, Class<V> outValueClass,
+      Class<? extends GoraMapper<String, WebPage, K, V>> mapperClass)
+      throws ClassNotFoundException, IOException {
+    initMapperJob(job, fields, outKeyClass, outValueClass, mapperClass, null, true);
+  }
+
+  /** Same as the full overload with object reuse enabled. */
+  public static <K, V> void initMapperJob(Job job,
+      Collection<WebPage.Field> fields,
+      Class<K> outKeyClass, Class<V> outValueClass,
+      Class<? extends GoraMapper<String, WebPage, K, V>> mapperClass,
+      Class<? extends Partitioner<K, V>> partitionerClass)
+      throws ClassNotFoundException, IOException {
+    initMapperJob(job, fields, outKeyClass, outValueClass, mapperClass, partitionerClass, true);
+  }
+
+  /**
+   * Configures {@code job} to map over the WebPage store, reading only
+   * {@code fields}, and sets that same store as the job's Gora output.
+   *
+   * @param partitionerClass may be null (the shorter overloads pass null)
+   */
+  public static <K, V> void initMapperJob(Job job,
+      Collection<WebPage.Field> fields,
+      Class<K> outKeyClass, Class<V> outValueClass,
+      Class<? extends GoraMapper<String, WebPage, K, V>> mapperClass,
+      Class<? extends Partitioner<K, V>> partitionerClass, boolean reuseObjects)
+      throws ClassNotFoundException, IOException {
+    Configuration conf = job.getConfiguration();
+    DataStore<String, WebPage> store = createDataStore(conf, String.class, WebPage.class);
+    Query<String, WebPage> query = store.newQuery();
+    query.setFields(toStringArray(fields));
+    GoraMapper.initMapperJob(job, query, store,
+        outKeyClass, outValueClass, mapperClass, partitionerClass, reuseObjects);
+    GoraOutputFormat.setOutput(job, store, true);
+  }
+
+  /** Configures {@code job} to reduce into the configured WebPage store. */
+  public static <K, V> void initReducerJob(Job job,
+      Class<? extends GoraReducer<K, V, String, WebPage>> reducerClass)
+      throws ClassNotFoundException {
+    DataStore<String, WebPage> store =
+        createDataStore(job.getConfiguration(), String.class, WebPage.class);
+    GoraReducer.initReducerJob(job, store, reducerClass);
+    GoraOutputFormat.setOutput(job, store, true);
+  }
+
+  /** Maps each field to its schema name, preserving the collection's iteration order. */
+  private static String[] toStringArray(Collection<WebPage.Field> fields) {
+    String[] names = new String[fields.size()];
+    Iterator<WebPage.Field> it = fields.iterator();
+    int pos = 0;
+    while (pos < names.length) {
+      names[pos++] = it.next().getName();
+    }
+    return names;
+  }
+}
Added: nutch/branches/nutchbase/src/java/org/apache/nutch/storage/WebPage.java
URL: http://svn.apache.org/viewvc/nutch/branches/nutchbase/src/java/org/apache/nutch/storage/WebPage.java?rev=959259&view=auto
==============================================================================
--- nutch/branches/nutchbase/src/java/org/apache/nutch/storage/WebPage.java (added)
+++ nutch/branches/nutchbase/src/java/org/apache/nutch/storage/WebPage.java Wed Jun 30 10:36:20 2010
@@ -0,0 +1,336 @@
+package org.apache.nutch.storage;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.HashMap;
+import org.apache.avro.Protocol;
+import org.apache.avro.Schema;
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.Protocol;
+import org.apache.avro.util.Utf8;
+import org.apache.avro.ipc.AvroRemoteException;
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.specific.SpecificExceptionBase;
+import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.specific.SpecificFixed;
+import org.gora.persistency.StateManager;
+import org.gora.persistency.impl.PersistentBase;
+import org.gora.persistency.impl.StateManagerImpl;
+import org.gora.persistency.StatefulHashMap;
+import org.gora.persistency.ListGenericArray;
+
+@SuppressWarnings("all") // NOTE(review): looks machine-generated from _SCHEMA; regenerate rather than hand-edit
+public class WebPage extends PersistentBase { // Persistent record; fields, indices and accessors mirror _SCHEMA
+ public static final Schema _SCHEMA = Schema.parse("{\"type\":\"record\",\"name\":\"WebPage\",\"namespace\":\"org.apache.nutch.storage\",\"fields\":[{\"name\":\"baseUrl\",\"type\":\"string\"},{\"name\":\"status\",\"type\":\"int\"},{\"name\":\"fetchTime\",\"type\":\"long\"},{\"name\":\"prevFetchTime\",\"type\":\"long\"},{\"name\":\"fetchInterval\",\"type\":\"int\"},{\"name\":\"retriesSinceFetch\",\"type\":\"int\"},{\"name\":\"modifiedTime\",\"type\":\"long\"},{\"name\":\"protocolStatus\",\"type\":{\"type\":\"record\",\"name\":\"ProtocolStatus\",\"fields\":[{\"name\":\"code\",\"type\":\"int\"},{\"name\":\"args\",\"type\":{\"type\":\"array\",\"items\":\"string\"}},{\"name\":\"lastModified\",\"type\":\"long\"}]}},{\"name\":\"content\",\"type\":\"bytes\"},{\"name\":\"contentType\",\"type\":\"string\"},{\"name\":\"prevSignature\",\"type\":\"bytes\"},{\"name\":\"signature\",\"type\":\"bytes\"},{\"name\":\"title\",\"type\":\"string\"},{\"name\":\"text\",\"type\":\"string\"},{\"name\":\"parseStatus\",\"type\":{\"type\":\"record\",\"name\":\"ParseStatus\",\"fields\":[{\"name\":\"majorCode\",\"type\":\"int\"},{\"name\":\"minorCode\",\"type\":\"int\"},{\"name\":\"args\",\"type\":{\"type\":\"array\",\"items\":\"string\"}}]}},{\"name\":\"score\",\"type\":\"float\"},{\"name\":\"reprUrl\",\"type\":\"string\"},{\"name\":\"headers\",\"type\":{\"type\":\"map\",\"values\":\"string\"}},{\"name\":\"outlinks\",\"type\":{\"type\":\"map\",\"values\":\"string\"}},{\"name\":\"inlinks\",\"type\":{\"type\":\"map\",\"values\":\"string\"}},{\"name\":\"markers\",\"type\":{\"type\":\"map\",\"values\":\"string\"}},{\"name\":\"metadata\",\"type\":{\"type\":\"map\",\"values\":\"bytes\"}}]}"); // keep on ONE line: a Java string literal cannot be split across lines
+ public static enum Field { // index/name pairs; indices must match _SCHEMA field order and the get/put switches
+ BASE_URL(0,"baseUrl"),
+ STATUS(1,"status"),
+ FETCH_TIME(2,"fetchTime"),
+ PREV_FETCH_TIME(3,"prevFetchTime"),
+ FETCH_INTERVAL(4,"fetchInterval"),
+ RETRIES_SINCE_FETCH(5,"retriesSinceFetch"),
+ MODIFIED_TIME(6,"modifiedTime"),
+ PROTOCOL_STATUS(7,"protocolStatus"),
+ CONTENT(8,"content"),
+ CONTENT_TYPE(9,"contentType"),
+ PREV_SIGNATURE(10,"prevSignature"),
+ SIGNATURE(11,"signature"),
+ TITLE(12,"title"),
+ TEXT(13,"text"),
+ PARSE_STATUS(14,"parseStatus"),
+ SCORE(15,"score"),
+ REPR_URL(16,"reprUrl"),
+ HEADERS(17,"headers"),
+ OUTLINKS(18,"outlinks"),
+ INLINKS(19,"inlinks"),
+ MARKERS(20,"markers"),
+ METADATA(21,"metadata"),
+ ;
+ private int index;
+ private String name;
+ Field(int index, String name) {this.index=index;this.name=name;}
+ public int getIndex() {return index;}
+ public String getName() {return name;}
+ public String toString() {return name;}
+ };
+ public static final String[] _ALL_FIELDS = {"baseUrl","status","fetchTime","prevFetchTime","fetchInterval","retriesSinceFetch","modifiedTime","protocolStatus","content","contentType","prevSignature","signature","title","text","parseStatus","score","reprUrl","headers","outlinks","inlinks","markers","metadata",}; // same names in index order, registered below
+ static {
+ PersistentBase.registerFields(WebPage.class, _ALL_FIELDS);
+ }
+ private Utf8 baseUrl;
+ private int status;
+ private long fetchTime;
+ private long prevFetchTime;
+ private int fetchInterval;
+ private int retriesSinceFetch;
+ private long modifiedTime;
+ private ProtocolStatus protocolStatus;
+ private ByteBuffer content;
+ private Utf8 contentType;
+ private ByteBuffer prevSignature;
+ private ByteBuffer signature;
+ private Utf8 title;
+ private Utf8 text;
+ private ParseStatus parseStatus;
+ private float score;
+ private Utf8 reprUrl;
+ private Map<Utf8,Utf8> headers;
+ private Map<Utf8,Utf8> outlinks;
+ private Map<Utf8,Utf8> inlinks;
+ private Map<Utf8,Utf8> markers;
+ private Map<Utf8,ByteBuffer> metadata;
+ public WebPage() { // default: a fresh StateManagerImpl
+ this(new StateManagerImpl());
+ }
+ public WebPage(StateManager stateManager) { // map fields start as empty StatefulHashMaps; all others keep Java defaults (null/0)
+ super(stateManager);
+ headers = new StatefulHashMap<Utf8,Utf8>();
+ outlinks = new StatefulHashMap<Utf8,Utf8>();
+ inlinks = new StatefulHashMap<Utf8,Utf8>();
+ markers = new StatefulHashMap<Utf8,Utf8>();
+ metadata = new StatefulHashMap<Utf8,ByteBuffer>();
+ }
+ public WebPage newInstance(StateManager stateManager) {
+ return new WebPage(stateManager);
+ }
+ public Schema getSchema() { return _SCHEMA; }
+ public Object get(int _field) { // positional read used by the getters; primitives come back boxed
+ switch (_field) {
+ case 0: return baseUrl;
+ case 1: return status;
+ case 2: return fetchTime;
+ case 3: return prevFetchTime;
+ case 4: return fetchInterval;
+ case 5: return retriesSinceFetch;
+ case 6: return modifiedTime;
+ case 7: return protocolStatus;
+ case 8: return content;
+ case 9: return contentType;
+ case 10: return prevSignature;
+ case 11: return signature;
+ case 12: return title;
+ case 13: return text;
+ case 14: return parseStatus;
+ case 15: return score;
+ case 16: return reprUrl;
+ case 17: return headers;
+ case 18: return outlinks;
+ case 19: return inlinks;
+ case 20: return markers;
+ case 21: return metadata;
+ default: throw new AvroRuntimeException("Bad index");
+ }
+ }
+ @SuppressWarnings(value="unchecked")
+ public void put(int _field, Object _value) {
+ getStateManager().setDirty(this, _field); // NOTE: marked dirty before the index is validated; the default branch throws afterwards
+ switch (_field) {
+ case 0:baseUrl = (Utf8)_value; break;
+ case 1:status = (Integer)_value; break;
+ case 2:fetchTime = (Long)_value; break;
+ case 3:prevFetchTime = (Long)_value; break;
+ case 4:fetchInterval = (Integer)_value; break;
+ case 5:retriesSinceFetch = (Integer)_value; break;
+ case 6:modifiedTime = (Long)_value; break;
+ case 7:protocolStatus = (ProtocolStatus)_value; break;
+ case 8:content = (ByteBuffer)_value; break;
+ case 9:contentType = (Utf8)_value; break;
+ case 10:prevSignature = (ByteBuffer)_value; break;
+ case 11:signature = (ByteBuffer)_value; break;
+ case 12:title = (Utf8)_value; break;
+ case 13:text = (Utf8)_value; break;
+ case 14:parseStatus = (ParseStatus)_value; break;
+ case 15:score = (Float)_value; break;
+ case 16:reprUrl = (Utf8)_value; break;
+ case 17:headers = (Map<Utf8,Utf8>)_value; break;
+ case 18:outlinks = (Map<Utf8,Utf8>)_value; break;
+ case 19:inlinks = (Map<Utf8,Utf8>)_value; break;
+ case 20:markers = (Map<Utf8,Utf8>)_value; break;
+ case 21:metadata = (Map<Utf8,ByteBuffer>)_value; break;
+ default: throw new AvroRuntimeException("Bad index");
+ }
+ }
+ public Utf8 getBaseUrl() {
+ return (Utf8) get(0);
+ }
+ public void setBaseUrl(Utf8 value) {
+ put(0, value);
+ }
+ public int getStatus() {
+ return (Integer) get(1);
+ }
+ public void setStatus(int value) {
+ put(1, value);
+ }
+ public long getFetchTime() {
+ return (Long) get(2);
+ }
+ public void setFetchTime(long value) {
+ put(2, value);
+ }
+ public long getPrevFetchTime() {
+ return (Long) get(3);
+ }
+ public void setPrevFetchTime(long value) {
+ put(3, value);
+ }
+ public int getFetchInterval() {
+ return (Integer) get(4);
+ }
+ public void setFetchInterval(int value) {
+ put(4, value);
+ }
+ public int getRetriesSinceFetch() {
+ return (Integer) get(5);
+ }
+ public void setRetriesSinceFetch(int value) {
+ put(5, value);
+ }
+ public long getModifiedTime() {
+ return (Long) get(6);
+ }
+ public void setModifiedTime(long value) {
+ put(6, value);
+ }
+ public ProtocolStatus getProtocolStatus() {
+ return (ProtocolStatus) get(7);
+ }
+ public void setProtocolStatus(ProtocolStatus value) {
+ put(7, value);
+ }
+ public ByteBuffer getContent() {
+ return (ByteBuffer) get(8);
+ }
+ public void setContent(ByteBuffer value) {
+ put(8, value);
+ }
+ public Utf8 getContentType() {
+ return (Utf8) get(9);
+ }
+ public void setContentType(Utf8 value) {
+ put(9, value);
+ }
+ public ByteBuffer getPrevSignature() {
+ return (ByteBuffer) get(10);
+ }
+ public void setPrevSignature(ByteBuffer value) {
+ put(10, value);
+ }
+ public ByteBuffer getSignature() {
+ return (ByteBuffer) get(11);
+ }
+ public void setSignature(ByteBuffer value) {
+ put(11, value);
+ }
+ public Utf8 getTitle() {
+ return (Utf8) get(12);
+ }
+ public void setTitle(Utf8 value) {
+ put(12, value);
+ }
+ public Utf8 getText() {
+ return (Utf8) get(13);
+ }
+ public void setText(Utf8 value) {
+ put(13, value);
+ }
+ public ParseStatus getParseStatus() {
+ return (ParseStatus) get(14);
+ }
+ public void setParseStatus(ParseStatus value) {
+ put(14, value);
+ }
+ public float getScore() {
+ return (Float) get(15);
+ }
+ public void setScore(float value) {
+ put(15, value);
+ }
+ public Utf8 getReprUrl() {
+ return (Utf8) get(16);
+ }
+ public void setReprUrl(Utf8 value) {
+ put(16, value);
+ }
+ public Map<Utf8, Utf8> getHeaders() {
+ return (Map<Utf8, Utf8>) get(17);
+ }
+ public Utf8 getFromHeaders(Utf8 key) {
+ if (headers == null) { return null; }
+ return headers.get(key);
+ }
+ public void putToHeaders(Utf8 key, Utf8 value) { // NOTE(review): no null guard, unlike getFromHeaders/removeFromHeaders
+ getStateManager().setDirty(this, 17);
+ headers.put(key, value);
+ }
+ public Utf8 removeFromHeaders(Utf8 key) {
+ if (headers == null) { return null; }
+ getStateManager().setDirty(this, 17);
+ return headers.remove(key);
+ }
+ public Map<Utf8, Utf8> getOutlinks() {
+ return (Map<Utf8, Utf8>) get(18);
+ }
+ public Utf8 getFromOutlinks(Utf8 key) {
+ if (outlinks == null) { return null; }
+ return outlinks.get(key);
+ }
+ public void putToOutlinks(Utf8 key, Utf8 value) { // NOTE(review): no null guard, unlike getFromOutlinks/removeFromOutlinks
+ getStateManager().setDirty(this, 18);
+ outlinks.put(key, value);
+ }
+ public Utf8 removeFromOutlinks(Utf8 key) {
+ if (outlinks == null) { return null; }
+ getStateManager().setDirty(this, 18);
+ return outlinks.remove(key);
+ }
+ public Map<Utf8, Utf8> getInlinks() {
+ return (Map<Utf8, Utf8>) get(19);
+ }
+ public Utf8 getFromInlinks(Utf8 key) {
+ if (inlinks == null) { return null; }
+ return inlinks.get(key);
+ }
+ public void putToInlinks(Utf8 key, Utf8 value) { // NOTE(review): no null guard, unlike getFromInlinks/removeFromInlinks
+ getStateManager().setDirty(this, 19);
+ inlinks.put(key, value);
+ }
+ public Utf8 removeFromInlinks(Utf8 key) {
+ if (inlinks == null) { return null; }
+ getStateManager().setDirty(this, 19);
+ return inlinks.remove(key);
+ }
+ public Map<Utf8, Utf8> getMarkers() {
+ return (Map<Utf8, Utf8>) get(20);
+ }
+ public Utf8 getFromMarkers(Utf8 key) {
+ if (markers == null) { return null; }
+ return markers.get(key);
+ }
+ public void putToMarkers(Utf8 key, Utf8 value) { // NOTE(review): no null guard, unlike getFromMarkers/removeFromMarkers
+ getStateManager().setDirty(this, 20);
+ markers.put(key, value);
+ }
+ public Utf8 removeFromMarkers(Utf8 key) {
+ if (markers == null) { return null; }
+ getStateManager().setDirty(this, 20);
+ return markers.remove(key);
+ }
+ public Map<Utf8, ByteBuffer> getMetadata() {
+ return (Map<Utf8, ByteBuffer>) get(21);
+ }
+ public ByteBuffer getFromMetadata(Utf8 key) {
+ if (metadata == null) { return null; }
+ return metadata.get(key);
+ }
+ public void putToMetadata(Utf8 key, ByteBuffer value) { // NOTE(review): no null guard, unlike getFromMetadata/removeFromMetadata
+ getStateManager().setDirty(this, 21);
+ metadata.put(key, value);
+ }
+ public ByteBuffer removeFromMetadata(Utf8 key) {
+ if (metadata == null) { return null; }
+ getStateManager().setDirty(this, 21);
+ return metadata.remove(key);
+ }
+}
Added: nutch/branches/nutchbase/src/java/org/apache/nutch/storage/WebTableCreator.java
URL: http://svn.apache.org/viewvc/nutch/branches/nutchbase/src/java/org/apache/nutch/storage/WebTableCreator.java?rev=959259&view=auto
==============================================================================
--- nutch/branches/nutchbase/src/java/org/apache/nutch/storage/WebTableCreator.java (added)
+++ nutch/branches/nutchbase/src/java/org/apache/nutch/storage/WebTableCreator.java Wed Jun 30 10:36:20 2010
@@ -0,0 +1,23 @@
+package org.apache.nutch.storage;
+
+import org.apache.nutch.util.NutchConfiguration;
+import org.gora.store.DataStore;
+
+/**
+ * Command-line entry point that instantiates the configured {@link WebPage}
+ * data store and prints it. NOTE(review): presumably any table creation
+ * happens as a side effect of store initialization; confirm for each backend.
+ */
+public class WebTableCreator {
+  public static void main(String[] args) throws Exception {
+    DataStore<String, WebPage> store =
+        StorageUtils.createDataStore(NutchConfiguration.create(), String.class,
+            WebPage.class);
+    try {
+      System.out.println(store);
+    } finally {
+      // Release whatever the store holds open instead of leaking it at exit.
+      store.close();
+    }
+  }
+}
Modified: nutch/branches/nutchbase/src/java/org/apache/nutch/util/EncodingDetector.java
URL: http://svn.apache.org/viewvc/nutch/branches/nutchbase/src/java/org/apache/nutch/util/EncodingDetector.java?rev=959259&r1=959258&r2=959259&view=diff
==============================================================================
--- nutch/branches/nutchbase/src/java/org/apache/nutch/util/EncodingDetector.java (original)
+++ nutch/branches/nutchbase/src/java/org/apache/nutch/util/EncodingDetector.java Wed Jun 30 10:36:20 2010
@@ -16,25 +16,19 @@
*/
package org.apache.nutch.util;
-import java.io.BufferedInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.FileInputStream;
-import java.io.IOException;
+import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import org.apache.avro.util.Utf8;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.nutch.metadata.Metadata;
import org.apache.nutch.net.protocols.Response;
-import org.apache.nutch.protocol.Content;
-import org.apache.nutch.util.LogUtil;
-import org.apache.nutch.util.NutchConfiguration;
-import org.apache.nutch.util.hbase.WebTableRow;
+import org.apache.nutch.storage.WebPage;
import com.ibm.icu.text.CharsetDetector;
import com.ibm.icu.text.CharsetMatch;
@@ -66,10 +60,12 @@ import com.ibm.icu.text.CharsetMatch;
*/
public class EncodingDetector {
+ public final static Utf8 CONTENT_TYPE_UTF8 = new Utf8(Response.CONTENT_TYPE);
+
private class EncodingClue {
- private String value;
- private String source;
- private int confidence;
+ private final String value;
+ private final String source;
+ private final int confidence;
// Constructor for clues with no confidence values (ignore thresholds)
public EncodingClue(String value, String source) {
@@ -92,6 +88,7 @@ public class EncodingDetector {
return value;
}
+ @Override
public String toString() {
return value + " (" + source +
((confidence >= 0) ? ", " + confidence + "% confidence" : "") + ")";
@@ -153,11 +150,11 @@ public class EncodingDetector {
}
- private int minConfidence;
+ private final int minConfidence;
- private CharsetDetector detector;
+ private final CharsetDetector detector;
- private List<EncodingClue> clues;
+ private final List<EncodingClue> clues;
public EncodingDetector(Configuration conf) {
minConfidence = conf.getInt(MIN_CONFIDENCE_KEY, -1);
@@ -165,19 +162,15 @@ public class EncodingDetector {
clues = new ArrayList<EncodingClue>();
}
- public void autoDetectClues(Content content, boolean filter) {
- autoDetectClues(content.getContent(), content.getContentType(),
- parseCharacterEncoding(content.getMetadata().get(Response.CONTENT_TYPE)),
- filter);
- }
-
- public void autoDetectClues(WebTableRow row, boolean filter) {
- autoDetectClues(row.getContent(), row.getContentType(),
- parseCharacterEncoding(row.getHeader(Response.CONTENT_TYPE)), filter);
+  public void autoDetectClues(WebPage page, boolean filter) {
+    String headerCharset = parseCharacterEncoding(page.getFromHeaders(CONTENT_TYPE_UTF8));
+    autoDetectClues(page.getContent(), page.getContentType(), headerCharset, filter);
+  }
-
- private void autoDetectClues(byte[] data, String type,
+
+ private void autoDetectClues(ByteBuffer dataBuffer, Utf8 typeUtf8,
String encoding, boolean filter) {
+ byte[] data = dataBuffer.array();
+ String type = TableUtil.toString(typeUtf8);
if (minConfidence >= 0 && DETECTABLES.contains(type)
&& data.length > MIN_LENGTH) {
@@ -220,21 +213,7 @@ public class EncodingDetector {
public void addClue(String value, String source) {
addClue(value, source, NO_THRESHOLD);
}
-
- /**
- * Guess the encoding with the previously specified list of clues.
- *
- * @param content Content instance
- * @param defaultValue Default encoding to return if no encoding can be
- * detected with enough confidence. Note that this will <b>not</b> be
- * normalized with {@link EncodingDetector#resolveEncodingAlias}
- *
- * @return Guessed encoding or defaultValue
- */
- public String guessEncoding(Content content, String defaultValue) {
- return guessEncoding(content.getBaseUrl(), defaultValue);
- }
-
+
/**
* Guess the encoding with the previously specified list of clues.
*
@@ -245,10 +224,12 @@ public class EncodingDetector {
*
* @return Guessed encoding or defaultValue
*/
- public String guessEncoding(WebTableRow row, String defaultValue) {
- return guessEncoding(row.getBaseUrl(), defaultValue);
+  public String guessEncoding(WebPage page, String defaultValue) {
+    // Only the page's base URL is consulted; TableUtil.toString maps null to null.
+    String baseUrl = TableUtil.toString(page.getBaseUrl());
+    return guessEncoding(baseUrl, defaultValue);
+  }
-
+
/**
* Guess the encoding with the previously specified list of clues.
*
@@ -364,9 +345,10 @@ public class EncodingDetector {
*
* @param contentType a content type header
*/
- public static String parseCharacterEncoding(String contentType) {
- if (contentType == null)
+ public static String parseCharacterEncoding(Utf8 contentTypeUtf8) {
+ if (contentTypeUtf8 == null)
return (null);
+ String contentType = contentTypeUtf8.toString();
int start = contentType.indexOf("charset=");
if (start < 0)
return (null);
@@ -382,7 +364,7 @@ public class EncodingDetector {
}
- public static void main(String[] args) throws IOException {
+ /*public static void main(String[] args) throws IOException {
if (args.length != 1) {
System.err.println("Usage: EncodingDetector <file>");
System.exit(1);
@@ -421,6 +403,6 @@ public class EncodingDetector {
String encoding = detector.guessEncoding(content,
conf.get("parser.character.encoding.default"));
System.out.println("Guessed encoding: " + encoding);
- }
+ }*/
}
Added: nutch/branches/nutchbase/src/java/org/apache/nutch/util/IdentityPageReducer.java
URL: http://svn.apache.org/viewvc/nutch/branches/nutchbase/src/java/org/apache/nutch/util/IdentityPageReducer.java?rev=959259&view=auto
==============================================================================
--- nutch/branches/nutchbase/src/java/org/apache/nutch/util/IdentityPageReducer.java (added)
+++ nutch/branches/nutchbase/src/java/org/apache/nutch/util/IdentityPageReducer.java Wed Jun 30 10:36:20 2010
@@ -0,0 +1,26 @@
+package org.apache.nutch.util;
+
+import java.io.IOException;
+
+import org.apache.nutch.storage.WebPage;
+import org.gora.mapreduce.GoraReducer;
+
+/**
+ * Reducer that writes every {@link WebPage} value back out unchanged,
+ * under the key it was grouped by.
+ */
+public class IdentityPageReducer
+    extends GoraReducer<String, WebPage, String, WebPage> {
+
+  /**
+   * Emits one {@code (key, value)} record per element of {@code values},
+   * in iteration order.
+   */
+  @Override
+  protected void reduce(String key, Iterable<WebPage> values,
+      Context context) throws IOException, InterruptedException {
+    for (WebPage value : values) {
+      context.write(key, value);
+    }
+  }
+}
Modified: nutch/branches/nutchbase/src/java/org/apache/nutch/util/MimeUtil.java
URL: http://svn.apache.org/viewvc/nutch/branches/nutchbase/src/java/org/apache/nutch/util/MimeUtil.java?rev=959259&r1=959258&r2=959259&view=diff
==============================================================================
--- nutch/branches/nutchbase/src/java/org/apache/nutch/util/MimeUtil.java (original)
+++ nutch/branches/nutchbase/src/java/org/apache/nutch/util/MimeUtil.java Wed Jun 30 10:36:20 2010
@@ -19,6 +19,7 @@ package org.apache.nutch.util;
// JDK imports
import java.io.File;
+import java.io.IOException;
import java.util.logging.Logger;
// Hadoop imports
@@ -59,8 +60,13 @@ public final class MimeUtil {
MimeTypes mimeTypez = (MimeTypes) objectCache.getObject(MimeTypes.class
.getName());
if (mimeTypez == null) {
- mimeTypez = MimeTypesFactory.create(conf
- .getConfResourceAsInputStream(conf.get("mime.types.file")));
+ try {
+ mimeTypez = MimeTypesFactory.create(conf
+ .getConfResourceAsInputStream(conf.get("mime.types.file")));
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
objectCache.setObject(MimeTypes.class.getName(), mimeTypez);
}
@@ -139,7 +145,7 @@ public final class MimeUtil {
// if returned null, or if it's the default type then try url resolution
if (type == null
- || (type != null && type.getName().equals(MimeTypes.DEFAULT))) {
+ || (type != null && type.getName().equals(MimeTypes.OCTET_STREAM))) {
// If no mime-type header, or cannot find a corresponding registered
// mime-type, then guess a mime-type from the url pattern
type = this.mimeTypes.getMimeType(url) != null ? this.mimeTypes
@@ -152,7 +158,8 @@ public final class MimeUtil {
// returned by the magic
if (this.mimeMagic) {
MimeType magicType = this.mimeTypes.getMimeType(data);
- if (magicType != null && !magicType.getName().equals(MimeTypes.DEFAULT)
+ if (magicType != null && !magicType.getName().equals(MimeTypes.OCTET_STREAM)
+ && !magicType.getName().equals(MimeTypes.PLAIN_TEXT)
&& type != null && !type.getName().equals(magicType.getName())) {
// If magic enabled and the current mime type differs from that of the
// one returned from the magic, take the magic mimeType
@@ -163,7 +170,7 @@ public final class MimeUtil {
// default type
if (type == null) {
try {
- type = this.mimeTypes.forName(MimeTypes.DEFAULT);
+ type = this.mimeTypes.forName(MimeTypes.OCTET_STREAM);
} catch (Exception ignore) {
}
}
Modified: nutch/branches/nutchbase/src/java/org/apache/nutch/util/NutchJob.java
URL: http://svn.apache.org/viewvc/nutch/branches/nutchbase/src/java/org/apache/nutch/util/NutchJob.java?rev=959259&r1=959258&r2=959259&view=diff
==============================================================================
--- nutch/branches/nutchbase/src/java/org/apache/nutch/util/NutchJob.java (original)
+++ nutch/branches/nutchbase/src/java/org/apache/nutch/util/NutchJob.java Wed Jun 30 10:36:20 2010
@@ -19,8 +19,10 @@ package org.apache.nutch.util;
import java.io.IOException;
+import org.apache.avro.util.Utf8;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.nutch.metadata.Nutch;
/** A {@link Job} for Nutch jobs. */
public class NutchJob extends Job {
@@ -29,9 +31,20 @@ public class NutchJob extends Job {
super(conf);
setJarByClass(this.getClass());
}
-
+
public NutchJob(Configuration conf, String jobName) throws IOException {
super(conf, jobName);
setJarByClass(this.getClass());
}
+
+  public static boolean shouldProcess(Utf8 mark, Utf8 crawlId) {
+    // Pages carrying no mark are never processed.
+    if (mark == null) {
+      return false;
+    }
+    // Any non-null mark is accepted when crawlId equals Nutch.ALL_CRAWL_ID;
+    // otherwise the mark must equal crawlId.
+    boolean acceptsAnyMark = crawlId.equals(Nutch.ALL_CRAWL_ID);
+    return acceptsAnyMark || mark.equals(crawlId);
+  }
}
\ No newline at end of file
Added: nutch/branches/nutchbase/src/java/org/apache/nutch/util/Pair.java
URL: http://svn.apache.org/viewvc/nutch/branches/nutchbase/src/java/org/apache/nutch/util/Pair.java?rev=959259&view=auto
==============================================================================
--- nutch/branches/nutchbase/src/java/org/apache/nutch/util/Pair.java (added)
+++ nutch/branches/nutchbase/src/java/org/apache/nutch/util/Pair.java Wed Jun 30 10:36:20 2010
@@ -0,0 +1,67 @@
+package org.apache.nutch.util;
+
+/**
+ * A pair of two values whose references cannot be reassigned.
+ * <p>
+ * Pairs compare equal when both elements are equal (null-safe), so they
+ * behave as value objects in hash-based collections.
+ *
+ * @param <F> type of the first element
+ * @param <S> type of the second element
+ */
+public class Pair<F, S> {
+
+  private final F first;
+  private final S second;
+
+  /**
+   * @param first  first element, may be {@code null}
+   * @param second second element, may be {@code null}
+   */
+  public Pair(F first, S second) {
+    this.first = first;
+    this.second = second;
+  }
+
+  /** @return the first element, possibly {@code null} */
+  public F getFirst() {
+    return first;
+  }
+
+  /** @return the second element, possibly {@code null} */
+  public S getSecond() {
+    return second;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (!(obj instanceof Pair)) {
+      return false;
+    }
+    Pair<?, ?> other = (Pair<?, ?>) obj;
+    return eq(first, other.first) && eq(second, other.second);
+  }
+
+  @Override
+  public int hashCode() {
+    return 31 * hash(first) + hash(second);
+  }
+
+  @Override
+  public String toString() {
+    return "(" + first + ", " + second + ")";
+  }
+
+  /** Null-safe equality; kept local to avoid needing java.util.Objects (Java 7+). */
+  private static boolean eq(Object a, Object b) {
+    return a == null ? b == null : a.equals(b);
+  }
+
+  /** Null-safe hash code; {@code null} hashes to 0. */
+  private static int hash(Object o) {
+    return o == null ? 0 : o.hashCode();
+  }
+}
Added: nutch/branches/nutchbase/src/java/org/apache/nutch/util/TableUtil.java
URL: http://svn.apache.org/viewvc/nutch/branches/nutchbase/src/java/org/apache/nutch/util/TableUtil.java?rev=959259&view=auto
==============================================================================
--- nutch/branches/nutchbase/src/java/org/apache/nutch/util/TableUtil.java (added)
+++ nutch/branches/nutchbase/src/java/org/apache/nutch/util/TableUtil.java Wed Jun 30 10:36:20 2010
@@ -0,0 +1,158 @@
+package org.apache.nutch.util;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.nio.ByteBuffer;
+
+import org.apache.avro.util.Utf8;
+
+/**
+ * Static helpers for the "reversed url" key form
+ * ({@code <reversed host>:<protocol>[:<port>]<file>}) and for {@link Utf8}
+ * conversion.
+ */
+public class TableUtil {
+
+  /**
+   * Single-byte value {@code 'y'}.
+   * NOTE(review): a ByteBuffer carries mutable position/limit state and this
+   * instance is shared; readers should use {@code duplicate()} to avoid
+   * disturbing other users.
+   */
+  public static final ByteBuffer YES_VAL = ByteBuffer.wrap(new byte[] { 'y' });
+
+  /**
+   * Reverses a url's domain. This form is better for storing in hbase
+   * because scans within the same domain are faster.
+   * <p>
+   * E.g. "http://bar.foo.com:8983/to/index.html?a=b" becomes
+   * "com.foo.bar:http:8983/to/index.html?a=b".
+   *
+   * @param urlString
+   *          url to be reversed
+   * @return Reversed url
+   * @throws MalformedURLException
+   *           if {@code urlString} cannot be parsed as a {@link URL}
+   */
+  public static String reverseUrl(String urlString)
+      throws MalformedURLException {
+    return reverseUrl(new URL(urlString));
+  }
+
+  /**
+   * Reverses a url's domain. This form is better for storing in hbase
+   * because scans within the same domain are faster.
+   * <p>
+   * The result is {@code <reversed host>:<protocol>[:<port>]<file>}, where
+   * the port is present only if the url names one explicitly and a '/' is
+   * inserted before a file part that does not already start with one.
+   * E.g. "http://bar.foo.com:8983/to/index.html?a=b" becomes
+   * "com.foo.bar:http:8983/to/index.html?a=b".
+   *
+   * @param url
+   *          url to be reversed
+   * @return Reversed url
+   */
+  public static String reverseUrl(URL url) {
+    String host = url.getHost();
+    String file = url.getFile();
+    String protocol = url.getProtocol();
+    int port = url.getPort();
+
+    StringBuilder buf = new StringBuilder();
+
+    /* reverse host */
+    reverseAppendSplits(host.split("\\."), buf);
+
+    /* add protocol */
+    buf.append(':');
+    buf.append(protocol);
+
+    /* add port if necessary (getPort() is -1 when none was given) */
+    if (port != -1) {
+      buf.append(':');
+      buf.append(port);
+    }
+
+    /* add path (getFile() includes the query string) */
+    if (file.length() > 0 && '/' != file.charAt(0)) {
+      buf.append('/');
+    }
+    buf.append(file);
+
+    return buf.toString();
+  }
+
+  /**
+   * Inverse of {@link #reverseUrl(URL)}. E.g.
+   * "com.foo.bar:http:8983/to/index.html?a=b" becomes
+   * "http://bar.foo.com:8983/to/index.html?a=b".
+   * <p>
+   * NOTE(review): hosts that themselves contain ':' (IPv6 literals) are not
+   * handled, since the prefix before the first '/' is split on ':'.
+   *
+   * @param reversedUrl
+   *          url in the form produced by {@link #reverseUrl(URL)}
+   * @return Unreversed url
+   */
+  public static String unreverseUrl(String reversedUrl) {
+    // The ':' between host and protocol becomes "://", hence + 2.
+    StringBuilder buf = new StringBuilder(reversedUrl.length() + 2);
+
+    int pathBegin = reversedUrl.indexOf('/');
+    if (pathBegin == -1) {
+      pathBegin = reversedUrl.length();
+    }
+    String sub = reversedUrl.substring(0, pathBegin);
+
+    String[] splits = sub.split(":"); // {<reversed host>, <protocol>, [<port>]}
+
+    buf.append(splits[1]); // add protocol
+    buf.append("://");
+    reverseAppendSplits(splits[0].split("\\."), buf); // splits[0] is reversed host
+    if (splits.length == 3) { // has a port
+      buf.append(':');
+      buf.append(splits[2]);
+    }
+    buf.append(reversedUrl.substring(pathBegin));
+    return buf.toString();
+  }
+
+  /**
+   * Given a reversed url, returns the reversed host. E.g.
+   * "com.foo.bar:http:8983/to/index.html?a=b" -> "com.foo.bar"
+   *
+   * @param reversedUrl
+   *          Reversed url; must contain ':' as produced by reverseUrl
+   * @return Reversed host
+   * @throws StringIndexOutOfBoundsException
+   *           if {@code reversedUrl} contains no ':'
+   */
+  public static String getReversedHost(String reversedUrl) {
+    return reversedUrl.substring(0, reversedUrl.indexOf(':'));
+  }
+
+  /**
+   * Appends {@code splits} to {@code buf} in reverse order, joined by '.'.
+   * Used both to reverse a host and to restore it.
+   */
+  private static void reverseAppendSplits(String[] splits, StringBuilder buf) {
+    for (int i = splits.length - 1; i > 0; i--) {
+      buf.append(splits[i]);
+      buf.append('.');
+    }
+    buf.append(splits[0]);
+  }
+
+  /**
+   * Convert given Utf8 instance to String
+   *
+   * @param utf8
+   *          Utf8 object
+   * @return string-ified Utf8 object or null if Utf8 instance is null
+   */
+  public static String toString(Utf8 utf8) {
+    return (utf8 == null ? null : utf8.toString());
+  }
+
+}
Added: nutch/branches/nutchbase/src/java/org/apache/nutch/util/WebPageWritable.java
URL: http://svn.apache.org/viewvc/nutch/branches/nutchbase/src/java/org/apache/nutch/util/WebPageWritable.java?rev=959259&view=auto
==============================================================================
--- nutch/branches/nutchbase/src/java/org/apache/nutch/util/WebPageWritable.java (added)
+++ nutch/branches/nutchbase/src/java/org/apache/nutch/util/WebPageWritable.java Wed Jun 30 10:36:20 2010
@@ -0,0 +1,59 @@
+package org.apache.nutch.util;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.Writable;
+import org.apache.nutch.storage.WebPage;
+import org.gora.util.IOUtils;
+
+/**
+ * {@link Writable} wrapper around a {@link WebPage}; (de)serialization is
+ * delegated to Gora's {@link IOUtils} using this object's configuration.
+ */
+public class WebPageWritable extends Configured
+    implements Writable {
+
+  /** Wrapped page; replaced by the result of {@link #readFields}. */
+  private WebPage webPage;
+
+  /** Wraps a new, empty {@link WebPage}; no configuration is set. */
+  public WebPageWritable() {
+    this(null, new WebPage());
+  }
+
+  /**
+   * @param conf    configuration handed to the Gora serializer (may be null)
+   * @param webPage page to wrap
+   */
+  public WebPageWritable(Configuration conf, WebPage webPage) {
+    super(conf);
+    this.webPage = webPage;
+  }
+
+  /** @return the currently wrapped page */
+  public WebPage getWebPage() {
+    return this.webPage;
+  }
+
+  /** Writes the wrapped page to {@code out}. */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    IOUtils.serialize(getConf(), out, this.webPage, WebPage.class);
+  }
+
+  /**
+   * Reads a page from {@code in} and makes it the wrapped page. The current
+   * page is passed to Gora as well (presumably as a reuse target; confirm
+   * against org.gora.util.IOUtils).
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    WebPage deserialized =
+        IOUtils.deserialize(getConf(), in, this.webPage, WebPage.class);
+    this.webPage = deserialized;
+  }
+}
Modified: nutch/branches/nutchbase/src/java/overview.html
URL: http://svn.apache.org/viewvc/nutch/branches/nutchbase/src/java/overview.html?rev=959259&r1=959258&r2=959259&view=diff
==============================================================================
--- nutch/branches/nutchbase/src/java/overview.html (original)
+++ nutch/branches/nutchbase/src/java/overview.html Wed Jun 30 10:36:20 2010
@@ -3,7 +3,7 @@
<title>Nutch</title>
</head>
<body>
-Nutch is the open-source search engine.<p>
+Nutch is an open-source web crawler.<p>
</body>
</html>
Modified: nutch/branches/nutchbase/src/plugin/build-plugin.xml
URL: http://svn.apache.org/viewvc/nutch/branches/nutchbase/src/plugin/build-plugin.xml?rev=959259&r1=959258&r2=959259&view=diff
==============================================================================
--- nutch/branches/nutchbase/src/plugin/build-plugin.xml (original)
+++ nutch/branches/nutchbase/src/plugin/build-plugin.xml Wed Jun 30 10:36:20 2010
@@ -16,7 +16,7 @@
limitations under the License.
-->
<!-- Imported by plugin build.xml files to define default targets. -->
-<project>
+<project xmlns:ivy="antlib:org.apache.ivy.ant">
<property name="name" value="${ant.project.name}"/>
<property name="root" value="${basedir}"/>
@@ -52,10 +52,13 @@
<pathelement location="${build.classes}"/>
<fileset refid="lib.jars"/>
<pathelement location="${nutch.root}/build/classes"/>
- <fileset dir="${nutch.root}/lib">
+ <fileset dir="${nutch.root}/build/lib">
<include name="*.jar" />
</fileset>
<path refid="plugin.deps"/>
+ <fileset dir="${deploy.dir}">
+ <include name="*.jar" />
+ </fileset>
</path>
<!-- the unit test classpath -->
@@ -75,6 +78,7 @@
<mkdir dir="${build.dir}"/>
<mkdir dir="${build.classes}"/>
<mkdir dir="${build.test}"/>
+ <mkdir dir="${deploy.dir}"/>
<antcall target="init-plugin"/>
</target>
@@ -97,7 +101,7 @@
<!-- ====================================================== -->
<!-- Compile the Java files -->
<!-- ====================================================== -->
- <target name="compile" depends="init,deps-jar">
+ <target name="compile" depends="init,deps-jar, resolve-default">
<echo message="Compiling plugin: ${name}"/>
<javac
encoding="${build.encoding}"
@@ -186,6 +190,7 @@
errorProperty="tests.failed" failureProperty="tests.failed">
<sysproperty key="test.data" value="${build.test}/data"/>
<sysproperty key="test.input" value="${root}/data"/>
+ <sysproperty key="javax.xml.parsers.DocumentBuilderFactory" value="com.sun.org.apache.xerces.internal.jaxp.DocumentBuilderFactoryImpl"/>
<classpath refid="test.classpath"/>
<formatter type="plain" />
<batchtest todir="${build.test}" unless="testcase">
@@ -201,12 +206,32 @@
</target>
+ <!-- target: resolve ================================================= -->
+  <target name="resolve-default" depends="clean-lib" description="resolve and retrieve default dependencies with ivy">
+    <ivy:resolve file="ivy.xml" conf="default" log="download-only"/>
+    <ivy:retrieve pattern="${deploy.dir}/[artifact]-[revision].[ext]" symlink="false" log="quiet"/>
+  </target>
+  <!-- NOTE(review): both resolve targets depend on clean-lib, which deletes ${build.lib.dir}, while retrieve writes to ${deploy.dir}; confirm intended. -->
+  <target name="resolve-test" depends="clean-lib" description="resolve and retrieve test dependencies with ivy">
+    <ivy:resolve file="ivy.xml" conf="test" log="download-only"/>
+    <ivy:retrieve pattern="${deploy.dir}/[artifact]-[revision].[ext]" symlink="false" log="quiet"/>
+  </target>
+
<!-- ================================================================== -->
<!-- Clean. Delete the build files, and their directories -->
<!-- ================================================================== -->
- <target name="clean">
- <delete dir="${build.dir}"/>
- <delete dir="${deploy.dir}"/>
+ <!-- target: clean =================================================== -->
+ <target name="clean" depends="clean-build, clean-lib" description="--> clean the project" />
+
+ <!-- target: clean-lib =============================================== -->
+ <target name="clean-lib" description="--> clean the project libraries directory (dependencies)">
+ <delete includeemptydirs="true" dir="${build.lib.dir}"/>
+ </target>
+
+ <!-- target: clean-build ============================================= -->
+ <target name="clean-build" description="--> clean the project built files">
+ <delete includeemptydirs="true" dir="${build.dir}"/>
+ <delete includeemptydirs="true" dir="${deploy.dir}"/>
</target>
</project>