You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by ab...@apache.org on 2010/11/25 13:05:58 UTC
svn commit: r1039014 [1/2] - in /nutch/trunk: ./
src/java/org/apache/nutch/api/ src/java/org/apache/nutch/api/impl/
src/java/org/apache/nutch/crawl/ src/java/org/apache/nutch/fetcher/
src/java/org/apache/nutch/indexer/ src/java/org/apache/nutch/indexer...
Author: ab
Date: Thu Nov 25 12:05:57 2010
New Revision: 1039014
URL: http://svn.apache.org/viewvc?rev=1039014&view=rev
Log:
NUTCH-932 Bulk REST API to retrieve crawl results as JSON.
Added:
nutch/trunk/src/java/org/apache/nutch/api/DbReader.java (with props)
nutch/trunk/src/java/org/apache/nutch/api/DbResource.java (with props)
nutch/trunk/src/java/org/apache/nutch/crawl/Crawler.java (with props)
nutch/trunk/src/java/org/apache/nutch/util/ToolUtil.java (with props)
Modified:
nutch/trunk/CHANGES.txt
nutch/trunk/src/java/org/apache/nutch/api/APIInfoResource.java
nutch/trunk/src/java/org/apache/nutch/api/JobManager.java
nutch/trunk/src/java/org/apache/nutch/api/JobResource.java
nutch/trunk/src/java/org/apache/nutch/api/JobStatus.java
nutch/trunk/src/java/org/apache/nutch/api/NutchApp.java
nutch/trunk/src/java/org/apache/nutch/api/impl/RAMJobManager.java
nutch/trunk/src/java/org/apache/nutch/crawl/DbUpdaterJob.java
nutch/trunk/src/java/org/apache/nutch/crawl/GeneratorJob.java
nutch/trunk/src/java/org/apache/nutch/crawl/InjectorJob.java
nutch/trunk/src/java/org/apache/nutch/crawl/WebTableReader.java
nutch/trunk/src/java/org/apache/nutch/fetcher/FetcherJob.java
nutch/trunk/src/java/org/apache/nutch/indexer/IndexerJob.java
nutch/trunk/src/java/org/apache/nutch/indexer/solr/SolrIndexerJob.java
nutch/trunk/src/java/org/apache/nutch/metadata/Nutch.java
nutch/trunk/src/java/org/apache/nutch/parse/ParseStatusCodes.java
nutch/trunk/src/java/org/apache/nutch/parse/ParseStatusUtils.java
nutch/trunk/src/java/org/apache/nutch/parse/ParserJob.java
nutch/trunk/src/java/org/apache/nutch/util/NutchTool.java
nutch/trunk/src/test/org/apache/nutch/api/TestAPI.java
Modified: nutch/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/nutch/trunk/CHANGES.txt?rev=1039014&r1=1039013&r2=1039014&view=diff
==============================================================================
--- nutch/trunk/CHANGES.txt (original)
+++ nutch/trunk/CHANGES.txt Thu Nov 25 12:05:57 2010
@@ -88,6 +88,9 @@ Release 2.0 - Current Development
* NUTCH-931 Simple admin API to fetch status and stop the service (ab)
+* NUTCH-932 Bulk REST API to retrieve crawl results as JSON (ab)
+
+
Release 1.1 - 2010-06-06
* NUTCH-819 Included Solr schema.xml and solrindex-mapping.xml don't play together (ab)
Modified: nutch/trunk/src/java/org/apache/nutch/api/APIInfoResource.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/api/APIInfoResource.java?rev=1039014&r1=1039013&r2=1039014&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/api/APIInfoResource.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/api/APIInfoResource.java Thu Nov 25 12:05:57 2010
@@ -13,6 +13,7 @@ public class APIInfoResource extends Ser
static {
info.add(new String[]{AdminResource.PATH, AdminResource.DESCR});
info.add(new String[]{ConfResource.PATH, ConfResource.DESCR});
+ info.add(new String[]{DbResource.PATH, DbResource.DESCR});
info.add(new String[]{JobResource.PATH, JobResource.DESCR});
}
Added: nutch/trunk/src/java/org/apache/nutch/api/DbReader.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/api/DbReader.java?rev=1039014&view=auto
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/api/DbReader.java (added)
+++ nutch/trunk/src/java/org/apache/nutch/api/DbReader.java Thu Nov 25 12:05:57 2010
@@ -0,0 +1,220 @@
+package org.apache.nutch.api;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TreeSet;
+
+import org.apache.avro.util.Utf8;
+import org.apache.gora.query.Query;
+import org.apache.gora.query.Result;
+import org.apache.gora.store.DataStore;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nutch.metadata.Nutch;
+import org.apache.nutch.parse.ParseStatusUtils;
+import org.apache.nutch.protocol.ProtocolStatusUtils;
+import org.apache.nutch.storage.Mark;
+import org.apache.nutch.storage.ParseStatus;
+import org.apache.nutch.storage.ProtocolStatus;
+import org.apache.nutch.storage.StorageUtils;
+import org.apache.nutch.storage.WebPage;
+import org.apache.nutch.util.Bytes;
+import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.StringUtil;
+import org.apache.nutch.util.TableUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DbReader {
+ private static final Logger LOG = LoggerFactory.getLogger(DbReader.class);
+
+ DataStore<String,WebPage> store;
+ Configuration conf;
+
+  /**
+   * Creates a reader backed by a web store built from a private copy of the
+   * supplied configuration.
+   *
+   * @param conf base configuration; it is copied and never modified
+   * @param crawlId optional crawl id; when non-null it is stored under
+   *        {@link Nutch#CRAWL_ID_KEY} in the copied configuration
+   */
+  public DbReader(Configuration conf, String crawlId) {
+    // The parameter shadows the field, so the field must be assigned through
+    // "this."; a bare "conf = ..." would leave this.conf null.
+    this.conf = new Configuration(conf);
+    if (crawlId != null) {
+      this.conf.set(Nutch.CRAWL_ID_KEY, crawlId);
+    }
+    try {
+      store = StorageUtils.createWebStore(this.conf, String.class, WebPage.class);
+    } catch (Exception e) {
+      // Best effort, as before: the reader is still constructed with a null
+      // store, but the failure goes to the class logger instead of stderr.
+      LOG.error("Could not create the web store", e);
+      store = null;
+    }
+  }
+
+  /**
+   * Runs a query against the web store and returns its rows as maps.
+   *
+   * @param fields names of the fields to return, or null for all fields; the
+   *        pseudo-field "url" is served from the row key and is therefore
+   *        not passed on to the store query
+   * @param startKey first (reversed-URL) key of the range, or null for no lower bound
+   * @param endKey last (reversed-URL) key of the range, or null for no upper bound
+   * @param batchId when non-null, only rows whose update mark matches are returned
+   * @return iterator over the matching rows
+   * @throws IllegalStateException if the web store could not be created
+   * @throws Exception if the store fails to execute the query
+   */
+  public Iterator<Map<String,Object>> iterator(String[] fields, String startKey, String endKey,
+      String batchId) throws Exception {
+    if (store == null) {
+      // The constructor swallows store creation failures; fail with a clear
+      // message here instead of a bare NullPointerException.
+      throw new IllegalStateException("Web store is not available");
+    }
+    Query<String,WebPage> q = store.newQuery();
+    String[] qFields = null;
+    if (fields != null) {
+      HashSet<String> flds = new HashSet<String>(Arrays.asList(fields));
+      // "url" is the row key, not a stored field
+      flds.remove("url");
+      if (!flds.isEmpty()) {
+        qFields = flds.toArray(new String[flds.size()]);
+      }
+    }
+    q.setFields(qFields);
+    // The two bounds are independent: an end key must be honoured even when
+    // no start key was given.
+    if (startKey != null) {
+      q.setStartKey(startKey);
+    }
+    if (endKey != null) {
+      q.setEndKey(endKey);
+    }
+    Result<String,WebPage> res = store.execute(q);
+    // XXX we should add the filtering capability to Query
+    return new DbIterator(res, fields, batchId);
+  }
+
+  /**
+   * Closes the underlying web store, if one was created.
+   *
+   * @throws IOException if the store fails to close
+   */
+  public void close() throws IOException {
+    if (store == null) {
+      return;
+    }
+    store.close();
+  }
+
+  /**
+   * Look-ahead iterator that converts the rows of a query result into plain
+   * maps suitable for serialization. The underlying result is always
+   * positioned on the next row to be returned (or exhausted), and is closed
+   * as soon as the last row has been consumed.
+   */
+  private static class DbIterator implements Iterator<Map<String,Object>> {
+    private final Result<String,WebPage> res;
+    /** Batch id to filter on, or null for no filtering. */
+    private final Utf8 batchId;
+    /** Requested field names, or null for all fields. */
+    private final TreeSet<String> fields;
+    private boolean hasNext;
+
+    DbIterator(Result<String,WebPage> res, String[] fields, String batchId) throws IOException {
+      this.res = res;
+      this.batchId = (batchId != null) ? new Utf8(batchId) : null;
+      this.fields = (fields != null) ? new TreeSet<String>(Arrays.asList(fields)) : null;
+      advance();
+      if (!hasNext) {
+        // Empty result: next() will never run, so release the result here.
+        res.close();
+      }
+    }
+
+    /**
+     * Moves the result to the next row that passes the batch filter and
+     * records in {@code hasNext} whether such a row exists.
+     */
+    private void advance() throws IOException {
+      hasNext = res.next();
+      if (batchId == null) {
+        return;
+      }
+      while (hasNext) {
+        Utf8 mark = Mark.UPDATEDB_MARK.checkMark(res.get());
+        if (NutchJob.shouldProcess(mark, batchId)) {
+          return;
+        }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Skipping " +
+              TableUtil.unreverseUrl(res.getKey()) + "; different batch id");
+        }
+        hasNext = res.next();
+      }
+    }
+
+    public boolean hasNext() {
+      return hasNext;
+    }
+
+    /**
+     * Returns the current row and pre-fetches the following one.
+     *
+     * @throws java.util.NoSuchElementException if the iteration is exhausted
+     */
+    public Map<String,Object> next() {
+      if (!hasNext) {
+        throw new java.util.NoSuchElementException();
+      }
+      String key = res.getKey();
+      // clone: the store may reuse the page instance for the next row
+      WebPage current = (WebPage)res.get().clone();
+      try {
+        advance();
+        if (!hasNext) {
+          res.close();
+        }
+      } catch (IOException e) {
+        // The look-ahead failed, but the current row was read successfully:
+        // end the iteration and still hand that row to the caller.
+        LOG.error("Failed to advance to the next row; ending iteration early", e);
+        hasNext = false;
+        try {
+          res.close();
+        } catch (IOException ignored) {
+          // already failing; nothing more can be done with this result
+        }
+      }
+      return pageAsMap(key, current);
+    }
+
+    /**
+     * Converts a page into a map of simple values, restricted to the
+     * requested fields that the page actually declares.
+     */
+    private Map<String,Object> pageAsMap(String url, WebPage page) {
+      HashMap<String,Object> out = new HashMap<String,Object>();
+      if (fields == null || fields.contains("url")) {
+        out.put("url", TableUtil.unreverseUrl(url));
+      }
+      String[] pfields = page.getFields();
+      TreeSet<String> flds = (fields != null)
+          ? new TreeSet<String>(fields)
+          : new TreeSet<String>(Arrays.asList(pfields));
+      flds.retainAll(Arrays.asList(pfields));
+      for (String f : flds) {
+        int idx = page.getFieldIndex(f);
+        if (idx < 0) {
+          continue;
+        }
+        Object val = page.get(idx);
+        if (val == null) {
+          continue;
+        }
+        // NOTE(review): the ByteBuffer.array() calls below use the whole
+        // backing array and ignore position/limit - confirm that stored
+        // buffers always span exactly their backing array.
+        if ("metadata".equals(f)) {
+          Map<Utf8, ByteBuffer> metadata = page.getMetadata();
+          Map<String,String> simpleMeta = new HashMap<String,String>();
+          if (metadata != null) {
+            for (Entry<Utf8, ByteBuffer> entry : metadata.entrySet()) {
+              simpleMeta.put(entry.getKey().toString(),
+                  Bytes.toStringBinary(entry.getValue().array()));
+            }
+          }
+          out.put(f, simpleMeta);
+        } else if ("protocolStatus".equals(f)) {
+          ProtocolStatus ps = page.getProtocolStatus();
+          out.put(f, ProtocolStatusUtils.toString(ps));
+        } else if ("parseStatus".equals(f)) {
+          ParseStatus ps = page.getParseStatus();
+          out.put(f, ParseStatusUtils.toString(ps));
+        } else if ("signature".equals(f)) {
+          ByteBuffer bb = page.getSignature();
+          out.put(f, StringUtil.toHexString(bb.array()));
+        } else if ("content".equals(f)) {
+          ByteBuffer bb = page.getContent();
+          out.put(f, Bytes.toStringBinary(bb.array()));
+        } else if ("markers".equals(f)) {
+          out.put(f, convertMap(page.getMarkers()));
+        } else if ("inlinks".equals(f)) {
+          out.put(f, convertMap(page.getInlinks()));
+        } else if ("outlinks".equals(f)) {
+          out.put(f, convertMap(page.getOutlinks()));
+        } else {
+          if (val instanceof Utf8) {
+            val = val.toString();
+          } else if (val instanceof ByteBuffer) {
+            val = Bytes.toStringBinary(((ByteBuffer)val).array());
+          }
+          out.put(f, val);
+        }
+      }
+      return out;
+    }
+
+    /**
+     * Copies a map into a String-to-String map using {@code toString()} on
+     * keys and values. A null map yields an empty map and a null value is
+     * kept as null instead of raising a NullPointerException.
+     */
+    private static Map<String,String> convertMap(Map<?,?> map) {
+      Map<String,String> converted = new HashMap<String,String>();
+      if (map == null) {
+        return converted;
+      }
+      for (Entry<?,?> e : map.entrySet()) {
+        Object value = e.getValue();
+        converted.put(e.getKey().toString(), value == null ? null : value.toString());
+      }
+      return converted;
+    }
+
+    /** Removal is not supported. */
+    public void remove() {
+      throw new UnsupportedOperationException();
+    }
+  }
+}
Propchange: nutch/trunk/src/java/org/apache/nutch/api/DbReader.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: nutch/trunk/src/java/org/apache/nutch/api/DbResource.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/api/DbResource.java?rev=1039014&view=auto
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/api/DbResource.java (added)
+++ nutch/trunk/src/java/org/apache/nutch/api/DbResource.java Thu Nov 25 12:05:57 2010
@@ -0,0 +1,126 @@
+package org.apache.nutch.api;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.MalformedURLException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.WeakHashMap;
+
+import org.apache.nutch.util.TableUtil;
+import org.restlet.data.Form;
+import org.restlet.data.MediaType;
+import org.restlet.ext.jackson.JacksonConverter;
+import org.restlet.representation.OutputRepresentation;
+import org.restlet.representation.Representation;
+import org.restlet.representation.Variant;
+import org.restlet.resource.ResourceException;
+import org.restlet.resource.ServerResource;
+
+public class DbResource extends ServerResource {
+ public static final String PATH = "db";
+ public static final String DESCR = "DB data streaming";
+
+ static JacksonConverter cnv = new JacksonConverter();
+ WeakHashMap<String,DbReader> readers = new WeakHashMap<String,DbReader>();
+
+ @Override
+ protected void doInit() throws ResourceException {
+ super.doInit();
+ getVariants().add(new Variant(MediaType.APPLICATION_JSON));
+ }
+
+ @Override
+ protected Representation get(final Variant variant) throws ResourceException {
+ String startKey = null;
+ String endKey = null;
+ String rStartKey = null; // reversed keys
+ String rEndKey = null;
+ String[] fields = null;
+ String batchId = null;
+ String confId = ConfResource.DEFAULT_CONF;
+ Form form = getQuery();
+ if (form != null) {
+ startKey = form.getFirstValue("start");
+ endKey = form.getFirstValue("end");
+ rStartKey = form.getFirstValue("rstart");
+ rEndKey = form.getFirstValue("rend");
+ if (rStartKey != null || rEndKey != null) {
+ startKey = rStartKey;
+ endKey = rEndKey;
+ } else {
+ if (startKey != null) {
+ try {
+ startKey = TableUtil.reverseUrl(startKey);
+ } catch (MalformedURLException e) { /*ignore */};
+ }
+ if (endKey != null) {
+ try {
+ endKey = TableUtil.reverseUrl(endKey);
+ } catch (MalformedURLException e) { /*ignore */};
+ }
+ }
+ batchId = form.getFirstValue("batch");
+ String flds = form.getFirstValue("fields");
+ if (flds != null && flds.trim().length() > 0) {
+ flds = flds.replaceAll("\\s+", "");
+ fields = flds.split(",");
+ }
+ }
+ DbReader reader;
+ synchronized (readers) {
+ reader = readers.get(confId);
+ if (reader == null) {
+ reader = new DbReader(NutchApp.confMgr.get(confId), null);
+ readers.put(confId, reader);
+ }
+ }
+ Representation res = new DbRepresentation(this, variant, reader, fields,
+ startKey, endKey, batchId);
+ return res;
+ }
+
+  /**
+   * Representation that streams the selected rows as one JSON array,
+   * converting and flushing one row at a time so the full result never has
+   * to be held in memory.
+   */
+  private static class DbRepresentation extends OutputRepresentation {
+    private final DbReader r;
+    private final Variant variant;
+    private final String[] fields;
+    private final String startKey, endKey, batchId;
+    private final DbResource resource;
+
+    public DbRepresentation(DbResource resource, Variant variant, DbReader reader,
+        String[] fields, String startKey, String endKey, String batchId) {
+      super(variant.getMediaType());
+      this.resource = resource;
+      this.r = reader;
+      this.variant = variant;
+      this.fields = fields;
+      this.startKey = startKey;
+      this.endKey = endKey;
+      this.batchId = batchId;
+    }
+
+    /**
+     * Writes the rows as a JSON array.
+     *
+     * @throws IOException if writing to the stream fails (propagated as is),
+     *         or wrapping any other failure of the reader or the converter
+     */
+    @Override
+    public void write(OutputStream out) throws IOException {
+      try {
+        // Obtain the iterator before emitting anything, so a query failure
+        // does not leave a dangling '[' in the response.
+        Iterator<Map<String,Object>> it = r.iterator(fields, startKey, endKey, batchId);
+        out.write('[');
+        boolean first = true;
+        while (it.hasNext()) {
+          if (!first) {
+            out.write(',');
+          } else {
+            first = false;
+          }
+          Map<String,Object> item = it.next();
+          Representation repr = cnv.toRepresentation(item, variant, resource);
+          repr.write(out);
+          out.flush();
+          repr.release();
+        }
+        out.write(']');
+      } catch (IOException e) {
+        // a genuine stream failure: do not relabel it as a reader failure
+        throw e;
+      } catch (Exception e) {
+        throw new IOException("DbReader.iterator failed", e);
+      }
+    }
+  }
+}
Propchange: nutch/trunk/src/java/org/apache/nutch/api/DbResource.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: nutch/trunk/src/java/org/apache/nutch/api/JobManager.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/api/JobManager.java?rev=1039014&r1=1039013&r2=1039014&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/api/JobManager.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/api/JobManager.java Thu Nov 25 12:05:57 2010
@@ -12,9 +12,10 @@ public interface JobManager {
public List<JobStatus> list(String crawlId, State state) throws Exception;
- public Map<String,String> get(String crawlId, String id) throws Exception;
+ public JobStatus get(String crawlId, String id) throws Exception;
- public String create(String crawlId, JobType type, String confId, Object... args) throws Exception;
+ public String create(String crawlId, JobType type, String confId,
+ Map<String,Object> args) throws Exception;
public boolean abort(String crawlId, String id) throws Exception;
Modified: nutch/trunk/src/java/org/apache/nutch/api/JobResource.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/api/JobResource.java?rev=1039014&r1=1039013&r2=1039014&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/api/JobResource.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/api/JobResource.java Thu Nov 25 12:05:57 2010
@@ -1,9 +1,11 @@
package org.apache.nutch.api;
+import java.util.List;
import java.util.Map;
import org.apache.nutch.api.JobManager.JobType;
import org.apache.nutch.api.JobStatus.State;
+import org.restlet.data.Form;
import org.restlet.resource.Get;
import org.restlet.resource.Put;
import org.restlet.resource.ServerResource;
@@ -14,18 +16,30 @@ public class JobResource extends ServerR
@Get("json")
public Object retrieve() throws Exception {
- String cid = (String)getRequestAttributes().get(Params.CRAWL_ID);
- String jid = (String)getRequestAttributes().get(Params.JOB_ID);
+ String cid = null;
+ String jid = null;
+ String cmd = null;
+ Form form = getQuery();
+ cid = (String)getRequestAttributes().get(Params.CRAWL_ID);
+ jid = (String)getRequestAttributes().get(Params.JOB_ID);
+ cmd = (String)getRequestAttributes().get(Params.CMD);
+ if (form != null) {
+ String v = form.getFirstValue(Params.CRAWL_ID);
+ if (v != null) cid = v;
+ v = form.getFirstValue(Params.JOB_ID);
+ if (v != null) jid = v;
+ v = form.getFirstValue(Params.CMD);
+ if (v != null) cmd = v;
+ }
if (jid == null) {
return NutchApp.jobMgr.list(cid, State.ANY);
} else {
// handle stop / abort / get
- String cmd = (String)getRequestAttributes().get(Params.CMD);
if (cmd == null) {
return NutchApp.jobMgr.get(cid, jid);
}
if (cmd.equals(Params.JOB_CMD_STOP)) {
- return NutchApp.jobMgr.abort(cid, jid);
+ return NutchApp.jobMgr.stop(cid, jid);
} else if (cmd.equals(Params.JOB_CMD_ABORT)) {
return NutchApp.jobMgr.abort(cid, jid);
} else if (cmd.equals(Params.JOB_CMD_GET)) {
@@ -46,9 +60,9 @@ public class JobResource extends ServerR
public Object create(Map<String,Object> args) throws Exception {
String cid = (String)args.get(Params.CRAWL_ID);
String typeString = (String)args.get(Params.JOB_TYPE);
- JobType type = JobType.valueOf(typeString);
+ JobType type = JobType.valueOf(typeString.toUpperCase());
String confId = (String)args.get(Params.CONF_ID);
- Object[] cmdArgs = (Object[])args.get(Params.ARGS);
+ Map<String,Object> cmdArgs = (Map<String,Object>)args.get(Params.ARGS);
String jobId = NutchApp.jobMgr.create(cid, type, confId, cmdArgs);
return jobId;
}
Modified: nutch/trunk/src/java/org/apache/nutch/api/JobStatus.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/api/JobStatus.java?rev=1039014&r1=1039013&r2=1039014&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/api/JobStatus.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/api/JobStatus.java Thu Nov 25 12:05:57 2010
@@ -3,18 +3,21 @@ package org.apache.nutch.api;
import java.util.Map;
import org.apache.nutch.api.JobManager.JobType;
+import org.apache.nutch.util.NutchTool;
public class JobStatus {
- public static enum State {IDLE, RUNNING, FINISHED, FAILED, KILLED, ANY};
+ public static enum State {IDLE, RUNNING, FINISHED, FAILED, KILLED,
+ STOPPING, KILLING, ANY};
public String id;
public JobType type;
public String confId;
- public Object[] args;
+ public Map<String,Object> args;
public Map<String,Object> result;
+ public NutchTool tool;
public State state;
public String msg;
- public JobStatus(String id, JobType type, String confId, Object[] args,
+ public JobStatus(String id, JobType type, String confId, Map<String,Object> args,
State state, String msg) {
this.id = id;
this.type = type;
Modified: nutch/trunk/src/java/org/apache/nutch/api/NutchApp.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/api/NutchApp.java?rev=1039014&r1=1039013&r2=1039014&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/api/NutchApp.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/api/NutchApp.java Thu Nov 25 12:05:57 2010
@@ -36,6 +36,8 @@ public class NutchApp extends Applicatio
"}", ConfResource.class);
router.attach("/" + ConfResource.PATH + "/{" + Params.CONF_ID +
"}/{" + Params.PROP_NAME + "}", ConfResource.class);
+ // db
+ router.attach("/" + DbResource.PATH, DbResource.class);
// jobs
router.attach("/" + JobResource.PATH, JobResource.class);
router.attach("/" + JobResource.PATH + "/{" + Params.JOB_ID + "}",
Modified: nutch/trunk/src/java/org/apache/nutch/api/impl/RAMJobManager.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/api/impl/RAMJobManager.java?rev=1039014&r1=1039013&r2=1039014&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/api/impl/RAMJobManager.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/api/impl/RAMJobManager.java Thu Nov 25 12:05:57 2010
@@ -15,10 +15,12 @@ import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.nutch.api.ConfResource;
import org.apache.nutch.api.JobManager;
import org.apache.nutch.api.JobStatus;
import org.apache.nutch.api.JobStatus.State;
import org.apache.nutch.api.NutchApp;
+import org.apache.nutch.crawl.Crawler;
import org.apache.nutch.crawl.DbUpdaterJob;
import org.apache.nutch.crawl.GeneratorJob;
import org.apache.nutch.crawl.InjectorJob;
@@ -79,6 +81,7 @@ public class RAMJobManager implements Jo
typeToClass.put(JobType.PARSE, ParserJob.class);
typeToClass.put(JobType.UPDATEDB, DbUpdaterJob.class);
typeToClass.put(JobType.READDB, WebTableReader.class);
+ typeToClass.put(JobType.CRAWL, Crawler.class);
}
private void addFinishedStatus(JobStatus status) {
@@ -109,13 +112,24 @@ public class RAMJobManager implements Jo
}
@Override
- public Map<String, String> get(String crawlId, String jobId) throws Exception {
- // TODO Auto-generated method stub
+ public JobStatus get(String crawlId, String jobId) throws Exception {
+ for (JobStatus job : jobRunning) {
+ if (job.id.equals(jobId)) {
+ return job;
+ }
+ }
+ for (JobStatus job : jobHistory) {
+ if (job.id.equals(jobId)) {
+ return job;
+ }
+ }
return null;
}
@Override
- public String create(String crawlId, JobType type, String confId, Object... args) throws Exception {
+ public String create(String crawlId, JobType type, String confId,
+ Map<String,Object> args) throws Exception {
+ if (args == null) args = Collections.emptyMap();
JobWorker worker = new JobWorker(crawlId, type, confId, args);
String id = worker.getId();
exec.execute(worker);
@@ -125,12 +139,28 @@ public class RAMJobManager implements Jo
@Override
public boolean abort(String crawlId, String id) throws Exception {
+ // find running job
+ for (JobStatus job : jobRunning) {
+ if (job.id.equals(id)) {
+ job.state = State.KILLING;
+ boolean res = job.tool.killJob();
+ job.state = State.KILLED;
+ return res;
+ }
+ }
return false;
}
@Override
public boolean stop(String crawlId, String id) throws Exception {
- // TODO Auto-generated method stub
+ // find running job
+ for (JobStatus job : jobRunning) {
+ if (job.id.equals(id)) {
+ job.state = State.STOPPING;
+ boolean res = job.tool.stopJob();
+ return res;
+ }
+ }
return false;
}
@@ -139,12 +169,13 @@ public class RAMJobManager implements Jo
JobType type;
String confId;
NutchTool tool;
- Object[] args;
- float progress = 0f;
- Job currentJob = null;
+ Map<String,Object> args;
JobStatus jobStatus;
- JobWorker(String crawlId, JobType type, String confId, Object... args) throws Exception {
+ JobWorker(String crawlId, JobType type, String confId, Map<String,Object> args) throws Exception {
+ if (confId == null) {
+ confId = ConfResource.DEFAULT_CONF;
+ }
Configuration conf = NutchApp.confMgr.get(confId);
// clone it - we are going to modify it
if (conf == null) {
@@ -161,10 +192,11 @@ public class RAMJobManager implements Jo
}
Class<? extends NutchTool> clz = typeToClass.get(type);
if (clz == null) {
- clz = (Class<? extends NutchTool>)args[0];
+ clz = (Class<? extends NutchTool>)Class.forName((String)args.get(Nutch.ARG_CLASS));
}
tool = ReflectionUtils.newInstance(clz, conf);
jobStatus = new JobStatus(id, type, confId, args, State.IDLE, "idle");
+ jobStatus.tool = tool;
}
public String getId() {
@@ -172,65 +204,31 @@ public class RAMJobManager implements Jo
}
public float getProgress() {
- return progress;
+ return tool.getProgress();
}
public State getState() {
return jobStatus.state;
}
- private final float[] noProgress = new float[2];
-
- public float[] getCurrentJobProgress() throws IOException {
- if (currentJob == null) {
- return noProgress;
- }
- float[] res = new float[2];
- res[0] = currentJob.mapProgress();
- res[1] = currentJob.reduceProgress();
- return res;
- }
-
public Map<String,Object> getResult() {
return jobStatus.result;
}
- public String getStatus() {
- return jobStatus.msg;
+ public Map<String,Object> getStatus() {
+ return tool.getStatus();
}
@Override
public void run() {
try {
- progress = 0f;
jobStatus.state = State.RUNNING;
- jobStatus.msg = "prepare";
- tool.prepare();
- progress = 0.1f;
- Job[] jobs = tool.createJobs(args);
- float delta = 0.8f / jobs.length;
- for (int i = 0; i < jobs.length; i++) {
- currentJob = jobs[i];
- jobStatus.msg = "job " + (i + 1) + "/" + jobs.length;
- boolean success = jobs[i].waitForCompletion(true);
- if (!success) {
- throw new Exception("Job failed.");
- }
- jobStatus.msg = "postJob " + (i + 1);
- tool.postJob(i, jobs[i]);
- progress += delta;
- }
- currentJob = null;
- progress = 0.9f;
- jobStatus.msg = "finish";
- Map<String,Object> res = tool.finish();
- if (res != null) {
- jobStatus.result = res;
- }
- progress = 1.0f;
+ jobStatus.msg = "OK";
+ jobStatus.result = tool.run(args);
jobStatus.state = State.FINISHED;
} catch (Exception e) {
- jobStatus.msg = "ERROR " + jobStatus.msg + ": " + e.toString();
+ e.printStackTrace();
+ jobStatus.msg = "ERROR: " + e.toString();
jobStatus.state = State.FAILED;
}
}
Added: nutch/trunk/src/java/org/apache/nutch/crawl/Crawler.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/crawl/Crawler.java?rev=1039014&view=auto
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/crawl/Crawler.java (added)
+++ nutch/trunk/src/java/org/apache/nutch/crawl/Crawler.java Thu Nov 25 12:05:57 2010
@@ -0,0 +1,242 @@
+package org.apache.nutch.crawl;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.fetcher.FetcherJob;
+import org.apache.nutch.indexer.solr.SolrIndexerJob;
+import org.apache.nutch.metadata.Nutch;
+import org.apache.nutch.parse.ParserJob;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchTool;
+import org.apache.nutch.util.ToolUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class Crawler extends NutchTool implements Tool {
+ private static final Logger LOG = LoggerFactory.getLogger(Crawler.class);
+
+  /** True when run() generated a temporary seed directory that it must delete. */
+  private boolean cleanSeedDir = false;
+  /** Path of the temporary seed directory generated by run(), if any. */
+  private String tmpSeedDir = null;
+  private HashMap<String,Object> results = new HashMap<String,Object>();
+  private Map<String,Object> status =
+    Collections.synchronizedMap(new HashMap<String,Object>());
+  // The two fields below are written by the thread executing run() and read
+  // by the thread calling stopJob()/killJob(); volatile makes each write
+  // visible to the other thread.
+  private volatile NutchTool currentTool = null;
+  private volatile boolean shouldStop = false;
+
+ /**
+ * Returns the status map of this crawl. This is the synchronized map that
+ * run() updates while it executes, not a copy.
+ */
+ @Override
+ public Map<String,Object> getStatus() {
+ return status;
+ }
+
+ private Map<String,Object> runTool(Class<? extends NutchTool> toolClass,
+ Map<String,Object> args) throws Exception {
+ currentTool = (NutchTool)ReflectionUtils.newInstance(toolClass, getConf());
+ return currentTool.run(args);
+ }
+
+  /**
+   * Flags the crawl to stop and forwards the stop request to the tool that
+   * is currently running, if any.
+   *
+   * @return the running tool's answer, or false when no tool has been started
+   */
+  @Override
+  public boolean stopJob() throws Exception {
+    shouldStop = true;
+    final NutchTool running = currentTool;
+    return running != null && running.stopJob();
+  }
+
+  /**
+   * Flags the crawl to stop and forwards the kill request to the tool that
+   * is currently running, if any.
+   *
+   * @return the running tool's answer, or false when no tool has been started
+   */
+  @Override
+  public boolean killJob() throws Exception {
+    shouldStop = true;
+    final NutchTool running = currentTool;
+    return running != null && running.killJob();
+  }
+
+ @Override
+ public Map<String,Object> run(Map<String, Object> args) throws Exception {
+ results.clear();
+ status.clear();
+ String crawlId = (String)args.get(Nutch.ARG_CRAWL);
+ if (crawlId != null) {
+ getConf().set(Nutch.CRAWL_ID_KEY, crawlId);
+ }
+ String seedDir = null;
+ String seedList = (String)args.get(Nutch.ARG_SEEDLIST);
+ if (seedList != null) { // takes precedence
+ String[] seeds = seedList.split("\\s+");
+ // create tmp. dir
+ String tmpSeedDir = getConf().get("hadoop.tmp.dir") + "/seed-" +
+ System.currentTimeMillis();
+ FileSystem fs = FileSystem.get(getConf());
+ Path p = new Path(tmpSeedDir);
+ fs.mkdirs(p);
+ Path seedOut = new Path(p, "urls");
+ OutputStream os = fs.create(seedOut);
+ for (String s : seeds) {
+ os.write(s.getBytes());
+ os.write('\n');
+ }
+ os.flush();
+ os.close();
+ cleanSeedDir = true;
+ seedDir = tmpSeedDir;
+ } else {
+ seedDir = (String)args.get(Nutch.ARG_SEEDDIR);
+ }
+ Integer depth = (Integer)args.get(Nutch.ARG_DEPTH);
+ if (depth == null) depth = 1;
+ Boolean parse = (Boolean)args.get(Nutch.ARG_PARSE);
+ if (parse == null) {
+ parse = getConf().getBoolean(FetcherJob.PARSE_KEY, false);
+ }
+ String solrUrl = (String)args.get(Nutch.ARG_SOLR);
+ int onePhase = 3;
+ if (!parse) onePhase++;
+ float totalPhases = depth * onePhase;
+ if (seedDir != null) totalPhases++;
+ float phase = 0;
+ Map<String,Object> jobRes = null;
+ LinkedHashMap<String,Object> subTools = new LinkedHashMap<String,Object>();
+ status.put(Nutch.STAT_JOBS, subTools);
+ results.put(Nutch.STAT_JOBS, subTools);
+ // inject phase
+ if (seedDir != null) {
+ status.put(Nutch.STAT_PHASE, "inject");
+ jobRes = runTool(InjectorJob.class, args);
+ if (jobRes != null) {
+ subTools.put("inject", jobRes);
+ }
+ status.put(Nutch.STAT_PROGRESS, ++phase / totalPhases);
+ if (cleanSeedDir && tmpSeedDir != null) {
+ LOG.info(" - cleaning tmp seed list in " + tmpSeedDir);
+ FileSystem.get(getConf()).delete(new Path(tmpSeedDir), true);
+ }
+ }
+ if (shouldStop) {
+ return results;
+ }
+ // run "depth" cycles
+ for (int i = 0; i < depth; i++) {
+ status.put(Nutch.STAT_PHASE, "generate " + i);
+ jobRes = runTool(GeneratorJob.class, args);
+ if (jobRes != null) {
+ subTools.put("generate " + i, jobRes);
+ }
+ status.put(Nutch.STAT_PROGRESS, ++phase / totalPhases);
+ if (shouldStop) {
+ return results;
+ }
+ status.put(Nutch.STAT_PHASE, "fetch " + i);
+ jobRes = runTool(FetcherJob.class, args);
+ if (jobRes != null) {
+ subTools.put("fetch " + i, jobRes);
+ }
+ status.put(Nutch.STAT_PROGRESS, ++phase / totalPhases);
+ if (shouldStop) {
+ return results;
+ }
+ if (!parse) {
+ status.put(Nutch.STAT_PHASE, "parse " + i);
+ jobRes = runTool(ParserJob.class, args);
+ if (jobRes != null) {
+ subTools.put("parse " + i, jobRes);
+ }
+ status.put(Nutch.STAT_PROGRESS, ++phase / totalPhases);
+ if (shouldStop) {
+ return results;
+ }
+ }
+ status.put(Nutch.STAT_PHASE, "updatedb " + i);
+ jobRes = runTool(DbUpdaterJob.class, args);
+ if (jobRes != null) {
+ subTools.put("updatedb " + i, jobRes);
+ }
+ status.put(Nutch.STAT_PROGRESS, ++phase / totalPhases);
+ if (shouldStop) {
+ return results;
+ }
+ }
+ if (solrUrl != null) {
+ status.put(Nutch.STAT_PHASE, "index");
+ jobRes = runTool(SolrIndexerJob.class, args);
+ if (jobRes != null) {
+ subTools.put("index", jobRes);
+ }
+ }
+ return results;
+ }
+
+  /**
+   * Returns the progress value last published in the status map, or 0 when
+   * none has been published yet.
+   */
+  @Override
+  public float getProgress() {
+    final Object published = status.get(Nutch.STAT_PROGRESS);
+    return published == null ? 0 : ((Float)published).floatValue();
+  }
+
+  /**
+   * Command-line entry point: parses the most common options into an
+   * argument map and runs the crawl.
+   *
+   * @param args command-line arguments, see the usage message
+   * @return 0 after the crawl has run, -1 when the arguments are unusable
+   */
+  @Override
+  public int run(String[] args) throws Exception {
+    final String usage =
+      "Usage: Crawl (<seedDir> | -continue) [-solr <solrURL>] [-threads n] [-depth i] [-topN N]";
+    if (args.length == 0) {
+      System.out.println(usage);
+      return -1;
+    }
+    // parse most common arguments here
+    String seedDir = null;
+    int threads = getConf().getInt("fetcher.threads.fetch", 10);
+    int depth = 5;
+    long topN = Long.MAX_VALUE;
+    String solrUrl = null;
+
+    for (int i = 0; i < args.length; i++) {
+      String arg = args[i];
+      boolean takesValue = "-threads".equals(arg) || "-depth".equals(arg)
+        || "-topN".equals(arg) || "-solr".equals(arg);
+      if (takesValue && i + 1 >= args.length) {
+        // an option given last without its value used to end in an
+        // ArrayIndexOutOfBoundsException
+        System.err.println("Missing value for " + arg);
+        System.out.println(usage);
+        return -1;
+      }
+      if ("-threads".equals(arg)) {
+        threads = Integer.parseInt(args[++i]);
+      } else if ("-depth".equals(arg)) {
+        depth = Integer.parseInt(args[++i]);
+      } else if ("-topN".equals(arg)) {
+        // topN is a long: parse it as one so values above
+        // Integer.MAX_VALUE are accepted
+        topN = Long.parseLong(args[++i]);
+      } else if ("-solr".equals(arg)) {
+        // NOTE(review): lower-casing the whole URL also changes its path -
+        // confirm this is intended.
+        solrUrl = StringUtils.lowerCase(args[++i]);
+      } else if ("-continue".equals(arg)) {
+        // skip
+      } else if (arg != null) {
+        seedDir = arg;
+      }
+    }
+    Map<String,Object> argMap = ToolUtil.toArgMap(
+        Nutch.ARG_THREADS, threads,
+        Nutch.ARG_DEPTH, depth,
+        Nutch.ARG_TOPN, topN,
+        Nutch.ARG_SOLR, solrUrl,
+        Nutch.ARG_SEEDDIR, seedDir);
+    run(argMap);
+    return 0;
+  }
+
+  /** Runs the crawler through ToolRunner and exits with its return code. */
+  public static void main(String[] args) throws Exception {
+    final Crawler crawler = new Crawler();
+    final int exitCode = ToolRunner.run(NutchConfiguration.create(), crawler, args);
+    System.exit(exitCode);
+  }
+}
Propchange: nutch/trunk/src/java/org/apache/nutch/crawl/Crawler.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: nutch/trunk/src/java/org/apache/nutch/crawl/DbUpdaterJob.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/crawl/DbUpdaterJob.java?rev=1039014&r1=1039013&r2=1039014&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/crawl/DbUpdaterJob.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/crawl/DbUpdaterJob.java Thu Nov 25 12:05:57 2010
@@ -1,6 +1,9 @@
package org.apache.nutch.crawl;
+import java.io.IOException;
import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -19,10 +22,10 @@ import org.apache.nutch.storage.WebPage;
import org.apache.nutch.util.NutchConfiguration;
import org.apache.nutch.util.NutchJob;
import org.apache.nutch.util.NutchTool;
+import org.apache.nutch.util.ToolUtil;
import org.apache.gora.mapreduce.StringComparator;
-public class DbUpdaterJob extends Configured
-implements Tool, NutchTool {
+public class DbUpdaterJob extends NutchTool implements Tool {
public static final Logger LOG = LoggerFactory.getLogger(DbUpdaterJob.class);
@@ -53,49 +56,33 @@ implements Tool, NutchTool {
public DbUpdaterJob(Configuration conf) {
setConf(conf);
}
-
- public Map<String,Object> prepare() throws Exception {
- return null;
- }
-
- public Map<String,Object> postJob(int jobIndex, Job job) throws Exception {
- return null;
- }
-
- public Map<String,Object> finish() throws Exception {
- return null;
- }
-
- public Job[] createJobs(Object... args) throws Exception {
- String crawlId = null;
- if (args.length > 0) {
- crawlId = (String)args[0];
- }
- Job job = new NutchJob(getConf(), "update-table");
+
+ /**
+ * Configures the single "update-table" MapReduce job and runs it.
+ *
+ * @param args argument map; only <code>Nutch.ARG_CRAWL</code> is read here
+ * (an optional crawl id, cast to String)
+ * @return the <code>results</code> map after
+ * <code>ToolUtil.recordJobStatus</code> has been called with it
+ */
+ public Map<String,Object> run(Map<String,Object> args) throws Exception {
+ String crawlId = (String)args.get(Nutch.ARG_CRAWL);
+ numJobs = 1;
+ currentJobNum = 0;
+ currentJob = new NutchJob(getConf(), "update-table");
+ // the crawl id is only written to the job configuration when one was given
if (crawlId != null) {
- job.getConfiguration().set(Nutch.CRAWL_ID_KEY, crawlId);
+ currentJob.getConfiguration().set(Nutch.CRAWL_ID_KEY, crawlId);
}
//job.setBoolean(ALL, updateAll);
ScoringFilters scoringFilters = new ScoringFilters(getConf());
+ // start from this job's FIELDS and add the fields the scoring filters report
HashSet<WebPage.Field> fields = new HashSet<WebPage.Field>(FIELDS);
fields.addAll(scoringFilters.getFields());
// TODO: Figure out why this needs to be here
- job.getConfiguration().setClass("mapred.output.key.comparator.class",
+ currentJob.getConfiguration().setClass("mapred.output.key.comparator.class",
StringComparator.class, RawComparator.class);
- StorageUtils.initMapperJob(job, fields, String.class,
+ StorageUtils.initMapperJob(currentJob, fields, String.class,
NutchWritable.class, DbUpdateMapper.class);
- StorageUtils.initReducerJob(job, DbUpdateReducer.class);
- return new Job[]{job};
+ StorageUtils.initReducerJob(currentJob, DbUpdateReducer.class);
+ // NOTE(review): the boolean returned by waitForCompletion is discarded here
+ currentJob.waitForCompletion(true);
+ ToolUtil.recordJobStatus(null, currentJob, results);
+ return results;
}
- private int updateTable(String batchId) throws Exception {
+ /**
+ * Runs the update job for the given crawl id and maps its outcome to an
+ * exit code.
+ *
+ * @param crawlId crawl id passed to <code>run(Map)</code> as
+ * <code>Nutch.ARG_CRAWL</code>; may be null
+ * @return 0 if the job completed successfully, -1 if it failed
+ */
+ private int updateTable(String crawlId) throws Exception {
LOG.info("DbUpdaterJob: starting");
- Job[] jobs = createJobs(new Object[]{batchId});
- boolean success = jobs[0].waitForCompletion(true);
- if (!success){
- LOG.info("DbUpdaterJob: failed");
- return -1;
- }
+ run(ToolUtil.toArgMap(Nutch.ARG_CRAWL, crawlId));
+ // run(Map) leaves the finished job in currentJob; keep reporting failure
+ // to the caller instead of always claiming success
+ if (!currentJob.isSuccessful()) {
+ LOG.info("DbUpdaterJob: failed");
+ return -1;
+ }
LOG.info("DbUpdaterJob: done");
return 0;
}
Modified: nutch/trunk/src/java/org/apache/nutch/crawl/GeneratorJob.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/crawl/GeneratorJob.java?rev=1039014&r1=1039013&r2=1039014&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/crawl/GeneratorJob.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/crawl/GeneratorJob.java Thu Nov 25 12:05:57 2010
@@ -3,6 +3,7 @@ package org.apache.nutch.crawl;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -26,8 +27,9 @@ import org.apache.nutch.storage.WebPage;
import org.apache.nutch.util.NutchConfiguration;
import org.apache.nutch.util.NutchJob;
import org.apache.nutch.util.NutchTool;
+import org.apache.nutch.util.ToolUtil;
-public class GeneratorJob extends Configured implements Tool, NutchTool {
+public class GeneratorJob extends NutchTool implements Tool {
public static final String GENERATE_UPDATE_CRAWLDB = "generate.update.crawldb";
public static final String GENERATOR_MIN_SCORE = "generate.min.score";
public static final String GENERATOR_FILTER = "generate.filter";
@@ -124,36 +126,28 @@ public class GeneratorJob extends Config
setConf(conf);
}
- public Map<String,Object> prepare() throws Exception {
- return null;
- }
-
- public Map<String,Object> postJob(int jobIndex, Job job) throws Exception {
- return null;
- }
-
- public Map<String,Object> finish() throws Exception {
- HashMap<String,Object> res = new HashMap<String,Object>();
- res.put(BATCH_ID, batchId);
- return res;
- }
-
- public Job[] createJobs(Object... args) throws Exception {
+ public Map<String,Object> run(Map<String,Object> args) throws Exception {
// map to inverted subset due for fetch, sort by score
- long topN = (Long)args[0];
- long curTime = (Long)args[1];
- boolean filter = (Boolean)args[2];
- boolean norm = (Boolean)args[3];
+ Long topN = (Long)args.get(Nutch.ARG_TOPN);
+ Long curTime = (Long)args.get(Nutch.ARG_CURTIME);
+ if (curTime == null) {
+ curTime = System.currentTimeMillis();
+ }
+ Boolean filter = (Boolean)args.get(Nutch.ARG_FILTER);
+ Boolean norm = (Boolean)args.get(Nutch.ARG_NORMALIZE);
// map to inverted subset due for fetch, sort by score
getConf().setLong(GENERATOR_CUR_TIME, curTime);
- getConf().setLong(GENERATOR_TOP_N, topN);
- getConf().setBoolean(GENERATOR_FILTER, filter);
+ if (topN != null)
+ getConf().setLong(GENERATOR_TOP_N, topN);
+ if (filter != null)
+ getConf().setBoolean(GENERATOR_FILTER, filter);
int randomSeed = Math.abs(new Random().nextInt());
- String batchId = (curTime / 1000) + "-" + randomSeed;
+ batchId = (curTime / 1000) + "-" + randomSeed;
getConf().setInt(GENERATOR_RANDOM_SEED, randomSeed);
getConf().set(BATCH_ID, batchId);
getConf().setLong(Nutch.GENERATE_TIME_KEY, System.currentTimeMillis());
- getConf().setBoolean(GENERATOR_NORMALISE, norm);
+ if (norm != null)
+ getConf().setBoolean(GENERATOR_NORMALISE, norm);
String mode = getConf().get(GENERATOR_COUNT_MODE, GENERATOR_COUNT_VALUE_HOST);
if (GENERATOR_COUNT_VALUE_HOST.equalsIgnoreCase(mode)) {
getConf().set(URLPartitioner.PARTITION_MODE_KEY, URLPartitioner.PARTITION_MODE_HOST);
@@ -164,12 +158,16 @@ public class GeneratorJob extends Config
getConf().set(GENERATOR_COUNT_MODE, GENERATOR_COUNT_VALUE_HOST);
getConf().set(URLPartitioner.PARTITION_MODE_KEY, URLPartitioner.PARTITION_MODE_HOST);
}
-
- Job job = new NutchJob(getConf(), "generate: " + batchId);
- StorageUtils.initMapperJob(job, FIELDS, SelectorEntry.class,
+ numJobs = 1;
+ currentJobNum = 0;
+ currentJob = new NutchJob(getConf(), "generate: " + batchId);
+ StorageUtils.initMapperJob(currentJob, FIELDS, SelectorEntry.class,
WebPage.class, GeneratorMapper.class, URLPartitioner.class, true);
- StorageUtils.initReducerJob(job, GeneratorReducer.class);
- return new Job[]{job};
+ StorageUtils.initReducerJob(currentJob, GeneratorReducer.class);
+ currentJob.waitForCompletion(true);
+ ToolUtil.recordJobStatus(null, currentJob, results);
+ results.put(BATCH_ID, batchId);
+ return results;
}
private String batchId;
@@ -188,12 +186,12 @@ public class GeneratorJob extends Config
if (topN != Long.MAX_VALUE) {
LOG.info("GeneratorJob: topN: " + topN);
}
- Job[] jobs = createJobs(topN, curTime, filter, norm);
- boolean success = jobs[0].waitForCompletion(true);
- if (!success) return null;
-
+ run(ToolUtil.toArgMap(
+ Nutch.ARG_TOPN, topN,
+ Nutch.ARG_CURTIME, curTime,
+ Nutch.ARG_FILTER, filter,
+ Nutch.ARG_NORMALIZE, norm));
batchId = getConf().get(BATCH_ID);
-
LOG.info("GeneratorJob: done");
LOG.info("GeneratorJob: generated batch id: " + batchId);
return batchId;
@@ -227,4 +225,5 @@ public class GeneratorJob extends Config
int res = ToolRunner.run(NutchConfiguration.create(), new GeneratorJob(), args);
System.exit(res);
}
+
}
Modified: nutch/trunk/src/java/org/apache/nutch/crawl/InjectorJob.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/crawl/InjectorJob.java?rev=1039014&r1=1039013&r2=1039014&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/crawl/InjectorJob.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/crawl/InjectorJob.java Thu Nov 25 12:05:57 2010
@@ -18,7 +18,6 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
@@ -37,6 +36,7 @@ import org.apache.nutch.util.NutchConfig
import org.apache.nutch.util.NutchJob;
import org.apache.nutch.util.NutchTool;
import org.apache.nutch.util.TableUtil;
+import org.apache.nutch.util.ToolUtil;
/** This class takes a flat file of URLs and adds them to the of pages to be
* crawled. Useful for bootstrapping the system.
@@ -47,15 +47,10 @@ import org.apache.nutch.util.TableUtil;
* - <i>nutch.fetchInterval</i> : allows to set a custom fetch interval for a specific URL <br>
* e.g. http://www.nutch.org/ \t nutch.score=10 \t nutch.fetchInterval=2592000 \t userType=open_source
**/
-public class InjectorJob extends GoraMapper<String, WebPage, String, WebPage>
- implements Tool, NutchTool {
+public class InjectorJob extends NutchTool implements Tool {
public static final Logger LOG = LoggerFactory.getLogger(InjectorJob.class);
- private Configuration conf;
-
- private FetchSchedule schedule;
-
private static final Set<WebPage.Field> FIELDS = new HashSet<WebPage.Field>();
private static final Utf8 YES_STRING = new Utf8("y");
@@ -173,92 +168,88 @@ public class InjectorJob extends GoraMap
context.write(reversedUrl, row);
}
}
+
+ /**
+ * Mapper that processes rows carrying the inject mark: rows without the
+ * mark are dropped, marked rows have the mark removed, get status
+ * STATUS_UNFETCHED and an initialized fetch schedule when their STATUS
+ * field is not readable, and are then written back under the same key.
+ */
+ public static class InjectorMapper
+ extends GoraMapper<String, WebPage, String, WebPage> {
+ private FetchSchedule schedule;
- public InjectorJob() {
-
- }
+ @Override
+ public void setup(Context context) throws IOException {
+ Configuration conf = context.getConfiguration();
+ schedule = FetchScheduleFactory.getFetchSchedule(conf);
+ // scoreInjected = conf.getFloat("db.score.injected", 1.0f);
+ }
- public InjectorJob(Configuration conf) {
- setConf(conf);
- }
+ @Override
+ protected void map(String key, WebPage row, Context context)
+ throws IOException, InterruptedException {
+ // rows without the inject mark are not emitted at all
+ if (Mark.INJECT_MARK.checkMark(row) == null) {
+ return;
+ }
+ Mark.INJECT_MARK.removeMark(row);
+ // status and schedule are only initialized when STATUS is not readable
+ if (!row.isReadable(WebPage.Field.STATUS.getIndex())) {
+ row.setStatus(CrawlStatus.STATUS_UNFETCHED);
+ schedule.initializeSchedule(key, row);
+ // row.setScore(scoreInjected);
+ }
- @Override
- public Configuration getConf() {
- return conf;
+ context.write(key, row);
+ }
+
}
- @Override
- public void setConf(Configuration conf) {
- this.conf = conf;
- }
+ public InjectorJob() {
- @Override
- public void setup(Context context) throws IOException {
- Configuration conf = context.getConfiguration();
- schedule = FetchScheduleFactory.getFetchSchedule(conf);
- // scoreInjected = conf.getFloat("db.score.injected", 1.0f);
}
- @Override
- protected void map(String key, WebPage row, Context context)
- throws IOException, InterruptedException {
- if (Mark.INJECT_MARK.checkMark(row) == null) {
- return;
- }
- Mark.INJECT_MARK.removeMark(row);
- if (!row.isReadable(WebPage.Field.STATUS.getIndex())) {
- row.setStatus(CrawlStatus.STATUS_UNFETCHED);
- schedule.initializeSchedule(key, row);
- // row.setScore(scoreInjected);
- }
-
- context.write(key, row);
- }
-
- public Map<String,Object> prepare() throws Exception {
- return null;
- }
-
- public Map<String,Object> postJob(int jobIndex, Job job) throws Exception {
- return null;
+ public InjectorJob(Configuration conf) {
+ setConf(conf);
}
- public Map<String,Object> finish() throws Exception {
- return null;
- }
-
- public Job[] createJobs(Object... args) throws Exception {
- Job[] jobs = new Job[2];
+ /**
+ * Runs the two injection jobs one after the other: "inject-p1" (map-only,
+ * UrlMapper, output written to the web store through GoraOutputFormat) and
+ * "inject-p2" (map-only, InjectorMapper).
+ *
+ * @param args argument map; <code>Nutch.ARG_SEEDDIR</code> is read and may
+ * be a <code>Path</code> or any object whose <code>toString()</code> yields
+ * the path
+ * @return the <code>results</code> map after
+ * <code>ToolUtil.recordJobStatus</code> has been called for both jobs
+ */
+ public Map<String,Object> run(Map<String,Object> args) throws Exception {
getConf().setLong("injector.current.time", System.currentTimeMillis());
- Job job = new NutchJob(getConf(), "inject-p1 " + args[0]);
- FileInputFormat.addInputPath(job, (Path)args[0]);
- job.setMapperClass(UrlMapper.class);
- job.setMapOutputKeyClass(String.class);
- job.setMapOutputValueClass(WebPage.class);
- job.setOutputFormatClass(GoraOutputFormat.class);
- DataStore<String, WebPage> store = StorageUtils.createWebStore(job.getConfiguration(),
+ Path input;
+ Object path = args.get(Nutch.ARG_SEEDDIR);
+ if (path instanceof Path) {
+ input = (Path)path;
+ } else {
+ input = new Path(path.toString());
+ }
+ numJobs = 2;
+ currentJobNum = 0;
+ status.put(Nutch.STAT_PHASE, "convert input");
+ currentJob = new NutchJob(getConf(), "inject-p1 " + input);
+ FileInputFormat.addInputPath(currentJob, input);
+ currentJob.setMapperClass(UrlMapper.class);
+ currentJob.setMapOutputKeyClass(String.class);
+ currentJob.setMapOutputValueClass(WebPage.class);
+ currentJob.setOutputFormatClass(GoraOutputFormat.class);
+ DataStore<String, WebPage> store = StorageUtils.createWebStore(currentJob.getConfiguration(),
String.class, WebPage.class);
- GoraOutputFormat.setOutput(job, store, true);
- job.setReducerClass(Reducer.class);
- job.setNumReduceTasks(0);
- job.waitForCompletion(true);
-
- job = new NutchJob(getConf(), "inject-p2 " + args[0]);
- StorageUtils.initMapperJob(job, FIELDS, String.class,
- WebPage.class, InjectorJob.class);
- job.setNumReduceTasks(0);
- jobs[1] = job;
- return jobs;
+ GoraOutputFormat.setOutput(currentJob, store, true);
+ currentJob.setReducerClass(Reducer.class);
+ currentJob.setNumReduceTasks(0);
+ currentJob.waitForCompletion(true);
+ ToolUtil.recordJobStatus(null, currentJob, results);
+ currentJob = null;
+
+ status.put(Nutch.STAT_PHASE, "merge input with db");
+ status.put(Nutch.STAT_PROGRESS, 0.5f);
+ currentJobNum = 1;
+ currentJob = new NutchJob(getConf(), "inject-p2 " + input);
+ StorageUtils.initMapperJob(currentJob, FIELDS, String.class,
+ WebPage.class, InjectorMapper.class);
+ currentJob.setNumReduceTasks(0);
+ // The second job must be submitted too: nothing else runs it now that
+ // the caller no longer receives the Job objects.
+ currentJob.waitForCompletion(true);
+ ToolUtil.recordJobStatus(null, currentJob, results);
+ status.put(Nutch.STAT_PROGRESS, 1.0f);
+ return results;
}
+ /**
+ * Logs the seed directory and delegates to <code>run(Map)</code>, passing
+ * the directory as <code>Nutch.ARG_SEEDDIR</code>.
+ *
+ * @param urlDir directory containing the seed URL files
+ */
public void inject(Path urlDir) throws Exception {
LOG.info("InjectorJob: starting");
LOG.info("InjectorJob: urlDir: " + urlDir);
- Job[] jobs = createJobs(urlDir);
- jobs[0].waitForCompletion(true);
-
- jobs[1].waitForCompletion(true);
+ run(ToolUtil.toArgMap(Nutch.ARG_SEEDDIR, urlDir));
}
@Override
Modified: nutch/trunk/src/java/org/apache/nutch/crawl/WebTableReader.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/crawl/WebTableReader.java?rev=1039014&r1=1039013&r2=1039014&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/crawl/WebTableReader.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/crawl/WebTableReader.java Thu Nov 25 12:05:57 2010
@@ -13,7 +13,6 @@ import org.apache.avro.util.Utf8;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
@@ -30,8 +29,6 @@ import org.apache.hadoop.util.ToolRunner
import org.apache.nutch.metadata.Nutch;
import org.apache.nutch.parse.ParseStatusUtils;
import org.apache.nutch.protocol.ProtocolStatusUtils;
-import org.apache.nutch.storage.ParseStatus;
-import org.apache.nutch.storage.ProtocolStatus;
import org.apache.nutch.storage.StorageUtils;
import org.apache.nutch.storage.WebPage;
import org.apache.nutch.util.Bytes;
@@ -40,6 +37,7 @@ import org.apache.nutch.util.NutchJob;
import org.apache.nutch.util.NutchTool;
import org.apache.nutch.util.StringUtil;
import org.apache.nutch.util.TableUtil;
+import org.apache.nutch.util.ToolUtil;
import org.apache.gora.mapreduce.GoraMapper;
import org.apache.gora.query.Query;
import org.apache.gora.query.Result;
@@ -49,7 +47,7 @@ import org.apache.gora.store.DataStore;
* Displays information about the entries of the webtable
**/
-public class WebTableReader extends Configured implements Tool, NutchTool {
+public class WebTableReader extends NutchTool implements Tool {
public static final Logger LOG = LoggerFactory.getLogger(WebTableReader.class);
@@ -198,13 +196,13 @@ public class WebTableReader extends Conf
+ /**
+ * Runs the statistics job through <code>run(Map)</code> and logs every
+ * entry of the <code>results</code> map at INFO level.
+ *
+ * @param sort passed to the job as <code>Nutch.ARG_SORT</code>
+ */
public void processStatJob(boolean sort) throws Exception {
- Job[] jobs = createJobs(sort);
-
if (LOG.isInfoEnabled()) {
LOG.info("WebTable statistics start");
}
- boolean success = jobs[0].waitForCompletion(true);
- postJob(0, jobs[0]);
+ run(ToolUtil.toArgMap(Nutch.ARG_SORT, sort));
+ // NOTE(review): run(Map) appears to log the same entries itself on
+ // success, so they may show up twice in the log - confirm
+ for (Entry<String,Object> e : results.entrySet()) {
+ LOG.info(e.getKey() + ":\t" + e.getValue());
+ }
}
/** Prints out the entry to the standard out **/
@@ -479,56 +477,46 @@ public class WebTableReader extends Conf
}
}
- @Override
- public Map<String, Object> prepare() throws Exception {
- // TODO Auto-generated method stub
- return null;
- }
-
// for now handles only -stat
@Override
- public Job[] createJobs(Object... args) throws Exception {
+ public Map<String,Object> run(Map<String,Object> args) throws Exception {
Path tmpFolder = new Path(getConf().get("mapred.temp.dir", ".")
+ "stat_tmp" + System.currentTimeMillis());
- Job job = new NutchJob(getConf(), "db_stats");
+ numJobs = 1;
+ currentJob = new NutchJob(getConf(), "db_stats");
- boolean sort = false;
- if (args != null && args.length > 0) {
- sort = (Boolean)args[0];
- }
- job.getConfiguration().setBoolean("db.reader.stats.sort", sort);
+ Boolean sort = (Boolean)args.get(Nutch.ARG_SORT);
+ if (sort == null) sort = Boolean.FALSE;
+ currentJob.getConfiguration().setBoolean("db.reader.stats.sort", sort);
- DataStore<String, WebPage> store = StorageUtils.createWebStore(job
+ DataStore<String, WebPage> store = StorageUtils.createWebStore(currentJob
.getConfiguration(), String.class, WebPage.class);
Query<String, WebPage> query = store.newQuery();
query.setFields(WebPage._ALL_FIELDS);
- GoraMapper.initMapperJob(job, query, store, Text.class, LongWritable.class,
+ GoraMapper.initMapperJob(currentJob, query, store, Text.class, LongWritable.class,
WebTableStatMapper.class, null, true);
- job.setCombinerClass(WebTableStatCombiner.class);
- job.setReducerClass(WebTableStatReducer.class);
+ currentJob.setCombinerClass(WebTableStatCombiner.class);
+ currentJob.setReducerClass(WebTableStatReducer.class);
- FileOutputFormat.setOutputPath(job, tmpFolder);
+ FileOutputFormat.setOutputPath(currentJob, tmpFolder);
- job.setOutputFormatClass(SequenceFileOutputFormat.class);
-
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(LongWritable.class);
-
- return new Job[]{job};
- }
-
- @Override
- public Map<String, Object> postJob(int jobIndex, Job job) throws Exception {
- Path tmpFolder = FileOutputFormat.getOutputPath(job);
+ currentJob.setOutputFormatClass(SequenceFileOutputFormat.class);
+ currentJob.setOutputKeyClass(Text.class);
+ currentJob.setOutputValueClass(LongWritable.class);
FileSystem fileSystem = FileSystem.get(getConf());
- if (!job.isSuccessful()) {
- fileSystem.delete(tmpFolder, true);
- return null;
+ try {
+ currentJob.waitForCompletion(true);
+ } finally {
+ ToolUtil.recordJobStatus(null, currentJob, results);
+ if (!currentJob.isSuccessful()) {
+ fileSystem.delete(tmpFolder, true);
+ return results;
+ }
}
Text key = new Text();
@@ -564,46 +552,40 @@ public class WebTableReader extends Conf
reader.close();
}
- if (LOG.isInfoEnabled()) {
- LOG.info("Statistics for WebTable: ");
- LongWritable totalCnt = stats.get("T");
- if (totalCnt==null)totalCnt=new LongWritable(0);
- stats.remove("T");
- LOG.info("TOTAL urls:\t" + totalCnt.get());
- for (Map.Entry<String, LongWritable> entry : stats.entrySet()) {
- String k = entry.getKey();
- LongWritable val = entry.getValue();
- if (k.equals("scn")) {
- LOG.info("min score:\t" + (float) (val.get() / 1000.0f));
- } else if (k.equals("scx")) {
- LOG.info("max score:\t" + (float) (val.get() / 1000.0f));
- } else if (k.equals("sct")) {
- LOG.info("avg score:\t"
- + (float) ((((double) val.get()) / totalCnt.get()) / 1000.0));
- } else if (k.startsWith("status")) {
- String[] st = k.split(" ");
- int code = Integer.parseInt(st[1]);
- if (st.length > 2)
- LOG.info(" " + st[2] + " :\t" + val);
- else
- LOG.info(st[0] + " " + code + " ("
- + CrawlStatus.getName((byte) code) + "):\t" + val);
- } else
- LOG.info(k + ":\t" + val);
- }
+ LongWritable totalCnt = stats.get("T");
+ if (totalCnt==null)totalCnt=new LongWritable(0);
+ stats.remove("T");
+ results.put("TOTAL urls", totalCnt.get());
+ for (Map.Entry<String, LongWritable> entry : stats.entrySet()) {
+ String k = entry.getKey();
+ LongWritable val = entry.getValue();
+ if (k.equals("scn")) {
+ results.put("min score", (float) (val.get() / 1000.0f));
+ } else if (k.equals("scx")) {
+ results.put("max score", (float) (val.get() / 1000.0f));
+ } else if (k.equals("sct")) {
+ results.put("avg score",
+ (float) ((((double) val.get()) / totalCnt.get()) / 1000.0));
+ } else if (k.startsWith("status")) {
+ String[] st = k.split(" ");
+ int code = Integer.parseInt(st[1]);
+ if (st.length > 2)
+ results.put(st[2], val.get());
+ else
+ results.put(st[0] + " " + code + " ("
+ + CrawlStatus.getName((byte) code) + ")", val.get());
+ } else
+ results.put(k, val.get());
}
// removing the tmp folder
fileSystem.delete(tmpFolder, true);
if (LOG.isInfoEnabled()) {
+ LOG.info("Statistics for WebTable: ");
+ for (Entry<String,Object> e : results.entrySet()) {
+ LOG.info(e.getKey() + ":\t" + e.getValue());
+ }
LOG.info("WebTable statistics: done");
}
- return null;
- }
-
- @Override
- public Map<String, Object> finish() throws Exception {
- // TODO Auto-generated method stub
- return null;
+ return results;
}
-
}
Modified: nutch/trunk/src/java/org/apache/nutch/fetcher/FetcherJob.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/fetcher/FetcherJob.java?rev=1039014&r1=1039013&r2=1039014&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/fetcher/FetcherJob.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/fetcher/FetcherJob.java Thu Nov 25 12:05:57 2010
@@ -3,6 +3,8 @@ package org.apache.nutch.fetcher;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Random;
@@ -27,13 +29,14 @@ import org.apache.nutch.util.NutchConfig
import org.apache.nutch.util.NutchJob;
import org.apache.nutch.util.NutchTool;
import org.apache.nutch.util.TableUtil;
+import org.apache.nutch.util.ToolUtil;
import org.apache.gora.mapreduce.GoraMapper;
/**
* Multi-threaded fetcher.
*
*/
-public class FetcherJob implements Tool, NutchTool {
+public class FetcherJob extends NutchTool implements Tool {
public static final String PROTOCOL_REDIR = "protocol";
@@ -108,8 +111,6 @@ public class FetcherJob implements Tool,
public static final Logger LOG = LoggerFactory.getLogger(FetcherJob.class);
- private Configuration conf;
-
public FetcherJob() {
}
@@ -118,16 +119,6 @@ public class FetcherJob implements Tool,
setConf(conf);
}
- @Override
- public Configuration getConf() {
- return conf;
- }
-
- @Override
- public void setConf(Configuration conf) {
- this.conf = conf;
- }
-
public Collection<WebPage.Field> getFields(Job job) {
Collection<WebPage.Field> fields = new HashSet<WebPage.Field>(FIELDS);
if (job.getConfiguration().getBoolean(PARSE_KEY, true)) {
@@ -140,32 +131,28 @@ public class FetcherJob implements Tool,
return fields;
}
- public Map<String,Object> prepare() throws Exception {
- return null;
- }
-
- public Map<String,Object> postJob(int jobIndex, Job job) throws Exception {
- return null;
- }
-
- public Map<String,Object> finish() throws Exception {
- return null;
- }
-
- public Job[] createJobs(Object... args) throws Exception {
+ /**
+ * Configures the "fetch" MapReduce job from the argument map and runs it.
+ * Arguments that are absent (null) leave the corresponding configuration
+ * value untouched, except the batch id, which falls back to
+ * <code>Nutch.ALL_BATCH_ID_STR</code>.
+ *
+ * @param args may contain <code>Nutch.ARG_BATCH</code> (String),
+ * <code>Nutch.ARG_THREADS</code> (Integer), <code>Nutch.ARG_RESUME</code>
+ * (Boolean), <code>Nutch.ARG_PARSE</code> (Boolean) and
+ * <code>Nutch.ARG_NUMTASKS</code> (Integer)
+ * @return the <code>results</code> map after
+ * <code>ToolUtil.recordJobStatus</code> has been called with it
+ */
+ @Override
+ public Map<String,Object> run(Map<String,Object> args) throws Exception {
checkConfiguration();
- String batchId = (String)args[0];
- int threads = (Integer)args[1];
- boolean shouldResume = (Boolean)args[2];
- boolean parse = (Boolean)args[3];
- int numTasks = (Integer)args[4];
+ String batchId = (String)args.get(Nutch.ARG_BATCH);
+ Integer threads = (Integer)args.get(Nutch.ARG_THREADS);
+ Boolean shouldResume = (Boolean)args.get(Nutch.ARG_RESUME);
+ Boolean parse = (Boolean)args.get(Nutch.ARG_PARSE);
+ Integer numTasks = (Integer)args.get(Nutch.ARG_NUMTASKS);
- if (threads > 0) {
+ if (threads != null && threads > 0) {
getConf().setInt(THREADS_KEY, threads);
}
+ if (batchId == null) {
+ batchId = Nutch.ALL_BATCH_ID_STR;
+ }
getConf().set(GeneratorJob.BATCH_ID, batchId);
- getConf().setBoolean(PARSE_KEY, parse);
- getConf().setBoolean(RESUME_KEY, shouldResume);
+ if (parse != null) {
+ getConf().setBoolean(PARSE_KEY, parse);
+ }
+ if (shouldResume != null) {
+ getConf().setBoolean(RESUME_KEY, shouldResume);
+ }
// set the actual time for the timelimit relative
// to the beginning of the whole job and not of a specific task
@@ -175,19 +162,20 @@ public class FetcherJob implements Tool,
timelimit = System.currentTimeMillis() + (timelimit * 60 * 1000);
getConf().setLong("fetcher.timelimit", timelimit);
}
-
- Job job = new NutchJob(getConf(), "fetch");
- Collection<WebPage.Field> fields = getFields(job);
- StorageUtils.initMapperJob(job, fields, IntWritable.class,
+ numJobs = 1;
+ currentJob = new NutchJob(getConf(), "fetch");
+ Collection<WebPage.Field> fields = getFields(currentJob);
+ StorageUtils.initMapperJob(currentJob, fields, IntWritable.class,
FetchEntry.class, FetcherMapper.class, PartitionUrlByHost.class, false);
- StorageUtils.initReducerJob(job, FetcherReducer.class);
- if (numTasks < 1) {
- job.setNumReduceTasks(job.getConfiguration().getInt("mapred.map.tasks",
- job.getNumReduceTasks()));
+ StorageUtils.initReducerJob(currentJob, FetcherReducer.class);
+ if (numTasks == null || numTasks < 1) {
+ currentJob.setNumReduceTasks(currentJob.getConfiguration().getInt("mapred.map.tasks",
+ currentJob.getNumReduceTasks()));
} else {
- job.setNumReduceTasks(numTasks);
+ currentJob.setNumReduceTasks(numTasks);
}
- return new Job[]{job};
+ // Submit the job and block until it finishes; without this call the job
+ // is only configured and never executed.
+ currentJob.waitForCompletion(true);
+ ToolUtil.recordJobStatus(null, currentJob, results);
+ return results;
}
/**
@@ -216,13 +204,12 @@ public class FetcherJob implements Tool,
LOG.info("FetcherJob: batchId: " + batchId);
}
- Job[] jobs = createJobs(batchId, threads, shouldResume, parse, numTasks);
- boolean success = jobs[0].waitForCompletion(true);
- if (!success) {
- LOG.info("FetcherJob: failed");
- return -1;
- }
-
+ run(ToolUtil.toArgMap(
+ Nutch.ARG_BATCH, batchId,
+ Nutch.ARG_THREADS, threads,
+ Nutch.ARG_RESUME, shouldResume,
+ Nutch.ARG_PARSE, parse,
+ Nutch.ARG_NUMTASKS, numTasks));
LOG.info("FetcherJob: done");
return 0;
}
Modified: nutch/trunk/src/java/org/apache/nutch/indexer/IndexerJob.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/indexer/IndexerJob.java?rev=1039014&r1=1039013&r2=1039014&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/indexer/IndexerJob.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/indexer/IndexerJob.java Thu Nov 25 12:05:57 2010
@@ -21,13 +21,12 @@ import org.apache.nutch.storage.ParseSta
import org.apache.nutch.storage.StorageUtils;
import org.apache.nutch.storage.WebPage;
import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.NutchTool;
import org.apache.nutch.util.TableUtil;
import org.apache.gora.mapreduce.GoraMapper;
import org.apache.gora.mapreduce.StringComparator;
-public abstract class IndexerJob
-extends GoraMapper<String, WebPage, String, WebPage>
-implements Tool {
+public abstract class IndexerJob extends NutchTool implements Tool {
public static final Logger LOG = LoggerFactory.getLogger(IndexerJob.class);
@@ -35,55 +34,47 @@ implements Tool {
private static final Utf8 REINDEX = new Utf8("-reindex");
- private Configuration conf;
-
- protected Utf8 batchId;
-
static {
FIELDS.add(WebPage.Field.SIGNATURE);
FIELDS.add(WebPage.Field.PARSE_STATUS);
FIELDS.add(WebPage.Field.SCORE);
FIELDS.add(WebPage.Field.MARKERS);
}
-
- @Override
- public Configuration getConf() {
- return conf;
- }
-
- @Override
- public void setConf(Configuration conf) {
- this.conf = conf;
- }
-
- @Override
- public void setup(Context context) throws IOException {
- Configuration conf = context.getConfiguration();
- batchId = new Utf8(conf.get(GeneratorJob.BATCH_ID, Nutch.ALL_BATCH_ID_STR));
- }
-
- @Override
- public void map(String key, WebPage page, Context context)
- throws IOException, InterruptedException {
- ParseStatus pstatus = page.getParseStatus();
- if (pstatus == null || !ParseStatusUtils.isSuccess(pstatus)
- || pstatus.getMinorCode() == ParseStatusCodes.SUCCESS_REDIRECT) {
- return; // filter urls not parsed
+
+ /**
+ * Mapper that selects the pages to index. A page is passed through only
+ * if its parse status is present, successful and not a redirect, and,
+ * unless the configured batch id equals REINDEX, only if
+ * <code>NutchJob.shouldProcess</code> accepts its update-db mark for that
+ * batch id.
+ */
+ public static class IndexerMapper
+ extends GoraMapper<String, WebPage, String, WebPage> {
+ protected Utf8 batchId;
+
+ @Override
+ public void setup(Context context) throws IOException {
+ Configuration conf = context.getConfiguration();
+ // batch id comes from the job configuration, defaulting to ALL_BATCH_ID_STR
+ batchId = new Utf8(conf.get(GeneratorJob.BATCH_ID, Nutch.ALL_BATCH_ID_STR));
}
- Utf8 mark = Mark.UPDATEDB_MARK.checkMark(page);
- if (!batchId.equals(REINDEX)) {
- if (!NutchJob.shouldProcess(mark, batchId)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Skipping " + TableUtil.unreverseUrl(key) + "; different batch id");
+ @Override
+ public void map(String key, WebPage page, Context context)
+ throws IOException, InterruptedException {
+ ParseStatus pstatus = page.getParseStatus();
+ if (pstatus == null || !ParseStatusUtils.isSuccess(pstatus)
+ || pstatus.getMinorCode() == ParseStatusCodes.SUCCESS_REDIRECT) {
+ return; // filter urls not parsed
+ }
+
+ Utf8 mark = Mark.UPDATEDB_MARK.checkMark(page);
+ // the batch check is bypassed entirely when the batch id is REINDEX
+ if (!batchId.equals(REINDEX)) {
+ if (!NutchJob.shouldProcess(mark, batchId)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Skipping " + TableUtil.unreverseUrl(key) + "; different batch id");
+ }
+ return;
}
- return;
}
- }
- context.write(key, page);
+ context.write(key, page);
+ }
}
+
private static Collection<WebPage.Field> getFields(Job job) {
Configuration conf = job.getConfiguration();
Collection<WebPage.Field> columns = new HashSet<WebPage.Field>(FIELDS);
@@ -103,7 +94,8 @@ implements Tool {
StringComparator.class, RawComparator.class);
Collection<WebPage.Field> fields = getFields(job);
- StorageUtils.initMapperJob(job, fields, String.class, WebPage.class, this.getClass());
+ StorageUtils.initMapperJob(job, fields, String.class, WebPage.class,
+ IndexerMapper.class);
job.setReducerClass(IndexerReducer.class);
job.setOutputFormatClass(IndexerOutputFormat.class);
return job;
Modified: nutch/trunk/src/java/org/apache/nutch/indexer/solr/SolrIndexerJob.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/indexer/solr/SolrIndexerJob.java?rev=1039014&r1=1039013&r2=1039014&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/indexer/solr/SolrIndexerJob.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/indexer/solr/SolrIndexerJob.java Thu Nov 25 12:05:57 2010
@@ -16,6 +16,9 @@
*/
package org.apache.nutch.indexer.solr;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
import java.util.Random;
@@ -32,53 +35,46 @@ import org.apache.nutch.indexer.NutchInd
import org.apache.nutch.metadata.Nutch;
import org.apache.nutch.util.NutchConfiguration;
import org.apache.nutch.util.NutchTool;
+import org.apache.nutch.util.ToolUtil;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
-public class SolrIndexerJob extends IndexerJob implements NutchTool {
+public class SolrIndexerJob extends IndexerJob {
public static Logger LOG = LoggerFactory.getLogger(SolrIndexerJob.class);
- public Map<String,Object> prepare() throws Exception {
- return null;
- }
-
- public Map<String,Object> postJob(int jobIndex, Job job) throws Exception {
- return null;
- }
-
- public Map<String,Object> finish() throws Exception {
- return null;
- }
-
- public Job[] createJobs(Object... args) throws Exception {
- String solrUrl = (String)args[0];
- String batchId = (String)args[1];
+ @Override
+ public Map<String,Object> run(Map<String,Object> args) throws Exception {
+ String solrUrl = (String)args.get(Nutch.ARG_SOLR);
+ String batchId = (String)args.get(Nutch.ARG_BATCH);
NutchIndexWriterFactory.addClassToConf(getConf(), SolrWriter.class);
getConf().set(SolrConstants.SERVER_URL, solrUrl);
- Job job = createIndexJob(getConf(), "solr-index", batchId);
+ currentJob = createIndexJob(getConf(), "solr-index", batchId);
Path tmp = new Path("tmp_" + System.currentTimeMillis() + "-"
+ new Random().nextInt());
- FileOutputFormat.setOutputPath(job, tmp);
- return new Job[]{job};
+ FileOutputFormat.setOutputPath(currentJob, tmp);
+ currentJob.waitForCompletion(true);
+ ToolUtil.recordJobStatus(null, currentJob, results);
+ return results;
}
- private void indexSolr(String solrUrl, String crawlId) throws Exception {
+ private void indexSolr(String solrUrl, String batchId) throws Exception {
LOG.info("SolrIndexerJob: starting");
- Job[] jobs = createJobs(solrUrl, crawlId);
- boolean success = false;
try {
- success = jobs[0].waitForCompletion(true);
+ run(ToolUtil.toArgMap(
+ Nutch.ARG_SOLR, solrUrl,
+ Nutch.ARG_BATCH, batchId));
// do the commits once and for all the reducers in one go
SolrServer solr = new CommonsHttpSolrServer(solrUrl);
solr.commit();
} finally {
- FileSystem.get(getConf()).delete(FileOutputFormat.getOutputPath(jobs[0]), true);
+ FileSystem.get(getConf()).delete(
+ FileOutputFormat.getOutputPath(currentJob), true);
}
- LOG.info("SolrIndexerJob: " + (success ? "done" : "failed"));
+ LOG.info("SolrIndexerJob: done.");
}
public int run(String[] args) throws Exception {
Modified: nutch/trunk/src/java/org/apache/nutch/metadata/Nutch.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/metadata/Nutch.java?rev=1039014&r1=1039013&r2=1039014&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/metadata/Nutch.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/metadata/Nutch.java Thu Nov 25 12:05:57 2010
@@ -75,4 +75,53 @@ public interface Nutch {
public static final Utf8 ALL_CRAWL_ID = new Utf8(ALL_BATCH_ID_STR);
public static final String CRAWL_ID_KEY = "storage.crawl.id";
+
+
+ // short constants for cmd-line args
+ /** Batch id to select. */
+ public static final String ARG_BATCH = "batch";
+ /** Crawl id to use. */
+ public static final String ARG_CRAWL = "crawl";
+ /** Resume previously aborted op. */
+ public static final String ARG_RESUME = "resume";
+ /** Force processing even if there are locks or inconsistencies. */
+ public static final String ARG_FORCE = "force";
+ /** Parse during fetching. */
+ public static final String ARG_PARSE = "parse";
+ /** Sort statistics. */
+ public static final String ARG_SORT = "sort";
+ /** Solr URL. */
+ public static final String ARG_SOLR = "solr";
+ /** Number of fetcher threads (per map task). */
+ public static final String ARG_THREADS = "threads";
+ /** Number of fetcher tasks. */
+ public static final String ARG_NUMTASKS = "numTasks";
+ /** Generate topN scoring URLs. */
+ public static final String ARG_TOPN = "topN";
+ /** The notion of current time. */
+ public static final String ARG_CURTIME = "curTime";
+ /** Apply URLFilters. */
+ public static final String ARG_FILTER = "filter";
+ /** Apply URLNormalizers. */
+ public static final String ARG_NORMALIZE = "normalize";
+ /** Whitespace-separated list of seed URLs. */
+ public static final String ARG_SEEDLIST = "seed";
+ /** a path to a directory containing a list of seed URLs. */
+ public static final String ARG_SEEDDIR = "seedDir";
+ /** Class to run as a NutchTool. */
+ public static final String ARG_CLASS = "class";
+ /** Depth (number of cycles) of a crawl. */
+ public static final String ARG_DEPTH = "depth";
+
+ // short constants for status / results fields
+ /** Status / result message. */
+ public static final String STAT_MESSAGE = "msg";
+ /** Phase of processing. */
+ public static final String STAT_PHASE = "phase";
+ /** Progress (float). */
+ public static final String STAT_PROGRESS = "progress";
+ /** Jobs. */
+ public static final String STAT_JOBS = "jobs";
+ /** Counters. */
+ public static final String STAT_COUNTERS = "counters";
}
Modified: nutch/trunk/src/java/org/apache/nutch/parse/ParseStatusCodes.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/parse/ParseStatusCodes.java?rev=1039014&r1=1039013&r2=1039014&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/parse/ParseStatusCodes.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/parse/ParseStatusCodes.java Thu Nov 25 12:05:57 2010
@@ -20,6 +20,8 @@ public interface ParseStatusCodes {
// Secondary success codes go here:
+ public static final short SUCCESS_OK = 0;
+
/** Parsed content contains a directive to redirect to another URL.
* The target URL can be retrieved from the arguments.
*/
Modified: nutch/trunk/src/java/org/apache/nutch/parse/ParseStatusUtils.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/parse/ParseStatusUtils.java?rev=1039014&r1=1039013&r2=1039014&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/parse/ParseStatusUtils.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/parse/ParseStatusUtils.java Thu Nov 25 12:05:57 2010
@@ -16,6 +16,7 @@ public class ParseStatusUtils {
static {
STATUS_SUCCESS.setMajorCode(ParseStatusCodes.SUCCESS);
+ minorCodes.put(ParseStatusCodes.SUCCESS_OK, "ok");
minorCodes.put(ParseStatusCodes.SUCCESS_REDIRECT, "redirect");
minorCodes.put(ParseStatusCodes.FAILED_EXCEPTION, "exception");
minorCodes.put(ParseStatusCodes.FAILED_INVALID_FORMAT, "invalid_format");
@@ -81,7 +82,7 @@ public class ParseStatusUtils {
}
StringBuilder sb = new StringBuilder();
sb.append(ParseStatusCodes.majorCodes[status.getMajorCode()] +
- "/" + minorCodes.get(status.getMinorCode()));
+ "/" + minorCodes.get((short)status.getMinorCode()));
sb.append(" (" + status.getMajorCode() + "/" + status.getMinorCode() + ")");
sb.append(", args=[");
GenericArray<Utf8> args = status.getArgs();
Modified: nutch/trunk/src/java/org/apache/nutch/parse/ParserJob.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/parse/ParserJob.java?rev=1039014&r1=1039013&r2=1039014&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/parse/ParserJob.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/parse/ParserJob.java Thu Nov 25 12:05:57 2010
@@ -25,10 +25,10 @@ import org.apache.nutch.util.NutchConfig
import org.apache.nutch.util.NutchJob;
import org.apache.nutch.util.NutchTool;
import org.apache.nutch.util.TableUtil;
+import org.apache.nutch.util.ToolUtil;
import org.apache.gora.mapreduce.GoraMapper;
-public class ParserJob extends GoraMapper<String, WebPage, String, WebPage>
- implements Tool, NutchTool {
+public class ParserJob extends NutchTool implements Tool {
public static final Logger LOG = LoggerFactory.getLogger(ParserJob.class);
@@ -50,66 +50,70 @@ public class ParserJob extends GoraMappe
FIELDS.add(WebPage.Field.METADATA);
}
- private ParseUtil parseUtil;
- private boolean shouldResume;
+ public static class ParserMapper
+ extends GoraMapper<String, WebPage, String, WebPage> {
+ private ParseUtil parseUtil;
- private boolean force;
+ private boolean shouldResume;
- private Utf8 batchId;
+ private boolean force;
- public ParserJob() {
-
- }
-
- public ParserJob(Configuration conf) {
- setConf(conf);
- }
-
- @Override
- public void setup(Context context) throws IOException {
- Configuration conf = context.getConfiguration();
- parseUtil = new ParseUtil(conf);
- shouldResume = conf.getBoolean(RESUME_KEY, false);
- force = conf.getBoolean(FORCE_KEY, false);
- batchId = new Utf8(conf.get(GeneratorJob.BATCH_ID, Nutch.ALL_BATCH_ID_STR));
- }
-
- @Override
- public void map(String key, WebPage page, Context context)
- throws IOException, InterruptedException {
- Utf8 mark = Mark.FETCH_MARK.checkMark(page);
- if (!NutchJob.shouldProcess(mark, batchId)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Skipping " + TableUtil.unreverseUrl(key) + "; different batch id");
- }
- return;
+ private Utf8 batchId;
+
+ @Override
+ public void setup(Context context) throws IOException {
+ Configuration conf = context.getConfiguration();
+ parseUtil = new ParseUtil(conf);
+ shouldResume = conf.getBoolean(RESUME_KEY, false);
+ force = conf.getBoolean(FORCE_KEY, false);
+ batchId = new Utf8(conf.get(GeneratorJob.BATCH_ID, Nutch.ALL_BATCH_ID_STR));
}
- if (shouldResume && Mark.PARSE_MARK.checkMark(page) != null) {
- if (force) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Forced parsing " + TableUtil.unreverseUrl(key) + "; already parsed");
- }
- } else {
+
+ @Override
+ public void map(String key, WebPage page, Context context)
+ throws IOException, InterruptedException {
+ Utf8 mark = Mark.FETCH_MARK.checkMark(page);
+ if (!NutchJob.shouldProcess(mark, batchId)) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Skipping " + TableUtil.unreverseUrl(key) + "; already parsed");
+ LOG.debug("Skipping " + TableUtil.unreverseUrl(key) + "; different batch id");
}
return;
}
- }
+ if (shouldResume && Mark.PARSE_MARK.checkMark(page) != null) {
+ if (force) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Forced parsing " + TableUtil.unreverseUrl(key) + "; already parsed");
+ }
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Skipping " + TableUtil.unreverseUrl(key) + "; already parsed");
+ }
+ return;
+ }
+ }
- URLWebPage redirectedPage = parseUtil.process(key, page);
- ParseStatus pstatus = page.getParseStatus();
- if (pstatus != null) {
- context.getCounter("ParserStatus",
- ParseStatusCodes.majorCodes[pstatus.getMajorCode()]).increment(1);
- }
+ URLWebPage redirectedPage = parseUtil.process(key, page);
+ ParseStatus pstatus = page.getParseStatus();
+ if (pstatus != null) {
+ context.getCounter("ParserStatus",
+ ParseStatusCodes.majorCodes[pstatus.getMajorCode()]).increment(1);
+ }
+
+ if (redirectedPage != null) {
+ context.write(TableUtil.reverseUrl(redirectedPage.getUrl()),
+ redirectedPage.getDatum());
+ }
+ context.write(key, page);
+ }
+ }
+
+ public ParserJob() {
- if (redirectedPage != null) {
- context.write(TableUtil.reverseUrl(redirectedPage.getUrl()),
- redirectedPage.getDatum());
- }
- context.write(key, page);
+ }
+
+ public ParserJob(Configuration conf) {
+ setConf(conf);
}
public Collection<WebPage.Field> getFields(Job job) {
@@ -146,42 +150,35 @@ public class ParserJob extends GoraMappe
this.conf = conf;
}
- public Map<String,Object> prepare() throws Exception {
- return null;
- }
-
- public Map<String,Object> postJob(int jobIndex, Job job) throws Exception {
- return null;
- }
-
- public Map<String,Object> finish() throws Exception {
- return null;
- }
-
- public Job[] createJobs(Object... args) throws Exception {
- String batchId = (String)args[0];
- boolean shouldResume = (Boolean)args[1];
- boolean force = (Boolean)args[2];
+ @Override
+ public Map<String,Object> run(Map<String,Object> args) throws Exception {
+ String batchId = (String)args.get(Nutch.ARG_BATCH);
+ Boolean shouldResume = (Boolean)args.get(Nutch.ARG_RESUME);
+ Boolean force = (Boolean)args.get(Nutch.ARG_FORCE);
if (batchId != null) {
getConf().set(GeneratorJob.BATCH_ID, batchId);
}
- getConf().setBoolean(RESUME_KEY, shouldResume);
- getConf().setBoolean(FORCE_KEY, force);
- final Job job = new NutchJob(getConf(), "parse");
+ if (shouldResume != null) {
+ getConf().setBoolean(RESUME_KEY, shouldResume);
+ }
+ if (force != null) {
+ getConf().setBoolean(FORCE_KEY, force);
+ }
+ currentJob = new NutchJob(getConf(), "parse");
- Collection<WebPage.Field> fields = getFields(job);
- StorageUtils.initMapperJob(job, fields, String.class, WebPage.class,
- ParserJob.class);
- StorageUtils.initReducerJob(job, IdentityPageReducer.class);
- job.setNumReduceTasks(0);
-
- return new Job[]{job};
+ Collection<WebPage.Field> fields = getFields(currentJob);
+ StorageUtils.initMapperJob(currentJob, fields, String.class, WebPage.class,
+ ParserMapper.class);
+ StorageUtils.initReducerJob(currentJob, IdentityPageReducer.class);
+ currentJob.setNumReduceTasks(0);
+
+ currentJob.waitForCompletion(true);
+ ToolUtil.recordJobStatus(null, currentJob, results);
+ return results;
}
- public int parse(String crawlId, boolean shouldResume, boolean force) throws Exception {
- Job[] jobs = createJobs(crawlId, shouldResume, force);
-
+ public int parse(String batchId, boolean shouldResume, boolean force) throws Exception {
LOG.info("ParserJob: starting");
LOG.info("ParserJob: resuming:\t" + getConf().getBoolean(RESUME_KEY, false));
@@ -191,11 +188,10 @@ public class ParserJob extends GoraMappe
} else {
LOG.info("ParserJob: batchId:\t" + batchId);
}
- boolean success = jobs[0].waitForCompletion(true);
- if (!success){
- LOG.info("ParserJob: failed");
- return -1;
- }
+ run(ToolUtil.toArgMap(
+ Nutch.ARG_BATCH, batchId,
+ Nutch.ARG_RESUME, shouldResume,
+ Nutch.ARG_FORCE, force));
LOG.info("ParserJob: success");
return 0;
}