Posted to commits@nutch.apache.org by ab...@apache.org on 2010/11/25 13:05:58 UTC

svn commit: r1039014 [1/2] - in /nutch/trunk: ./ src/java/org/apache/nutch/api/ src/java/org/apache/nutch/api/impl/ src/java/org/apache/nutch/crawl/ src/java/org/apache/nutch/fetcher/ src/java/org/apache/nutch/indexer/ src/java/org/apache/nutch/indexer...

Author: ab
Date: Thu Nov 25 12:05:57 2010
New Revision: 1039014

URL: http://svn.apache.org/viewvc?rev=1039014&view=rev
Log:
NUTCH-932 Bulk REST API to retrieve crawl results as JSON.
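
For context: the new endpoint (DbResource, backed by DbReader below)
streams rows of the web table to the client as a single JSON array. An
illustrative exchange; host, port and the field values shown depend on the
deployment, and the query parameters are the ones DbResource parses:

    GET http://<host>:<port>/db?fields=url,status,markers&batch=<batchId>

    [{"url":"http://www.example.com/","status":2,"markers":{...}}, ...]

Omitting "fields" returns all stored fields; omitting "batch" disables
batch-id filtering.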

Added:
    nutch/trunk/src/java/org/apache/nutch/api/DbReader.java   (with props)
    nutch/trunk/src/java/org/apache/nutch/api/DbResource.java   (with props)
    nutch/trunk/src/java/org/apache/nutch/crawl/Crawler.java   (with props)
    nutch/trunk/src/java/org/apache/nutch/util/ToolUtil.java   (with props)
Modified:
    nutch/trunk/CHANGES.txt
    nutch/trunk/src/java/org/apache/nutch/api/APIInfoResource.java
    nutch/trunk/src/java/org/apache/nutch/api/JobManager.java
    nutch/trunk/src/java/org/apache/nutch/api/JobResource.java
    nutch/trunk/src/java/org/apache/nutch/api/JobStatus.java
    nutch/trunk/src/java/org/apache/nutch/api/NutchApp.java
    nutch/trunk/src/java/org/apache/nutch/api/impl/RAMJobManager.java
    nutch/trunk/src/java/org/apache/nutch/crawl/DbUpdaterJob.java
    nutch/trunk/src/java/org/apache/nutch/crawl/GeneratorJob.java
    nutch/trunk/src/java/org/apache/nutch/crawl/InjectorJob.java
    nutch/trunk/src/java/org/apache/nutch/crawl/WebTableReader.java
    nutch/trunk/src/java/org/apache/nutch/fetcher/FetcherJob.java
    nutch/trunk/src/java/org/apache/nutch/indexer/IndexerJob.java
    nutch/trunk/src/java/org/apache/nutch/indexer/solr/SolrIndexerJob.java
    nutch/trunk/src/java/org/apache/nutch/metadata/Nutch.java
    nutch/trunk/src/java/org/apache/nutch/parse/ParseStatusCodes.java
    nutch/trunk/src/java/org/apache/nutch/parse/ParseStatusUtils.java
    nutch/trunk/src/java/org/apache/nutch/parse/ParserJob.java
    nutch/trunk/src/java/org/apache/nutch/util/NutchTool.java
    nutch/trunk/src/test/org/apache/nutch/api/TestAPI.java

Modified: nutch/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/nutch/trunk/CHANGES.txt?rev=1039014&r1=1039013&r2=1039014&view=diff
==============================================================================
--- nutch/trunk/CHANGES.txt (original)
+++ nutch/trunk/CHANGES.txt Thu Nov 25 12:05:57 2010
@@ -88,6 +88,9 @@ Release 2.0 - Current Development
 
 * NUTCH-931 Simple admin API to fetch status and stop the service (ab)
 
+* NUTCH-932 Bulk REST API to retrieve crawl results as JSON (ab)
+
+
 Release 1.1 - 2010-06-06
 
 * NUTCH-819 Included Solr schema.xml and solrindex-mapping.xml don't play together (ab)

Modified: nutch/trunk/src/java/org/apache/nutch/api/APIInfoResource.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/api/APIInfoResource.java?rev=1039014&r1=1039013&r2=1039014&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/api/APIInfoResource.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/api/APIInfoResource.java Thu Nov 25 12:05:57 2010
@@ -13,6 +13,7 @@ public class APIInfoResource extends Ser
   static {
     info.add(new String[]{AdminResource.PATH, AdminResource.DESCR});
     info.add(new String[]{ConfResource.PATH, ConfResource.DESCR});
+    info.add(new String[]{DbResource.PATH, DbResource.DESCR});
     info.add(new String[]{JobResource.PATH, JobResource.DESCR});
   }
 

Added: nutch/trunk/src/java/org/apache/nutch/api/DbReader.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/api/DbReader.java?rev=1039014&view=auto
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/api/DbReader.java (added)
+++ nutch/trunk/src/java/org/apache/nutch/api/DbReader.java Thu Nov 25 12:05:57 2010
@@ -0,0 +1,220 @@
+package org.apache.nutch.api;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TreeSet;
+
+import org.apache.avro.util.Utf8;
+import org.apache.gora.query.Query;
+import org.apache.gora.query.Result;
+import org.apache.gora.store.DataStore;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nutch.metadata.Nutch;
+import org.apache.nutch.parse.ParseStatusUtils;
+import org.apache.nutch.protocol.ProtocolStatusUtils;
+import org.apache.nutch.storage.Mark;
+import org.apache.nutch.storage.ParseStatus;
+import org.apache.nutch.storage.ProtocolStatus;
+import org.apache.nutch.storage.StorageUtils;
+import org.apache.nutch.storage.WebPage;
+import org.apache.nutch.util.Bytes;
+import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.StringUtil;
+import org.apache.nutch.util.TableUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DbReader {
+  private static final Logger LOG = LoggerFactory.getLogger(DbReader.class);
+
+  DataStore<String,WebPage> store;
+  Configuration conf;
+  
+  public DbReader(Configuration conf, String crawlId) {
+    this.conf = new Configuration(conf);
+    if (crawlId != null) {
+      this.conf.set(Nutch.CRAWL_ID_KEY, crawlId);
+    }
+    try {
+      store = StorageUtils.createWebStore(this.conf, String.class, WebPage.class);
+    } catch (Exception e) {
+      e.printStackTrace();
+      store = null;
+    }
+  }
+  
+  public Iterator<Map<String,Object>> iterator(String[] fields, String startKey, String endKey,
+      String batchId) throws Exception {
+    Query<String,WebPage> q = store.newQuery();
+    String[] qFields = fields;
+    if (fields != null) {
+      HashSet<String> flds = new HashSet<String>(Arrays.asList(fields));
+      // remove "url"
+      flds.remove("url");
+      if (flds.size() > 0) {
+        qFields = (String[])flds.toArray(new String[flds.size()]);
+      } else {
+        qFields = null;
+      }
+    }
+    q.setFields(qFields);
+    if (startKey != null) {
+      q.setStartKey(startKey);
+      if (endKey != null) {
+        q.setEndKey(endKey);
+      }
+    }
+    Result<String,WebPage> res = store.execute(q);
+    // XXX we should add the filtering capability to Query
+    return new DbIterator(res, fields, batchId);
+  }
+  
+  public void close() throws IOException {
+    if (store != null) {
+      store.close();
+    }
+  }
+  
+  private class DbIterator implements Iterator<Map<String,Object>> {
+    private Result<String,WebPage> res;
+    private boolean hasNext;
+    private String url;
+    private WebPage page;
+    private Utf8 batchId;
+    private TreeSet<String> fields;
+
+    DbIterator(Result<String,WebPage> res, String[] fields, String batchId) throws IOException {
+      this.res = res;
+      if (batchId != null) {
+        this.batchId = new Utf8(batchId);
+      }
+      if (fields != null) {
+        this.fields = new TreeSet<String>(Arrays.asList(fields));
+      }
+      advance();
+    }
+    
+    private void advance() throws IOException {
+      hasNext = res.next();
+      if (hasNext && batchId != null) {
+        do {
+          WebPage page = res.get();
+          Utf8 mark = Mark.UPDATEDB_MARK.checkMark(page);
+          if (NutchJob.shouldProcess(mark, batchId)) {
+            return;
+          } else {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Skipping " + 
+                TableUtil.unreverseUrl(res.getKey()) + "; different batch id");
+            }
+            hasNext = res.next();
+          }
+        } while (hasNext);
+      }
+    }
+
+    public boolean hasNext() {
+      return hasNext;
+    }
+
+    public Map<String,Object> next() {
+      url = res.getKey();
+      page = (WebPage)res.get().clone();
+      try {
+        advance();
+        if (!hasNext) {
+          res.close();
+        }
+      } catch (IOException e) {
+        e.printStackTrace();
+        hasNext = false;
+        return null;
+      }
+      return pageAsMap(url, page);
+    }
+
+    private Map<String,Object> pageAsMap(String url, WebPage page) {
+      HashMap<String,Object> res = new HashMap<String,Object>();
+      if (fields == null || fields.contains("url")) {
+        res.put("url", TableUtil.unreverseUrl(url));
+      }
+      String[] pfields = page.getFields();
+      TreeSet<String> flds = null;
+      if (fields != null) {
+        flds = (TreeSet<String>)fields.clone();
+      } else {
+        flds = new TreeSet<String>(Arrays.asList(pfields));
+      }
+      flds.retainAll(Arrays.asList(pfields));
+      for (String f : flds) {
+        int idx = page.getFieldIndex(f);
+        if (idx < 0) {
+          continue;
+        }
+        Object val = page.get(idx);
+        if (val == null) {
+          continue;
+        }
+        if ("metadata".equals(f)) {
+          Map<Utf8, ByteBuffer> metadata = page.getMetadata();
+          Map<String,String> simpleMeta = new HashMap<String,String>();
+          if (metadata != null) {
+            Iterator<Entry<Utf8, ByteBuffer>> iterator = metadata.entrySet()
+                .iterator();
+            while (iterator.hasNext()) {
+              Entry<Utf8, ByteBuffer> entry = iterator.next();
+              simpleMeta.put(entry.getKey().toString(), 
+                  Bytes.toStringBinary(entry.getValue().array()));
+            }
+          }
+          res.put(f, simpleMeta);
+        } else if ("protocolStatus".equals(f)) {
+          ProtocolStatus ps = page.getProtocolStatus();
+          res.put(f, ProtocolStatusUtils.toString(ps));
+        } else if ("parseStatus".equals(f)) {
+          ParseStatus ps = page.getParseStatus();
+          res.put(f, ParseStatusUtils.toString(ps));
+        } else if ("signature".equals(f)) {
+          ByteBuffer bb = page.getSignature();
+          res.put(f, StringUtil.toHexString(bb.array()));
+        } else if ("content".equals(f)) {
+          ByteBuffer bb = page.getContent();
+          res.put(f, Bytes.toStringBinary(bb.array()));
+        } else if ("markers".equals(f)) {
+          res.put(f, convertMap(page.getMarkers()));
+        } else if ("inlinks".equals(f)) {
+          res.put(f, convertMap(page.getInlinks()));
+        } else if ("outlinks".equals(f)) {
+          res.put(f, convertMap(page.getOutlinks()));
+        } else {
+          if (val instanceof Utf8) {
+            val = val.toString();
+          } else if (val instanceof ByteBuffer) {
+            val = Bytes.toStringBinary(((ByteBuffer)val).array());
+          }
+          res.put(f, val);
+        }
+      }
+      return res;
+    }
+    
+    private Map<String,String> convertMap(Map map) {
+      Map<String,String> res = new HashMap<String,String>();
+      for (Object o : map.entrySet()) {
+        Entry e = (Entry)o;
+        res.put(e.getKey().toString(), e.getValue().toString());
+      }
+      return res;
+    }
+    
+    public void remove() {
+      throw new UnsupportedOperationException();
+    }
+  }
+}

Propchange: nutch/trunk/src/java/org/apache/nutch/api/DbReader.java
------------------------------------------------------------------------------
    svn:eol-style = native
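
A minimal usage sketch of the reader above, assuming a Gora storage
backend is configured and leaving exception handling out; "url" is always
served from the row key, the remaining fields are read from the store:

    Configuration conf = NutchConfiguration.create();
    DbReader reader = new DbReader(conf, null);   // null = default crawl id
    // null start/end/batchId: full scan, no batch filtering
    Iterator<Map<String,Object>> it =
        reader.iterator(new String[]{"url", "status", "title"}, null, null, null);
    while (it.hasNext()) {
      System.out.println(it.next());
    }
    reader.close();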

Added: nutch/trunk/src/java/org/apache/nutch/api/DbResource.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/api/DbResource.java?rev=1039014&view=auto
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/api/DbResource.java (added)
+++ nutch/trunk/src/java/org/apache/nutch/api/DbResource.java Thu Nov 25 12:05:57 2010
@@ -0,0 +1,126 @@
+package org.apache.nutch.api;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.MalformedURLException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.WeakHashMap;
+
+import org.apache.nutch.util.TableUtil;
+import org.restlet.data.Form;
+import org.restlet.data.MediaType;
+import org.restlet.ext.jackson.JacksonConverter;
+import org.restlet.representation.OutputRepresentation;
+import org.restlet.representation.Representation;
+import org.restlet.representation.Variant;
+import org.restlet.resource.ResourceException;
+import org.restlet.resource.ServerResource;
+
+public class DbResource extends ServerResource {
+  public static final String PATH = "db";
+  public static final String DESCR = "DB data streaming";
+
+  static JacksonConverter cnv = new JacksonConverter();
+  WeakHashMap<String,DbReader> readers = new WeakHashMap<String,DbReader>();
+  
+  @Override
+  protected void doInit() throws ResourceException {
+    super.doInit();
+    getVariants().add(new Variant(MediaType.APPLICATION_JSON));
+  }
+
+  @Override
+  protected Representation get(final Variant variant) throws ResourceException {
+    String startKey = null;
+    String endKey = null;
+    String rStartKey = null; // reversed keys
+    String rEndKey = null;
+    String[] fields = null;
+    String batchId = null;
+    String confId = ConfResource.DEFAULT_CONF;
+    Form form = getQuery();
+    if (form != null) {
+      startKey = form.getFirstValue("start");
+      endKey = form.getFirstValue("end");
+      rStartKey = form.getFirstValue("rstart");
+      rEndKey = form.getFirstValue("rend");
+      if (rStartKey != null || rEndKey != null) {
+        startKey = rStartKey;
+        endKey = rEndKey;
+      } else {
+        if (startKey != null) {
+          try {
+            startKey = TableUtil.reverseUrl(startKey);
+          } catch (MalformedURLException e) { /* ignore */ }
+        }
+        if (endKey != null) {
+          try {
+            endKey = TableUtil.reverseUrl(endKey);
+          } catch (MalformedURLException e) { /* ignore */ }
+        }
+      }
+      batchId = form.getFirstValue("batch");
+      String flds = form.getFirstValue("fields");
+      if (flds != null && flds.trim().length() > 0) {
+        flds = flds.replaceAll("\\s+", "");
+        fields = flds.split(",");
+      }
+    }
+    DbReader reader;
+    synchronized (readers) {
+      reader = readers.get(confId);
+      if (reader == null) {
+        reader = new DbReader(NutchApp.confMgr.get(confId), null);
+        readers.put(confId, reader);
+      }
+    }
+    Representation res = new DbRepresentation(this, variant, reader, fields,
+        startKey, endKey, batchId);
+    return res;
+  }
+  
+  private static class DbRepresentation extends OutputRepresentation {
+    private DbReader r;
+    private Variant variant;
+    private String[] fields;
+    private String startKey, endKey, batchId;
+    private DbResource resource;
+    
+    public DbRepresentation(DbResource resource, Variant variant, DbReader reader,
+        String[] fields, String startKey, String endKey, String batchId) {
+      super(variant.getMediaType());
+      this.resource = resource;
+      this.r = reader;
+      this.variant = variant;
+      this.fields = fields;
+      this.startKey = startKey;
+      this.endKey = endKey;
+      this.batchId = batchId;
+    }
+
+    @Override
+    public void write(OutputStream out) throws IOException {
+      try {
+        out.write('[');
+        Iterator<Map<String,Object>> it = r.iterator(fields, startKey, endKey, batchId);
+        boolean first = true;
+        while (it.hasNext()) {
+          if (!first) {
+            out.write(',');
+          } else {
+            first = false;
+          }
+          Map<String,Object> item = it.next();
+          Representation repr = cnv.toRepresentation(item, variant, resource);
+          repr.write(out);
+          out.flush();
+          repr.release();
+        }
+        out.write(']');
+      } catch (Exception e) {
+        throw new IOException("DbReader.iterator failed", e);
+      }
+    }
+  }
+}

Propchange: nutch/trunk/src/java/org/apache/nutch/api/DbResource.java
------------------------------------------------------------------------------
    svn:eol-style = native
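
Note the two ways of passing key ranges above: "start"/"end" take plain
URLs and are reversed with TableUtil.reverseUrl before querying, while
"rstart"/"rend" pass already-reversed keys through unchanged and take
precedence when both are present. These two requests should therefore
select the same start key:

    GET /db?start=http://example.com/
    GET /db?rstart=com.example:http/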

Modified: nutch/trunk/src/java/org/apache/nutch/api/JobManager.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/api/JobManager.java?rev=1039014&r1=1039013&r2=1039014&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/api/JobManager.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/api/JobManager.java Thu Nov 25 12:05:57 2010
@@ -12,9 +12,10 @@ public interface JobManager {
 
   public List<JobStatus> list(String crawlId, State state) throws Exception;
   
-  public Map<String,String> get(String crawlId, String id) throws Exception;
+  public JobStatus get(String crawlId, String id) throws Exception;
   
-  public String create(String crawlId, JobType type, String confId, Object... args) throws Exception;
+  public String create(String crawlId, JobType type, String confId,
+      Map<String,Object> args) throws Exception;
   
   public boolean abort(String crawlId, String id) throws Exception;
   

Modified: nutch/trunk/src/java/org/apache/nutch/api/JobResource.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/api/JobResource.java?rev=1039014&r1=1039013&r2=1039014&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/api/JobResource.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/api/JobResource.java Thu Nov 25 12:05:57 2010
@@ -1,9 +1,11 @@
 package org.apache.nutch.api;
 
+import java.util.List;
 import java.util.Map;
 
 import org.apache.nutch.api.JobManager.JobType;
 import org.apache.nutch.api.JobStatus.State;
+import org.restlet.data.Form;
 import org.restlet.resource.Get;
 import org.restlet.resource.Put;
 import org.restlet.resource.ServerResource;
@@ -14,18 +16,30 @@ public class JobResource extends ServerR
   
   @Get("json")
   public Object retrieve() throws Exception {
-    String cid = (String)getRequestAttributes().get(Params.CRAWL_ID);
-    String jid = (String)getRequestAttributes().get(Params.JOB_ID);
+    String cid = null;
+    String jid = null;
+    String cmd = null;
+    Form form = getQuery();
+    cid = (String)getRequestAttributes().get(Params.CRAWL_ID);
+    jid = (String)getRequestAttributes().get(Params.JOB_ID);
+    cmd = (String)getRequestAttributes().get(Params.CMD);
+    if (form != null) {
+      String v = form.getFirstValue(Params.CRAWL_ID);
+      if (v != null) cid = v;
+      v = form.getFirstValue(Params.JOB_ID);
+      if (v != null) jid = v;
+      v = form.getFirstValue(Params.CMD);
+      if (v != null) cmd = v;
+    }
     if (jid == null) {
       return NutchApp.jobMgr.list(cid, State.ANY);
     } else {
       // handle stop / abort / get
-      String cmd = (String)getRequestAttributes().get(Params.CMD);      
       if (cmd == null) {
         return NutchApp.jobMgr.get(cid, jid);
       }
       if (cmd.equals(Params.JOB_CMD_STOP)) {
-        return NutchApp.jobMgr.abort(cid, jid);
+        return NutchApp.jobMgr.stop(cid, jid);
       } else if (cmd.equals(Params.JOB_CMD_ABORT)) {
         return NutchApp.jobMgr.abort(cid, jid);
       } else if (cmd.equals(Params.JOB_CMD_GET)) {
@@ -46,9 +60,9 @@ public class JobResource extends ServerR
   public Object create(Map<String,Object> args) throws Exception {
     String cid = (String)args.get(Params.CRAWL_ID);
     String typeString = (String)args.get(Params.JOB_TYPE);
-    JobType type = JobType.valueOf(typeString);
+    JobType type = JobType.valueOf(typeString.toUpperCase());
     String confId = (String)args.get(Params.CONF_ID);
-    Object[] cmdArgs = (Object[])args.get(Params.ARGS);
+    Map<String,Object> cmdArgs = (Map<String,Object>)args.get(Params.ARGS);
     String jobId = NutchApp.jobMgr.create(cid, type, confId, cmdArgs);
     return jobId;
   }
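
The resource now also accepts crawl id, job id and command as query
parameters, and cmd=stop maps to JobManager.stop() instead of abort().
Illustrative requests, assuming JobResource.PATH is "job" and that "stop"
and "abort" are the literal values of the Params.JOB_CMD_* constants:

    GET /job                    (list all jobs)
    GET /job/<jobId>            (JobStatus of one job)
    GET /job/<jobId>?cmd=stop   (graceful stop via NutchTool.stopJob)
    GET /job/<jobId>?cmd=abort  (kill via NutchTool.killJob)

On the create side, job arguments now arrive as a named map instead of a
positional Object array, matching the new JobManager.create signature. A
hedged sketch against the in-memory manager (crawl id and seed path are
placeholders):

    Map<String,Object> jobArgs = ToolUtil.toArgMap(
        Nutch.ARG_SEEDDIR, "/tmp/urls", Nutch.ARG_DEPTH, 1);
    String jobId = NutchApp.jobMgr.create("myCrawl", JobType.CRAWL, null, jobArgs);
    JobStatus status = NutchApp.jobMgr.get("myCrawl", jobId);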

Modified: nutch/trunk/src/java/org/apache/nutch/api/JobStatus.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/api/JobStatus.java?rev=1039014&r1=1039013&r2=1039014&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/api/JobStatus.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/api/JobStatus.java Thu Nov 25 12:05:57 2010
@@ -3,18 +3,21 @@ package org.apache.nutch.api;
 import java.util.Map;
 
 import org.apache.nutch.api.JobManager.JobType;
+import org.apache.nutch.util.NutchTool;
 
 public class JobStatus {
-  public static enum State {IDLE, RUNNING, FINISHED, FAILED, KILLED, ANY};
+  public static enum State {IDLE, RUNNING, FINISHED, FAILED, KILLED,
+    STOPPING, KILLING, ANY};
   public String id;
   public JobType type;
   public String confId;
-  public Object[] args;
+  public Map<String,Object> args;
   public Map<String,Object> result;
+  public NutchTool tool;
   public State state;
   public String msg;
   
-  public JobStatus(String id, JobType type, String confId, Object[] args,
+  public JobStatus(String id, JobType type, String confId, Map<String,Object> args,
       State state, String msg) {
     this.id = id;
     this.type = type;

Modified: nutch/trunk/src/java/org/apache/nutch/api/NutchApp.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/api/NutchApp.java?rev=1039014&r1=1039013&r2=1039014&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/api/NutchApp.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/api/NutchApp.java Thu Nov 25 12:05:57 2010
@@ -36,6 +36,8 @@ public class NutchApp extends Applicatio
           "}", ConfResource.class);
       router.attach("/" + ConfResource.PATH + "/{" + Params.CONF_ID +
           "}/{" + Params.PROP_NAME + "}", ConfResource.class);
+      // db
+      router.attach("/" + DbResource.PATH, DbResource.class);
       // jobs
       router.attach("/" + JobResource.PATH, JobResource.class);
       router.attach("/" + JobResource.PATH + "/{" + Params.JOB_ID + "}",

Modified: nutch/trunk/src/java/org/apache/nutch/api/impl/RAMJobManager.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/api/impl/RAMJobManager.java?rev=1039014&r1=1039013&r2=1039014&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/api/impl/RAMJobManager.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/api/impl/RAMJobManager.java Thu Nov 25 12:05:57 2010
@@ -15,10 +15,12 @@ import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.nutch.api.ConfResource;
 import org.apache.nutch.api.JobManager;
 import org.apache.nutch.api.JobStatus;
 import org.apache.nutch.api.JobStatus.State;
 import org.apache.nutch.api.NutchApp;
+import org.apache.nutch.crawl.Crawler;
 import org.apache.nutch.crawl.DbUpdaterJob;
 import org.apache.nutch.crawl.GeneratorJob;
 import org.apache.nutch.crawl.InjectorJob;
@@ -79,6 +81,7 @@ public class RAMJobManager implements Jo
     typeToClass.put(JobType.PARSE, ParserJob.class);
     typeToClass.put(JobType.UPDATEDB, DbUpdaterJob.class);
     typeToClass.put(JobType.READDB, WebTableReader.class);
+    typeToClass.put(JobType.CRAWL, Crawler.class);
   }
 
   private void addFinishedStatus(JobStatus status) {
@@ -109,13 +112,24 @@ public class RAMJobManager implements Jo
   }
 
   @Override
-  public Map<String, String> get(String crawlId, String jobId) throws Exception {
-    // TODO Auto-generated method stub
+  public JobStatus get(String crawlId, String jobId) throws Exception {
+    for (JobStatus job : jobRunning) {
+      if (job.id.equals(jobId)) {
+        return job;
+      }
+    }
+    for (JobStatus job : jobHistory) {
+      if (job.id.equals(jobId)) {
+        return job;
+      }
+    }
     return null;
   }
 
   @Override
-  public String create(String crawlId, JobType type, String confId, Object... args) throws Exception {
+  public String create(String crawlId, JobType type, String confId,
+      Map<String,Object> args) throws Exception {
+    if (args == null) args = Collections.emptyMap();
     JobWorker worker = new JobWorker(crawlId, type, confId, args);
     String id = worker.getId();
     exec.execute(worker);
@@ -125,12 +139,28 @@ public class RAMJobManager implements Jo
 
   @Override
   public boolean abort(String crawlId, String id) throws Exception {
+    // find running job
+    for (JobStatus job : jobRunning) {
+      if (job.id.equals(id)) {
+        job.state = State.KILLING;
+        boolean res = job.tool.killJob();
+        job.state = State.KILLED;
+        return res;
+      }
+    }
     return false;
   }
 
   @Override
   public boolean stop(String crawlId, String id) throws Exception {
-    // TODO Auto-generated method stub
+    // find running job
+    for (JobStatus job : jobRunning) {
+      if (job.id.equals(id)) {
+        job.state = State.STOPPING;
+        boolean res = job.tool.stopJob();
+        return res;
+      }
+    }
     return false;
   }
   
@@ -139,12 +169,13 @@ public class RAMJobManager implements Jo
     JobType type;
     String confId;
     NutchTool tool;
-    Object[] args;
-    float progress = 0f;
-    Job currentJob = null;
+    Map<String,Object> args;
     JobStatus jobStatus;
     
-    JobWorker(String crawlId, JobType type, String confId, Object... args) throws Exception {
+    JobWorker(String crawlId, JobType type, String confId, Map<String,Object> args) throws Exception {
+      if (confId == null) {
+        confId = ConfResource.DEFAULT_CONF;
+      }
       Configuration conf = NutchApp.confMgr.get(confId);
       // clone it - we are going to modify it
       if (conf == null) {
@@ -161,10 +192,11 @@ public class RAMJobManager implements Jo
       }
       Class<? extends NutchTool> clz = typeToClass.get(type);
       if (clz == null) {
-        clz = (Class<? extends NutchTool>)args[0];
+        clz = (Class<? extends NutchTool>)Class.forName((String)args.get(Nutch.ARG_CLASS));
       }
       tool = ReflectionUtils.newInstance(clz, conf);
       jobStatus = new JobStatus(id, type, confId, args, State.IDLE, "idle");
+      jobStatus.tool = tool;
     }
     
     public String getId() {
@@ -172,65 +204,31 @@ public class RAMJobManager implements Jo
     }
     
     public float getProgress() {
-      return progress;
+      return tool.getProgress();
     }
     
     public State getState() {
       return jobStatus.state;
     }
     
-    private final float[] noProgress = new float[2];
-    
-    public float[] getCurrentJobProgress() throws IOException {
-      if (currentJob == null) {
-        return noProgress;
-      }
-      float[] res = new float[2];
-      res[0] = currentJob.mapProgress();
-      res[1] = currentJob.reduceProgress();
-      return res;
-    }
-    
     public Map<String,Object> getResult() {
       return jobStatus.result;
     }
     
-    public String getStatus() {
-      return jobStatus.msg;
+    public Map<String,Object> getStatus() {
+      return tool.getStatus();
     }
 
     @Override
     public void run() {
       try {
-        progress = 0f;
         jobStatus.state = State.RUNNING;
-        jobStatus.msg = "prepare";
-        tool.prepare();
-        progress = 0.1f;
-        Job[] jobs = tool.createJobs(args);
-        float delta = 0.8f / jobs.length;
-        for (int i = 0; i < jobs.length; i++) {
-          currentJob = jobs[i];
-          jobStatus.msg = "job " + (i + 1) + "/" + jobs.length;
-          boolean success = jobs[i].waitForCompletion(true);
-          if (!success) {
-            throw new Exception("Job failed.");
-          }
-          jobStatus.msg = "postJob " + (i + 1);
-          tool.postJob(i, jobs[i]);
-          progress += delta;
-        }
-        currentJob = null;
-        progress = 0.9f;
-        jobStatus.msg = "finish";
-        Map<String,Object> res = tool.finish();
-        if (res != null) {
-          jobStatus.result = res;
-        }
-        progress = 1.0f;
+        jobStatus.msg = "OK";
+        jobStatus.result = tool.run(args);
         jobStatus.state = State.FINISHED;
       } catch (Exception e) {
-        jobStatus.msg = "ERROR " + jobStatus.msg + ": " + e.toString();
+        e.printStackTrace();
+        jobStatus.msg = "ERROR: " + e.toString();
         jobStatus.state = State.FAILED;
       }
     }
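
The worker no longer drives a prepare/createJobs/postJob/finish sequence;
each tool now runs its own jobs and reports progress and status itself. A
sketch of the resulting pattern (the crawl id is a placeholder):

    Configuration conf = NutchConfiguration.create();
    NutchTool tool = ReflectionUtils.newInstance(DbUpdaterJob.class, conf);
    Map<String,Object> result = tool.run(
        ToolUtil.toArgMap(Nutch.ARG_CRAWL, "myCrawl"));
    float p = tool.getProgress();             // computed by the tool itself
    Map<String,Object> st = tool.getStatus(); // phase/progress details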

Added: nutch/trunk/src/java/org/apache/nutch/crawl/Crawler.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/crawl/Crawler.java?rev=1039014&view=auto
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/crawl/Crawler.java (added)
+++ nutch/trunk/src/java/org/apache/nutch/crawl/Crawler.java Thu Nov 25 12:05:57 2010
@@ -0,0 +1,242 @@
+package org.apache.nutch.crawl;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.fetcher.FetcherJob;
+import org.apache.nutch.indexer.solr.SolrIndexerJob;
+import org.apache.nutch.metadata.Nutch;
+import org.apache.nutch.parse.ParserJob;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchTool;
+import org.apache.nutch.util.ToolUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class Crawler extends NutchTool implements Tool {
+  private static final Logger LOG = LoggerFactory.getLogger(Crawler.class);
+  
+  private boolean cleanSeedDir = false;
+  private String tmpSeedDir = null;
+  private HashMap<String,Object> results = new HashMap<String,Object>();
+  private Map<String,Object> status =
+    Collections.synchronizedMap(new HashMap<String,Object>());
+  private NutchTool currentTool = null;
+  private boolean shouldStop = false;
+  
+  @Override
+  public Map<String,Object> getStatus() {
+    return status;
+  }
+  
+  private Map<String,Object> runTool(Class<? extends NutchTool> toolClass,
+      Map<String,Object> args) throws Exception {
+    currentTool = (NutchTool)ReflectionUtils.newInstance(toolClass, getConf());
+    return currentTool.run(args);
+  }
+  
+  @Override
+  public boolean stopJob() throws Exception {
+    shouldStop = true;
+    if (currentTool != null) {
+      return currentTool.stopJob();
+    }
+    return false;
+  }
+
+  @Override
+  public boolean killJob() throws Exception {
+    shouldStop = true;
+    if (currentTool != null) {
+      return currentTool.killJob();
+    }
+    return false;
+  }
+
+  @Override
+  public Map<String,Object> run(Map<String, Object> args) throws Exception {
+    results.clear();
+    status.clear();
+    String crawlId = (String)args.get(Nutch.ARG_CRAWL);
+    if (crawlId != null) {
+      getConf().set(Nutch.CRAWL_ID_KEY, crawlId);
+    }
+    String seedDir = null;
+    String seedList = (String)args.get(Nutch.ARG_SEEDLIST);    
+    if (seedList != null) { // takes precedence
+      String[] seeds = seedList.split("\\s+");
+      // create tmp. dir
+      tmpSeedDir = getConf().get("hadoop.tmp.dir") + "/seed-" +
+        System.currentTimeMillis();
+      FileSystem fs = FileSystem.get(getConf());
+      Path p = new Path(tmpSeedDir);
+      fs.mkdirs(p);
+      Path seedOut = new Path(p, "urls");
+      OutputStream os = fs.create(seedOut);
+      for (String s : seeds) {
+        os.write(s.getBytes());
+        os.write('\n');
+      }
+      os.flush();
+      os.close();
+      cleanSeedDir = true;
+      seedDir = tmpSeedDir;
+    } else {
+      seedDir = (String)args.get(Nutch.ARG_SEEDDIR);
+    }
+    Integer depth = (Integer)args.get(Nutch.ARG_DEPTH);
+    if (depth == null) depth = 1;
+    Boolean parse = (Boolean)args.get(Nutch.ARG_PARSE);
+    if (parse == null) {
+      parse = getConf().getBoolean(FetcherJob.PARSE_KEY, false);
+    }
+    String solrUrl = (String)args.get(Nutch.ARG_SOLR);
+    int onePhase = 3;
+    if (!parse) onePhase++;
+    float totalPhases = depth * onePhase;
+    if (seedDir != null) totalPhases++;
+    float phase = 0;
+    Map<String,Object> jobRes = null;
+    LinkedHashMap<String,Object> subTools = new LinkedHashMap<String,Object>();
+    status.put(Nutch.STAT_JOBS, subTools);
+    results.put(Nutch.STAT_JOBS, subTools);
+    // inject phase
+    if (seedDir != null) {
+      status.put(Nutch.STAT_PHASE, "inject");
+      jobRes = runTool(InjectorJob.class, args);
+      if (jobRes != null) {
+        subTools.put("inject", jobRes);
+      }
+      status.put(Nutch.STAT_PROGRESS, ++phase / totalPhases);
+      if (cleanSeedDir && tmpSeedDir != null) {
+        LOG.info(" - cleaning tmp seed list in " + tmpSeedDir);
+        FileSystem.get(getConf()).delete(new Path(tmpSeedDir), true);
+      }
+    }
+    if (shouldStop) {
+      return results;
+    }
+    // run "depth" cycles
+    for (int i = 0; i < depth; i++) {
+      status.put(Nutch.STAT_PHASE, "generate " + i);
+      jobRes = runTool(GeneratorJob.class, args);
+      if (jobRes != null) {
+        subTools.put("generate " + i, jobRes);
+      }
+      status.put(Nutch.STAT_PROGRESS, ++phase / totalPhases);
+      if (shouldStop) {
+        return results;
+      }
+      status.put(Nutch.STAT_PHASE, "fetch " + i);
+      jobRes = runTool(FetcherJob.class, args);
+      if (jobRes != null) {
+        subTools.put("fetch " + i, jobRes);
+      }
+      status.put(Nutch.STAT_PROGRESS, ++phase / totalPhases);
+      if (shouldStop) {
+        return results;
+      }
+      if (!parse) {
+        status.put(Nutch.STAT_PHASE, "parse " + i);
+        jobRes = runTool(ParserJob.class, args);
+        if (jobRes != null) {
+          subTools.put("parse " + i, jobRes);
+        }
+        status.put(Nutch.STAT_PROGRESS, ++phase / totalPhases);
+        if (shouldStop) {
+          return results;
+        }
+      }
+      status.put(Nutch.STAT_PHASE, "updatedb " + i);
+      jobRes = runTool(DbUpdaterJob.class, args);
+      if (jobRes != null) {
+        subTools.put("updatedb " + i, jobRes);
+      }
+      status.put(Nutch.STAT_PROGRESS, ++phase / totalPhases);
+      if (shouldStop) {
+        return results;
+      }
+    }
+    if (solrUrl != null) {
+      status.put(Nutch.STAT_PHASE, "index");
+      jobRes = runTool(SolrIndexerJob.class, args);
+      if (jobRes != null) {
+        subTools.put("index", jobRes);
+      }
+    }
+    return results;
+  }
+
+  @Override
+  public float getProgress() {
+    Float p = (Float)status.get(Nutch.STAT_PROGRESS);
+    if (p == null) return 0;
+    return p;
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    if (args.length == 0) {
+      System.out.println("Usage: Crawl (<seedDir> | -continue) [-solr <solrURL>] [-threads n] [-depth i] [-topN N]");
+      return -1;
+    }
+    // parse most common arguments here
+    String seedDir = null;
+    int threads = getConf().getInt("fetcher.threads.fetch", 10);
+    int depth = 5;
+    long topN = Long.MAX_VALUE;
+    String solrUrl = null;
+    
+    for (int i = 0; i < args.length; i++) {
+      if ("-threads".equals(args[i])) {
+        threads = Integer.parseInt(args[i+1]);
+        i++;
+      } else if ("-depth".equals(args[i])) {
+        depth = Integer.parseInt(args[i+1]);
+        i++;
+      } else if ("-topN".equals(args[i])) {
+          topN = Integer.parseInt(args[i+1]);
+          i++;
+      } else if ("-solr".equals(args[i])) {
+        solrUrl = StringUtils.lowerCase(args[i + 1]);
+        i++;
+      } else if ("-continue".equals(args[i])) {
+        // skip
+      } else if (args[i] != null) {
+        seedDir = args[i];
+      }
+    }
+    Map<String,Object> argMap = ToolUtil.toArgMap(
+        Nutch.ARG_THREADS, threads,
+        Nutch.ARG_DEPTH, depth,
+        Nutch.ARG_TOPN, topN,
+        Nutch.ARG_SOLR, solrUrl,
+        Nutch.ARG_SEEDDIR, seedDir);
+    run(argMap);
+    return 0;
+  }
+  
+  public static void main(String[] args) throws Exception {
+    Crawler c = new Crawler();
+    Configuration conf = NutchConfiguration.create();
+    int res = ToolRunner.run(conf, c, args);
+    System.exit(res);
+  }
+}

Propchange: nutch/trunk/src/java/org/apache/nutch/crawl/Crawler.java
------------------------------------------------------------------------------
    svn:eol-style = native
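
A hedged sketch of driving a full crawl cycle programmatically with the
same argument map that run(String[]) builds above; the URLs are
placeholders, and an inline seed list (Nutch.ARG_SEEDLIST) is written to a
temporary seed dir and takes precedence over Nutch.ARG_SEEDDIR:

    Crawler crawler = new Crawler();
    crawler.setConf(NutchConfiguration.create());
    Map<String,Object> results = crawler.run(ToolUtil.toArgMap(
        Nutch.ARG_SEEDLIST, "http://www.example.com/",
        Nutch.ARG_DEPTH, 2,
        Nutch.ARG_SOLR, "http://localhost:8983/solr"));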

Modified: nutch/trunk/src/java/org/apache/nutch/crawl/DbUpdaterJob.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/crawl/DbUpdaterJob.java?rev=1039014&r1=1039013&r2=1039014&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/crawl/DbUpdaterJob.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/crawl/DbUpdaterJob.java Thu Nov 25 12:05:57 2010
@@ -1,6 +1,9 @@
 package org.apache.nutch.crawl;
 
+import java.io.IOException;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 
@@ -19,10 +22,10 @@ import org.apache.nutch.storage.WebPage;
 import org.apache.nutch.util.NutchConfiguration;
 import org.apache.nutch.util.NutchJob;
 import org.apache.nutch.util.NutchTool;
+import org.apache.nutch.util.ToolUtil;
 import org.apache.gora.mapreduce.StringComparator;
 
-public class DbUpdaterJob extends Configured
-implements Tool, NutchTool {
+public class DbUpdaterJob extends NutchTool implements Tool {
 
   public static final Logger LOG = LoggerFactory.getLogger(DbUpdaterJob.class);
 
@@ -53,49 +56,33 @@ implements Tool, NutchTool {
   public DbUpdaterJob(Configuration conf) {
     setConf(conf);
   }
-  
-  public Map<String,Object> prepare() throws Exception {
-    return null;
-  }
-  
-  public Map<String,Object> postJob(int jobIndex, Job job) throws Exception {
-    return null;
-  }
-
-  public Map<String,Object> finish() throws Exception {
-    return null;
-  }
-  
-  public Job[] createJobs(Object... args) throws Exception {
-    String crawlId = null;
-    if (args.length > 0) {
-      crawlId = (String)args[0];
-    }
-    Job job = new NutchJob(getConf(), "update-table");
+    
+  public Map<String,Object> run(Map<String,Object> args) throws Exception {
+    String crawlId = (String)args.get(Nutch.ARG_CRAWL);
+    numJobs = 1;
+    currentJobNum = 0;
+    currentJob = new NutchJob(getConf(), "update-table");
     if (crawlId != null) {
-      job.getConfiguration().set(Nutch.CRAWL_ID_KEY, crawlId);
+      currentJob.getConfiguration().set(Nutch.CRAWL_ID_KEY, crawlId);
     }
     //job.setBoolean(ALL, updateAll);
     ScoringFilters scoringFilters = new ScoringFilters(getConf());
     HashSet<WebPage.Field> fields = new HashSet<WebPage.Field>(FIELDS);
     fields.addAll(scoringFilters.getFields());
     // TODO: Figure out why this needs to be here
-    job.getConfiguration().setClass("mapred.output.key.comparator.class",
+    currentJob.getConfiguration().setClass("mapred.output.key.comparator.class",
         StringComparator.class, RawComparator.class);
-    StorageUtils.initMapperJob(job, fields, String.class,
+    StorageUtils.initMapperJob(currentJob, fields, String.class,
         NutchWritable.class, DbUpdateMapper.class);
-    StorageUtils.initReducerJob(job, DbUpdateReducer.class);
-    return new Job[]{job};
+    StorageUtils.initReducerJob(currentJob, DbUpdateReducer.class);
+    currentJob.waitForCompletion(true);
+    ToolUtil.recordJobStatus(null, currentJob, results);
+    return results;
   }
   
-  private int updateTable(String batchId) throws Exception {
+  private int updateTable(String crawlId) throws Exception {
     LOG.info("DbUpdaterJob: starting");
-    Job[] jobs = createJobs(new Object[]{batchId});
-    boolean success = jobs[0].waitForCompletion(true);
-    if (!success){
-    	LOG.info("DbUpdaterJob: failed");
-    	return -1;
-    }
+    run(ToolUtil.toArgMap(Nutch.ARG_CRAWL, crawlId));
     LOG.info("DbUpdaterJob: done");
     return 0;
   }

Modified: nutch/trunk/src/java/org/apache/nutch/crawl/GeneratorJob.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/crawl/GeneratorJob.java?rev=1039014&r1=1039013&r2=1039014&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/crawl/GeneratorJob.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/crawl/GeneratorJob.java Thu Nov 25 12:05:57 2010
@@ -3,6 +3,7 @@ package org.apache.nutch.crawl;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -26,8 +27,9 @@ import org.apache.nutch.storage.WebPage;
 import org.apache.nutch.util.NutchConfiguration;
 import org.apache.nutch.util.NutchJob;
 import org.apache.nutch.util.NutchTool;
+import org.apache.nutch.util.ToolUtil;
 
-public class GeneratorJob extends Configured implements Tool, NutchTool {
+public class GeneratorJob extends NutchTool implements Tool {
   public static final String GENERATE_UPDATE_CRAWLDB = "generate.update.crawldb";
   public static final String GENERATOR_MIN_SCORE = "generate.min.score";
   public static final String GENERATOR_FILTER = "generate.filter";
@@ -124,36 +126,28 @@ public class GeneratorJob extends Config
     setConf(conf);
   }
 
-  public Map<String,Object> prepare() throws Exception {
-    return null;
-  }
-  
-  public Map<String,Object> postJob(int jobIndex, Job job) throws Exception {
-    return null;
-  }
-  
-  public Map<String,Object> finish() throws Exception {
-    HashMap<String,Object> res = new HashMap<String,Object>();
-    res.put(BATCH_ID, batchId);
-    return res;
-  }
-
-  public Job[] createJobs(Object... args) throws Exception {
+  public Map<String,Object> run(Map<String,Object> args) throws Exception {
     // map to inverted subset due for fetch, sort by score
-    long topN = (Long)args[0];
-    long curTime = (Long)args[1];
-    boolean filter = (Boolean)args[2];
-    boolean norm = (Boolean)args[3];
+    Long topN = (Long)args.get(Nutch.ARG_TOPN);
+    Long curTime = (Long)args.get(Nutch.ARG_CURTIME);
+    if (curTime == null) {
+      curTime = System.currentTimeMillis();
+    }
+    Boolean filter = (Boolean)args.get(Nutch.ARG_FILTER);
+    Boolean norm = (Boolean)args.get(Nutch.ARG_NORMALIZE);
     // map to inverted subset due for fetch, sort by score
     getConf().setLong(GENERATOR_CUR_TIME, curTime);
-    getConf().setLong(GENERATOR_TOP_N, topN);
-    getConf().setBoolean(GENERATOR_FILTER, filter);
+    if (topN != null)
+      getConf().setLong(GENERATOR_TOP_N, topN);
+    if (filter != null)
+      getConf().setBoolean(GENERATOR_FILTER, filter);
     int randomSeed = Math.abs(new Random().nextInt());
-    String batchId = (curTime / 1000) + "-" + randomSeed;
+    batchId = (curTime / 1000) + "-" + randomSeed;
     getConf().setInt(GENERATOR_RANDOM_SEED, randomSeed);
     getConf().set(BATCH_ID, batchId);
     getConf().setLong(Nutch.GENERATE_TIME_KEY, System.currentTimeMillis());
-    getConf().setBoolean(GENERATOR_NORMALISE, norm);
+    if (norm != null)
+      getConf().setBoolean(GENERATOR_NORMALISE, norm);
     String mode = getConf().get(GENERATOR_COUNT_MODE, GENERATOR_COUNT_VALUE_HOST);
     if (GENERATOR_COUNT_VALUE_HOST.equalsIgnoreCase(mode)) {
       getConf().set(URLPartitioner.PARTITION_MODE_KEY, URLPartitioner.PARTITION_MODE_HOST);
@@ -164,12 +158,16 @@ public class GeneratorJob extends Config
       getConf().set(GENERATOR_COUNT_MODE, GENERATOR_COUNT_VALUE_HOST);
       getConf().set(URLPartitioner.PARTITION_MODE_KEY, URLPartitioner.PARTITION_MODE_HOST);
     }
-
-    Job job = new NutchJob(getConf(), "generate: " + batchId);
-    StorageUtils.initMapperJob(job, FIELDS, SelectorEntry.class,
+    numJobs = 1;
+    currentJobNum = 0;
+    currentJob = new NutchJob(getConf(), "generate: " + batchId);
+    StorageUtils.initMapperJob(currentJob, FIELDS, SelectorEntry.class,
         WebPage.class, GeneratorMapper.class, URLPartitioner.class, true);
-    StorageUtils.initReducerJob(job, GeneratorReducer.class);
-    return new Job[]{job};
+    StorageUtils.initReducerJob(currentJob, GeneratorReducer.class);
+    currentJob.waitForCompletion(true);
+    ToolUtil.recordJobStatus(null, currentJob, results);
+    results.put(BATCH_ID, batchId);
+    return results;
   }
   
   private String batchId;
@@ -188,12 +186,12 @@ public class GeneratorJob extends Config
     if (topN != Long.MAX_VALUE) {
       LOG.info("GeneratorJob: topN: " + topN);
     }
-    Job[] jobs = createJobs(topN, curTime, filter, norm);
-    boolean success = jobs[0].waitForCompletion(true);
-    if (!success) return null;
-    
+    run(ToolUtil.toArgMap(
+        Nutch.ARG_TOPN, topN,
+        Nutch.ARG_CURTIME, curTime,
+        Nutch.ARG_FILTER, filter,
+        Nutch.ARG_NORMALIZE, norm));
     batchId =  getConf().get(BATCH_ID);
-
     LOG.info("GeneratorJob: done");
     LOG.info("GeneratorJob: generated batch id: " + batchId);
     return batchId;
@@ -227,4 +225,5 @@ public class GeneratorJob extends Config
     int res = ToolRunner.run(NutchConfiguration.create(), new GeneratorJob(), args);
     System.exit(res);
   }
+
 }
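
Callers that previously obtained the batch id from finish() now read it
from the result map; a sketch, assuming GeneratorJob.BATCH_ID is visible
to the caller:

    GeneratorJob g = new GeneratorJob(NutchConfiguration.create());
    Map<String,Object> res = g.run(ToolUtil.toArgMap(Nutch.ARG_TOPN, 1000L));
    String batchId = (String)res.get(GeneratorJob.BATCH_ID);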

Modified: nutch/trunk/src/java/org/apache/nutch/crawl/InjectorJob.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/crawl/InjectorJob.java?rev=1039014&r1=1039013&r2=1039014&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/crawl/InjectorJob.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/crawl/InjectorJob.java Thu Nov 25 12:05:57 2010
@@ -18,7 +18,6 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
@@ -37,6 +36,7 @@ import org.apache.nutch.util.NutchConfig
 import org.apache.nutch.util.NutchJob;
 import org.apache.nutch.util.NutchTool;
 import org.apache.nutch.util.TableUtil;
+import org.apache.nutch.util.ToolUtil;
 
 /** This class takes a flat file of URLs and adds them to the of pages to be
  * crawled.  Useful for bootstrapping the system.
@@ -47,15 +47,10 @@ import org.apache.nutch.util.TableUtil;
  * - <i>nutch.fetchInterval</i> : allows to set a custom fetch interval for a specific URL <br>
  * e.g. http://www.nutch.org/ \t nutch.score=10 \t nutch.fetchInterval=2592000 \t userType=open_source
  **/
-public class InjectorJob extends GoraMapper<String, WebPage, String, WebPage>
-    implements Tool, NutchTool {
+public class InjectorJob extends NutchTool implements Tool {
 
   public static final Logger LOG = LoggerFactory.getLogger(InjectorJob.class);
 
-  private Configuration conf;
-
-  private FetchSchedule schedule;
-
   private static final Set<WebPage.Field> FIELDS = new HashSet<WebPage.Field>();
 
   private static final Utf8 YES_STRING = new Utf8("y");
@@ -173,92 +168,89 @@ public class InjectorJob extends GoraMap
       context.write(reversedUrl, row);
     }
   }
+  
+  public static class InjectorMapper 
+      extends GoraMapper<String, WebPage, String, WebPage> {
+    private FetchSchedule schedule;
 
-  public InjectorJob() {
-
-  }
+    @Override
+    public void setup(Context context) throws IOException {
+      Configuration conf = context.getConfiguration();
+      schedule = FetchScheduleFactory.getFetchSchedule(conf);
+      // scoreInjected = conf.getFloat("db.score.injected", 1.0f);
+    }
 
-  public InjectorJob(Configuration conf) {
-    setConf(conf);
-  }
+    @Override
+    protected void map(String key, WebPage row, Context context)
+        throws IOException, InterruptedException {
+      if (Mark.INJECT_MARK.checkMark(row) == null) {
+        return;
+      }
+      Mark.INJECT_MARK.removeMark(row);
+      if (!row.isReadable(WebPage.Field.STATUS.getIndex())) {
+        row.setStatus(CrawlStatus.STATUS_UNFETCHED);
+        schedule.initializeSchedule(key, row);
+        // row.setScore(scoreInjected);
+      }
 
-  @Override
-  public Configuration getConf() {
-    return conf;
+      context.write(key, row);
+    }
+        
   }
 
-  @Override
-  public void setConf(Configuration conf) {
-    this.conf = conf;
-  }
+  public InjectorJob() {
 
-  @Override
-  public void setup(Context context) throws IOException {
-    Configuration conf = context.getConfiguration();
-    schedule = FetchScheduleFactory.getFetchSchedule(conf);
-    // scoreInjected = conf.getFloat("db.score.injected", 1.0f);
   }
 
-  @Override
-  protected void map(String key, WebPage row, Context context)
-      throws IOException, InterruptedException {
-    if (Mark.INJECT_MARK.checkMark(row) == null) {
-      return;
-    }
-    Mark.INJECT_MARK.removeMark(row);
-    if (!row.isReadable(WebPage.Field.STATUS.getIndex())) {
-      row.setStatus(CrawlStatus.STATUS_UNFETCHED);
-      schedule.initializeSchedule(key, row);
-      // row.setScore(scoreInjected);
-    }
-
-    context.write(key, row);
-  }
-  
-  public Map<String,Object> prepare() throws Exception {
-    return null;
-  }
-  
-  public Map<String,Object> postJob(int jobIndex, Job job) throws Exception {
-    return null;
+  public InjectorJob(Configuration conf) {
+    setConf(conf);
   }
 
-  public Map<String,Object> finish() throws Exception {
-    return null;
-  }
-  
-  public Job[] createJobs(Object... args) throws Exception {
-    Job[] jobs = new Job[2];
+  public Map<String,Object> run(Map<String,Object> args) throws Exception {
     getConf().setLong("injector.current.time", System.currentTimeMillis());
-    Job job = new NutchJob(getConf(), "inject-p1 " + args[0]);
-    FileInputFormat.addInputPath(job, (Path)args[0]);
-    job.setMapperClass(UrlMapper.class);
-    job.setMapOutputKeyClass(String.class);
-    job.setMapOutputValueClass(WebPage.class);
-    job.setOutputFormatClass(GoraOutputFormat.class);
-    DataStore<String, WebPage> store = StorageUtils.createWebStore(job.getConfiguration(),
+    Path input;
+    Object path = args.get(Nutch.ARG_SEEDDIR);
+    if (path instanceof Path) {
+      input = (Path)path;
+    } else {
+      input = new Path(path.toString());
+    }
+    numJobs = 2;
+    currentJobNum = 0;
+    status.put(Nutch.STAT_PHASE, "convert input");
+    currentJob = new NutchJob(getConf(), "inject-p1 " + input);
+    FileInputFormat.addInputPath(currentJob, input);
+    currentJob.setMapperClass(UrlMapper.class);
+    currentJob.setMapOutputKeyClass(String.class);
+    currentJob.setMapOutputValueClass(WebPage.class);
+    currentJob.setOutputFormatClass(GoraOutputFormat.class);
+    DataStore<String, WebPage> store = StorageUtils.createWebStore(currentJob.getConfiguration(),
         String.class, WebPage.class);
-    GoraOutputFormat.setOutput(job, store, true);
-    job.setReducerClass(Reducer.class);
-    job.setNumReduceTasks(0);
-    job.waitForCompletion(true);
-
-    job = new NutchJob(getConf(), "inject-p2 " + args[0]);
-    StorageUtils.initMapperJob(job, FIELDS, String.class,
-        WebPage.class, InjectorJob.class);
-    job.setNumReduceTasks(0);
-    jobs[1] = job;
-    return jobs;
+    GoraOutputFormat.setOutput(currentJob, store, true);
+    currentJob.setReducerClass(Reducer.class);
+    currentJob.setNumReduceTasks(0);
+    currentJob.waitForCompletion(true);
+    ToolUtil.recordJobStatus(null, currentJob, results);
+    currentJob = null;
+
+    status.put(Nutch.STAT_PHASE, "merge input with db");
+    status.put(Nutch.STAT_PROGRESS, 0.5f);
+    currentJobNum = 1;
+    currentJob = new NutchJob(getConf(), "inject-p2 " + input);
+    StorageUtils.initMapperJob(currentJob, FIELDS, String.class,
+        WebPage.class, InjectorMapper.class);
+    currentJob.setNumReduceTasks(0);
+    currentJob.waitForCompletion(true);
+    ToolUtil.recordJobStatus(null, currentJob, results);
+    status.put(Nutch.STAT_PROGRESS, 1.0f);
+    return results;
   }
 
   public void inject(Path urlDir) throws Exception {
     LOG.info("InjectorJob: starting");
     LOG.info("InjectorJob: urlDir: " + urlDir);
 
-    Job[] jobs = createJobs(urlDir);
-    jobs[0].waitForCompletion(true);
-
-    jobs[1].waitForCompletion(true);
+    run(ToolUtil.toArgMap(Nutch.ARG_SEEDDIR, urlDir));
   }
 
   @Override

Modified: nutch/trunk/src/java/org/apache/nutch/crawl/WebTableReader.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/crawl/WebTableReader.java?rev=1039014&r1=1039013&r2=1039014&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/crawl/WebTableReader.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/crawl/WebTableReader.java Thu Nov 25 12:05:57 2010
@@ -13,7 +13,6 @@ import org.apache.avro.util.Utf8;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
@@ -30,8 +29,6 @@ import org.apache.hadoop.util.ToolRunner
 import org.apache.nutch.metadata.Nutch;
 import org.apache.nutch.parse.ParseStatusUtils;
 import org.apache.nutch.protocol.ProtocolStatusUtils;
-import org.apache.nutch.storage.ParseStatus;
-import org.apache.nutch.storage.ProtocolStatus;
 import org.apache.nutch.storage.StorageUtils;
 import org.apache.nutch.storage.WebPage;
 import org.apache.nutch.util.Bytes;
@@ -40,6 +37,7 @@ import org.apache.nutch.util.NutchJob;
 import org.apache.nutch.util.NutchTool;
 import org.apache.nutch.util.StringUtil;
 import org.apache.nutch.util.TableUtil;
+import org.apache.nutch.util.ToolUtil;
 import org.apache.gora.mapreduce.GoraMapper;
 import org.apache.gora.query.Query;
 import org.apache.gora.query.Result;
@@ -49,7 +47,7 @@ import org.apache.gora.store.DataStore;
  * Displays information about the entries of the webtable
  **/
 
-public class WebTableReader extends Configured implements Tool, NutchTool {
+public class WebTableReader extends NutchTool implements Tool {
 
   public static final Logger LOG = LoggerFactory.getLogger(WebTableReader.class);
 
@@ -198,13 +196,13 @@ public class WebTableReader extends Conf
 
   public void processStatJob(boolean sort) throws Exception {
 
-    Job[] jobs = createJobs(sort);
-    
     if (LOG.isInfoEnabled()) {
       LOG.info("WebTable statistics start");
     }
-    boolean success = jobs[0].waitForCompletion(true);
-    postJob(0, jobs[0]);
+    run(ToolUtil.toArgMap(Nutch.ARG_SORT, sort));
+    for (Entry<String,Object> e : results.entrySet()) {
+      LOG.info(e.getKey() + ":\t" + e.getValue());
+    }
   }
 
   /** Prints out the entry to the standard out **/
@@ -479,56 +477,46 @@ public class WebTableReader extends Conf
     }
   }
 
-  @Override
-  public Map<String, Object> prepare() throws Exception {
-    // TODO Auto-generated method stub
-    return null;
-  }
-
   // for now handles only -stat
   @Override
-  public Job[] createJobs(Object... args) throws Exception {
+  public Map<String,Object> run(Map<String,Object> args) throws Exception {
     Path tmpFolder = new Path(getConf().get("mapred.temp.dir", ".")
         + "stat_tmp" + System.currentTimeMillis());
 
-    Job job = new NutchJob(getConf(), "db_stats");
+    numJobs = 1;
+    currentJob = new NutchJob(getConf(), "db_stats");
 
-    boolean sort = false;
-    if (args != null && args.length > 0) {
-      sort = (Boolean)args[0];
-    }
-    job.getConfiguration().setBoolean("db.reader.stats.sort", sort);
+    Boolean sort = (Boolean)args.get(Nutch.ARG_SORT);
+    if (sort == null) sort = Boolean.FALSE;
+    currentJob.getConfiguration().setBoolean("db.reader.stats.sort", sort);
 
-    DataStore<String, WebPage> store = StorageUtils.createWebStore(job
+    DataStore<String, WebPage> store = StorageUtils.createWebStore(currentJob
         .getConfiguration(), String.class, WebPage.class);
     Query<String, WebPage> query = store.newQuery();
     query.setFields(WebPage._ALL_FIELDS);
 
-    GoraMapper.initMapperJob(job, query, store, Text.class, LongWritable.class,
+    GoraMapper.initMapperJob(currentJob, query, store, Text.class, LongWritable.class,
         WebTableStatMapper.class, null, true);
 
-    job.setCombinerClass(WebTableStatCombiner.class);
-    job.setReducerClass(WebTableStatReducer.class);
+    currentJob.setCombinerClass(WebTableStatCombiner.class);
+    currentJob.setReducerClass(WebTableStatReducer.class);
 
-    FileOutputFormat.setOutputPath(job, tmpFolder);
+    FileOutputFormat.setOutputPath(currentJob, tmpFolder);
 
-    job.setOutputFormatClass(SequenceFileOutputFormat.class);
-
-    job.setOutputKeyClass(Text.class);
-    job.setOutputValueClass(LongWritable.class);
-
-    return new Job[]{job};
-  }
-
-  @Override
-  public Map<String, Object> postJob(int jobIndex, Job job) throws Exception {
-    Path tmpFolder = FileOutputFormat.getOutputPath(job);
+    currentJob.setOutputFormatClass(SequenceFileOutputFormat.class);
 
+    currentJob.setOutputKeyClass(Text.class);
+    currentJob.setOutputValueClass(LongWritable.class);
     FileSystem fileSystem = FileSystem.get(getConf());
 
-    if (!job.isSuccessful()) {
-      fileSystem.delete(tmpFolder, true);
-      return null;
+    try {
+      currentJob.waitForCompletion(true);
+    } finally {
+      ToolUtil.recordJobStatus(null, currentJob, results);
+      if (!currentJob.isSuccessful()) {
+        fileSystem.delete(tmpFolder, true);
+        return results;
+      }
     }
 
     Text key = new Text();
@@ -564,46 +552,40 @@ public class WebTableReader extends Conf
       reader.close();
     }
 
-    if (LOG.isInfoEnabled()) {
-      LOG.info("Statistics for WebTable: ");
-      LongWritable totalCnt = stats.get("T");
-      if (totalCnt==null)totalCnt=new LongWritable(0);
-      stats.remove("T");
-      LOG.info("TOTAL urls:\t" + totalCnt.get());
-      for (Map.Entry<String, LongWritable> entry : stats.entrySet()) {
-        String k = entry.getKey();
-        LongWritable val = entry.getValue();
-        if (k.equals("scn")) {
-          LOG.info("min score:\t" + (float) (val.get() / 1000.0f));
-        } else if (k.equals("scx")) {
-          LOG.info("max score:\t" + (float) (val.get() / 1000.0f));
-        } else if (k.equals("sct")) {
-          LOG.info("avg score:\t"
-              + (float) ((((double) val.get()) / totalCnt.get()) / 1000.0));
-        } else if (k.startsWith("status")) {
-          String[] st = k.split(" ");
-          int code = Integer.parseInt(st[1]);
-          if (st.length > 2)
-            LOG.info("   " + st[2] + " :\t" + val);
-          else
-            LOG.info(st[0] + " " + code + " ("
-                + CrawlStatus.getName((byte) code) + "):\t" + val);
-        } else
-          LOG.info(k + ":\t" + val);
-      }
+    LongWritable totalCnt = stats.get("T");
+    if (totalCnt == null) totalCnt = new LongWritable(0);
+    stats.remove("T");
+    results.put("TOTAL urls", totalCnt.get());
+    for (Map.Entry<String, LongWritable> entry : stats.entrySet()) {
+      String k = entry.getKey();
+      LongWritable val = entry.getValue();
+      if (k.equals("scn")) {
+        results.put("min score", (float) (val.get() / 1000.0f));
+      } else if (k.equals("scx")) {
+        results.put("max score", (float) (val.get() / 1000.0f));
+      } else if (k.equals("sct")) {
+        results.put("avg score",
+            (float) ((((double) val.get()) / totalCnt.get()) / 1000.0));
+      } else if (k.startsWith("status")) {
+        String[] st = k.split(" ");
+        int code = Integer.parseInt(st[1]);
+        if (st.length > 2)
+          results.put(st[2], val.get());
+        else
+          results.put(st[0] + " " + code + " ("
+              + CrawlStatus.getName((byte) code) + ")", val.get());
+      } else
+        results.put(k, val.get());
     }
     // removing the tmp folder
     fileSystem.delete(tmpFolder, true);
     if (LOG.isInfoEnabled()) {
+      LOG.info("Statistics for WebTable: ");
+      for (Entry<String,Object> e : results.entrySet()) {
+        LOG.info(e.getKey() + ":\t" + e.getValue());
+      }
       LOG.info("WebTable statistics: done");
     }
-    return null;
-  }
-
-  @Override
-  public Map<String, Object> finish() throws Exception {
-    // TODO Auto-generated method stub
-    return null;
+    return results;
   }
-
 }

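A note on the refactoring above: WebTableReader, like the other tools in
this commit, now extends NutchTool instead of Configured, and relies on the
inherited numJobs, currentJob and results fields. The NutchTool diff itself
is not shown in this part of the commit mail, so the following is only a
minimal sketch of what the base class must provide, inferred from its usage
here. The field names and the run(Map) signature come from the hunks above;
everything else (initialization, modifiers) is an assumption.

  package org.apache.nutch.util;

  import java.util.HashMap;
  import java.util.Map;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.mapreduce.Job;

  public abstract class NutchTool {

    protected Configuration conf;

    /** Number of jobs the tool runs in sequence. */
    protected int numJobs;

    /** The currently executing job, exposed for status reporting. */
    protected Job currentJob;

    /** Accumulated results, returned to callers such as the REST API. */
    protected Map<String,Object> results = new HashMap<String,Object>();

    /** Runs the tool; both arguments and results are passed as maps. */
    public abstract Map<String,Object> run(Map<String,Object> args)
        throws Exception;

    public Configuration getConf() {
      return conf;
    }

    public void setConf(Configuration conf) {
      this.conf = conf;
    }
  }
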
Modified: nutch/trunk/src/java/org/apache/nutch/fetcher/FetcherJob.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/fetcher/FetcherJob.java?rev=1039014&r1=1039013&r2=1039014&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/fetcher/FetcherJob.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/fetcher/FetcherJob.java Thu Nov 25 12:05:57 2010
@@ -3,6 +3,8 @@ package org.apache.nutch.fetcher;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Random;
@@ -27,13 +29,14 @@ import org.apache.nutch.util.NutchConfig
 import org.apache.nutch.util.NutchJob;
 import org.apache.nutch.util.NutchTool;
 import org.apache.nutch.util.TableUtil;
+import org.apache.nutch.util.ToolUtil;
 import org.apache.gora.mapreduce.GoraMapper;
 
 /**
  * Multi-threaded fetcher.
  *
  */
-public class FetcherJob implements Tool, NutchTool {
+public class FetcherJob extends NutchTool implements Tool {
 
   public static final String PROTOCOL_REDIR = "protocol";
 
@@ -108,8 +111,6 @@ public class FetcherJob implements Tool,
 
   public static final Logger LOG = LoggerFactory.getLogger(FetcherJob.class);
 
-  private Configuration conf;
-
   public FetcherJob() {
 
   }
@@ -118,16 +119,6 @@ public class FetcherJob implements Tool,
     setConf(conf);
   }
 
-  @Override
-  public Configuration getConf() {
-    return conf;
-  }
-
-  @Override
-  public void setConf(Configuration conf) {
-    this.conf = conf;
-  }
-
   public Collection<WebPage.Field> getFields(Job job) {
     Collection<WebPage.Field> fields = new HashSet<WebPage.Field>(FIELDS);
     if (job.getConfiguration().getBoolean(PARSE_KEY, true)) {
@@ -140,32 +131,28 @@ public class FetcherJob implements Tool,
     return fields;
   }
 
-  public Map<String,Object> prepare() throws Exception {
-    return null;
-  }
-  
-  public Map<String,Object> postJob(int jobIndex, Job job) throws Exception {
-    return null;
-  }
- 
-  public Map<String,Object> finish() throws Exception {
-    return null;
-  }
-  
-  public Job[] createJobs(Object... args) throws Exception {
+  @Override
+  public Map<String,Object> run(Map<String,Object> args) throws Exception {
     checkConfiguration();
-    String batchId = (String)args[0];
-    int threads = (Integer)args[1];
-    boolean shouldResume = (Boolean)args[2];
-    boolean parse = (Boolean)args[3];
-    int numTasks = (Integer)args[4];
+    String batchId = (String)args.get(Nutch.ARG_BATCH);
+    Integer threads = (Integer)args.get(Nutch.ARG_THREADS);
+    Boolean shouldResume = (Boolean)args.get(Nutch.ARG_RESUME);
+    Boolean parse = (Boolean)args.get(Nutch.ARG_PARSE);
+    Integer numTasks = (Integer)args.get(Nutch.ARG_NUMTASKS);
  
-    if (threads > 0) {
+    if (threads != null && threads > 0) {
       getConf().setInt(THREADS_KEY, threads);
     }
+    if (batchId == null) {
+      batchId = Nutch.ALL_BATCH_ID_STR;
+    }
     getConf().set(GeneratorJob.BATCH_ID, batchId);
-    getConf().setBoolean(PARSE_KEY, parse);
-    getConf().setBoolean(RESUME_KEY, shouldResume);
+    if (parse != null) {
+      getConf().setBoolean(PARSE_KEY, parse);
+    }
+    if (shouldResume != null) {
+      getConf().setBoolean(RESUME_KEY, shouldResume);
+    }
 
     // set the actual time for the timelimit relative
     // to the beginning of the whole job and not of a specific task
@@ -175,19 +162,20 @@ public class FetcherJob implements Tool,
       timelimit = System.currentTimeMillis() + (timelimit * 60 * 1000);
       getConf().setLong("fetcher.timelimit", timelimit);
     }
-
-    Job job = new NutchJob(getConf(), "fetch");
-    Collection<WebPage.Field> fields = getFields(job);
-    StorageUtils.initMapperJob(job, fields, IntWritable.class,
+    numJobs = 1;
+    currentJob = new NutchJob(getConf(), "fetch");
+    Collection<WebPage.Field> fields = getFields(currentJob);
+    StorageUtils.initMapperJob(currentJob, fields, IntWritable.class,
         FetchEntry.class, FetcherMapper.class, PartitionUrlByHost.class, false);
-    StorageUtils.initReducerJob(job, FetcherReducer.class);
-    if (numTasks < 1) {
-      job.setNumReduceTasks(job.getConfiguration().getInt("mapred.map.tasks",
-          job.getNumReduceTasks()));
+    StorageUtils.initReducerJob(currentJob, FetcherReducer.class);
+    if (numTasks == null || numTasks < 1) {
+      currentJob.setNumReduceTasks(currentJob.getConfiguration().getInt("mapred.map.tasks",
+          currentJob.getNumReduceTasks()));
     } else {
-      job.setNumReduceTasks(numTasks);
+      currentJob.setNumReduceTasks(numTasks);
     }
-    return new Job[]{job};
+    currentJob.waitForCompletion(true);
+    ToolUtil.recordJobStatus(null, currentJob, results);
+    return results;
   }
 
   /**
@@ -216,13 +204,12 @@ public class FetcherJob implements Tool,
       LOG.info("FetcherJob: batchId: " + batchId);
     }
 
-    Job[] jobs = createJobs(batchId, threads, shouldResume, parse, numTasks);
-    boolean success = jobs[0].waitForCompletion(true);
-    if (!success) {
-        LOG.info("FetcherJob: failed");
-    	return -1;
-    }
-
+    run(ToolUtil.toArgMap(
+        Nutch.ARG_BATCH, batchId,
+        Nutch.ARG_THREADS, threads,
+        Nutch.ARG_RESUME, shouldResume,
+        Nutch.ARG_PARSE, parse,
+        Nutch.ARG_NUMTASKS, numTasks));
     LOG.info("FetcherJob: done");
     return 0;
   }

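The map-based argument passing used above, run(ToolUtil.toArgMap(...)),
replaces the positional Object... varargs of the old createJobs(). ToolUtil
is added elsewhere in this commit; the following is a rough sketch that is
consistent with the call sites in this diff. The null-value handling is an
assumption, though the null checks in FetcherJob.run() suggest that absent
arguments are expected and defaulted by each tool.

  /** Folds varargs of alternating keys and values into an argument map. */
  public static Map<String,Object> toArgMap(Object... args) {
    if (args == null) {
      return null;
    }
    if (args.length % 2 != 0) {
      throw new RuntimeException("expected a list of key/value pairs");
    }
    Map<String,Object> res = new HashMap<String,Object>();
    for (int i = 0; i < args.length; i += 2) {
      // Skip null values so that tools can fall back to their defaults.
      if (args[i + 1] != null) {
        res.put(String.valueOf(args[i]), args[i + 1]);
      }
    }
    return res;
  }
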
Modified: nutch/trunk/src/java/org/apache/nutch/indexer/IndexerJob.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/indexer/IndexerJob.java?rev=1039014&r1=1039013&r2=1039014&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/indexer/IndexerJob.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/indexer/IndexerJob.java Thu Nov 25 12:05:57 2010
@@ -21,13 +21,12 @@ import org.apache.nutch.storage.ParseSta
 import org.apache.nutch.storage.StorageUtils;
 import org.apache.nutch.storage.WebPage;
 import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.NutchTool;
 import org.apache.nutch.util.TableUtil;
 import org.apache.gora.mapreduce.GoraMapper;
 import org.apache.gora.mapreduce.StringComparator;
 
-public abstract class IndexerJob
-extends GoraMapper<String, WebPage, String, WebPage>
-implements Tool {
+public abstract class IndexerJob extends NutchTool implements Tool {
 
   public static final Logger LOG = LoggerFactory.getLogger(IndexerJob.class);
 
@@ -35,55 +34,47 @@ implements Tool {
 
   private static final Utf8 REINDEX = new Utf8("-reindex");
 
-  private Configuration conf;
-
-  protected Utf8 batchId;
-
   static {
     FIELDS.add(WebPage.Field.SIGNATURE);
     FIELDS.add(WebPage.Field.PARSE_STATUS);
     FIELDS.add(WebPage.Field.SCORE);
     FIELDS.add(WebPage.Field.MARKERS);
   }
-
-  @Override
-  public Configuration getConf() {
-    return conf;
-  }
-
-  @Override
-  public void setConf(Configuration conf) {
-    this.conf = conf;
-  }
-
-  @Override
-  public void setup(Context context) throws IOException {
-    Configuration conf = context.getConfiguration();
-    batchId = new Utf8(conf.get(GeneratorJob.BATCH_ID, Nutch.ALL_BATCH_ID_STR));
-  }
-
-  @Override
-  public void map(String key, WebPage page, Context context)
-  throws IOException, InterruptedException {
-    ParseStatus pstatus = page.getParseStatus();
-    if (pstatus == null || !ParseStatusUtils.isSuccess(pstatus)
-        || pstatus.getMinorCode() == ParseStatusCodes.SUCCESS_REDIRECT) {
-      return; // filter urls not parsed
+  
+  public static class IndexerMapper
+      extends GoraMapper<String, WebPage, String, WebPage> {
+    protected Utf8 batchId;
+
+    @Override
+    public void setup(Context context) throws IOException {
+      Configuration conf = context.getConfiguration();
+      batchId = new Utf8(conf.get(GeneratorJob.BATCH_ID, Nutch.ALL_BATCH_ID_STR));
     }
 
-    Utf8 mark = Mark.UPDATEDB_MARK.checkMark(page);
-    if (!batchId.equals(REINDEX)) {
-      if (!NutchJob.shouldProcess(mark, batchId)) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Skipping " + TableUtil.unreverseUrl(key) + "; different batch id");
+    @Override
+    public void map(String key, WebPage page, Context context)
+    throws IOException, InterruptedException {
+      ParseStatus pstatus = page.getParseStatus();
+      if (pstatus == null || !ParseStatusUtils.isSuccess(pstatus)
+          || pstatus.getMinorCode() == ParseStatusCodes.SUCCESS_REDIRECT) {
+        return; // filter urls not parsed
+      }
+
+      Utf8 mark = Mark.UPDATEDB_MARK.checkMark(page);
+      if (!batchId.equals(REINDEX)) {
+        if (!NutchJob.shouldProcess(mark, batchId)) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Skipping " + TableUtil.unreverseUrl(key) + "; different batch id");
+          }
+          return;
         }
-        return;
       }
-    }
 
-    context.write(key, page);
+      context.write(key, page);
+    }    
   }
 
+
   private static Collection<WebPage.Field> getFields(Job job) {
     Configuration conf = job.getConfiguration();
     Collection<WebPage.Field> columns = new HashSet<WebPage.Field>(FIELDS);
@@ -103,7 +94,8 @@ implements Tool {
         StringComparator.class, RawComparator.class);
 
     Collection<WebPage.Field> fields = getFields(job);
-    StorageUtils.initMapperJob(job, fields, String.class, WebPage.class, this.getClass());
+    StorageUtils.initMapperJob(job, fields, String.class, WebPage.class,
+        IndexerMapper.class);
     job.setReducerClass(IndexerReducer.class);
     job.setOutputFormatClass(IndexerOutputFormat.class);
     return job;

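Pulling the map() logic into the static nested IndexerMapper class is what
allows IndexerJob to extend NutchTool: the job class itself is no longer a
GoraMapper, so initMapperJob() must be handed a concrete Mapper class that
Hadoop can instantiate reflectively. Roughly what the framework does with
that class (a sketch of the reason, not Nutch code):

  // Hadoop creates the mapper by reflection, approximately like this;
  // only a top-level or static nested class with a no-arg constructor
  // can be instantiated this way.
  Class<?> mapperClass = job.getMapperClass();
  Object mapper = ReflectionUtils.newInstance(mapperClass,
      job.getConfiguration());
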
Modified: nutch/trunk/src/java/org/apache/nutch/indexer/solr/SolrIndexerJob.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/indexer/solr/SolrIndexerJob.java?rev=1039014&r1=1039013&r2=1039014&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/indexer/solr/SolrIndexerJob.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/indexer/solr/SolrIndexerJob.java Thu Nov 25 12:05:57 2010
@@ -16,6 +16,9 @@
  */
 package org.apache.nutch.indexer.solr;
 
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
 
@@ -32,53 +35,46 @@ import org.apache.nutch.indexer.NutchInd
 import org.apache.nutch.metadata.Nutch;
 import org.apache.nutch.util.NutchConfiguration;
 import org.apache.nutch.util.NutchTool;
+import org.apache.nutch.util.ToolUtil;
 import org.apache.solr.client.solrj.SolrServer;
 import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
 
-public class SolrIndexerJob extends IndexerJob implements NutchTool {
+public class SolrIndexerJob extends IndexerJob {
 
   public static Logger LOG = LoggerFactory.getLogger(SolrIndexerJob.class);
 
-  public Map<String,Object> prepare() throws Exception {
-    return null;
-  }
-  
-  public Map<String,Object> postJob(int jobIndex, Job job) throws Exception {
-    return null;
-  }
- 
-  public Map<String,Object> finish() throws Exception {
-    return null;
-  }
-  
-  public Job[] createJobs(Object... args) throws Exception {
-    String solrUrl = (String)args[0];
-    String batchId = (String)args[1];
+  @Override
+  public Map<String,Object> run(Map<String,Object> args) throws Exception {
+    String solrUrl = (String)args.get(Nutch.ARG_SOLR);
+    String batchId = (String)args.get(Nutch.ARG_BATCH);
     NutchIndexWriterFactory.addClassToConf(getConf(), SolrWriter.class);
     getConf().set(SolrConstants.SERVER_URL, solrUrl);
 
-    Job job = createIndexJob(getConf(), "solr-index", batchId);
+    currentJob = createIndexJob(getConf(), "solr-index", batchId);
     Path tmp = new Path("tmp_" + System.currentTimeMillis() + "-"
                 + new Random().nextInt());
 
-    FileOutputFormat.setOutputPath(job, tmp);
-    return new Job[]{job};
+    FileOutputFormat.setOutputPath(currentJob, tmp);
+    currentJob.waitForCompletion(true);
+    ToolUtil.recordJobStatus(null, currentJob, results);
+    return results;
   }
 
-  private void indexSolr(String solrUrl, String crawlId) throws Exception {
+  private void indexSolr(String solrUrl, String batchId) throws Exception {
     LOG.info("SolrIndexerJob: starting");
 
-    Job[] jobs = createJobs(solrUrl, crawlId);
-    boolean success = false;
     try {
-      success = jobs[0].waitForCompletion(true);
+      run(ToolUtil.toArgMap(
+          Nutch.ARG_SOLR, solrUrl,
+          Nutch.ARG_BATCH, batchId));
       // do the commits once and for all the reducers in one go
       SolrServer solr = new CommonsHttpSolrServer(solrUrl);
       solr.commit();
     } finally {
-      FileSystem.get(getConf()).delete(FileOutputFormat.getOutputPath(jobs[0]), true);
+      FileSystem.get(getConf()).delete(
+          FileOutputFormat.getOutputPath(currentJob), true);
     }
-    LOG.info("SolrIndexerJob: " + (success ? "done" : "failed"));
+    LOG.info("SolrIndexerJob: done.");
   }
 
   public int run(String[] args) throws Exception {

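With this refactoring the Solr indexer can be driven programmatically as
well as from the command line. A hypothetical invocation (the Solr URL is a
placeholder); note that run() itself does not issue the final commit, which
indexSolr() performs once after the job finishes:

  SolrIndexerJob indexer = new SolrIndexerJob();
  indexer.setConf(NutchConfiguration.create());
  Map<String,Object> results = indexer.run(ToolUtil.toArgMap(
      Nutch.ARG_SOLR, "http://localhost:8983/solr",
      Nutch.ARG_BATCH, "-reindex"));
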
Modified: nutch/trunk/src/java/org/apache/nutch/metadata/Nutch.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/metadata/Nutch.java?rev=1039014&r1=1039013&r2=1039014&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/metadata/Nutch.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/metadata/Nutch.java Thu Nov 25 12:05:57 2010
@@ -75,4 +75,53 @@ public interface Nutch {
   public static final Utf8 ALL_CRAWL_ID = new Utf8(ALL_BATCH_ID_STR);
 
   public static final String CRAWL_ID_KEY = "storage.crawl.id";
+  
+  
+  // short constants for cmd-line args
+  /** Batch id to select. */
+  public static final String ARG_BATCH = "batch";
+  /** Crawl id to use. */
+  public static final String ARG_CRAWL = "crawl";
+  /** Resume a previously aborted operation. */
+  public static final String ARG_RESUME = "resume";
+  /** Force processing even if there are locks or inconsistencies. */
+  public static final String ARG_FORCE = "force";
+  /** Parse during fetching. */
+  public static final String ARG_PARSE = "parse";
+  /** Sort statistics. */
+  public static final String ARG_SORT = "sort";
+  /** Solr URL. */
+  public static final String ARG_SOLR = "solr";
+  /** Number of fetcher threads (per map task). */
+  public static final String ARG_THREADS = "threads";
+  /** Number of fetcher tasks. */
+  public static final String ARG_NUMTASKS = "numTasks";
+  /** Generate topN scoring URLs. */
+  public static final String ARG_TOPN = "topN";
+  /** The notion of current time. */
+  public static final String ARG_CURTIME = "curTime";
+  /** Apply URLFilters. */
+  public static final String ARG_FILTER = "filter";
+  /** Apply URLNormalizers. */
+  public static final String ARG_NORMALIZE = "normalize";
+  /** Whitespace-separated list of seed URLs. */
+  public static final String ARG_SEEDLIST = "seed";
+  /** A path to a directory containing a list of seed URLs. */
+  public static final String ARG_SEEDDIR = "seedDir";
+  /** Class to run as a NutchTool. */
+  public static final String ARG_CLASS = "class";
+  /** Depth (number of cycles) of a crawl. */
+  public static final String ARG_DEPTH = "depth";
+  
+  // short constants for status / results fields
+  /** Status / result message. */
+  public static final String STAT_MESSAGE = "msg";
+  /** Phase of processing. */
+  public static final String STAT_PHASE = "phase";
+  /** Progress (float). */
+  public static final String STAT_PROGRESS = "progress";
+  /** Jobs. */
+  public static final String STAT_JOBS = "jobs";
+  /** Counters. */
+  public static final String STAT_COUNTERS = "counters";
 }

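These ARG_* keys give the tools a stable, named vocabulary in place of
positional arguments, and the STAT_* keys name the fields of the results
map that the REST layer serializes to JSON. As a hypothetical example of
how a classic command line maps onto the new API (the CLI flags shown are
illustrative, not a specification):

  // bin/nutch fetch -all -threads 10 -resume, expressed as an arg map:
  Map<String,Object> args = ToolUtil.toArgMap(
      Nutch.ARG_BATCH, Nutch.ALL_BATCH_ID_STR,
      Nutch.ARG_THREADS, 10,
      Nutch.ARG_RESUME, true);
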
Modified: nutch/trunk/src/java/org/apache/nutch/parse/ParseStatusCodes.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/parse/ParseStatusCodes.java?rev=1039014&r1=1039013&r2=1039014&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/parse/ParseStatusCodes.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/parse/ParseStatusCodes.java Thu Nov 25 12:05:57 2010
@@ -20,6 +20,8 @@ public interface ParseStatusCodes {
 
   // Secondary success codes go here:
 
+  public static final short SUCCESS_OK                = 0;
+
   /** Parsed content contains a directive to redirect to another URL.
    * The target URL can be retrieved from the arguments.
    */

Modified: nutch/trunk/src/java/org/apache/nutch/parse/ParseStatusUtils.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/parse/ParseStatusUtils.java?rev=1039014&r1=1039013&r2=1039014&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/parse/ParseStatusUtils.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/parse/ParseStatusUtils.java Thu Nov 25 12:05:57 2010
@@ -16,6 +16,7 @@ public class ParseStatusUtils {
 
   static {
     STATUS_SUCCESS.setMajorCode(ParseStatusCodes.SUCCESS);
+    minorCodes.put(ParseStatusCodes.SUCCESS_OK, "ok");
     minorCodes.put(ParseStatusCodes.SUCCESS_REDIRECT, "redirect");
     minorCodes.put(ParseStatusCodes.FAILED_EXCEPTION, "exception");
     minorCodes.put(ParseStatusCodes.FAILED_INVALID_FORMAT, "invalid_format");
@@ -81,7 +82,7 @@ public class ParseStatusUtils {
     }
     StringBuilder sb = new StringBuilder();
     sb.append(ParseStatusCodes.majorCodes[status.getMajorCode()] +
-        "/" + minorCodes.get(status.getMinorCode()));
+        "/" + minorCodes.get((short)status.getMinorCode()));
     sb.append(" (" + status.getMajorCode() + "/" + status.getMinorCode() + ")");
     sb.append(", args=[");
     GenericArray<Utf8> args = status.getArgs();

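The (short) cast above fixes a genuine autoboxing pitfall rather than a
cosmetic issue: minorCodes is keyed by Short (the ParseStatusCodes
constants are shorts), while getMinorCode() returns an int. Without the
cast the int argument is boxed to an Integer, which never equals a Short
key, so the lookup silently returned null and the new "ok" entry could not
be found. A minimal standalone illustration:

  import java.util.HashMap;
  import java.util.Map;

  public class AutoboxingPitfall {
    public static void main(String[] args) {
      Map<Short,String> minorCodes = new HashMap<Short,String>();
      minorCodes.put((short) 1, "redirect");

      int minorCode = 1;
      // Boxes to Integer; an Integer never equals a Short, so: null
      System.out.println(minorCodes.get(minorCode));
      // The cast boxes to Short instead, so: redirect
      System.out.println(minorCodes.get((short) minorCode));
    }
  }
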
Modified: nutch/trunk/src/java/org/apache/nutch/parse/ParserJob.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/parse/ParserJob.java?rev=1039014&r1=1039013&r2=1039014&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/parse/ParserJob.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/parse/ParserJob.java Thu Nov 25 12:05:57 2010
@@ -25,10 +25,10 @@ import org.apache.nutch.util.NutchConfig
 import org.apache.nutch.util.NutchJob;
 import org.apache.nutch.util.NutchTool;
 import org.apache.nutch.util.TableUtil;
+import org.apache.nutch.util.ToolUtil;
 import org.apache.gora.mapreduce.GoraMapper;
 
-public class ParserJob extends GoraMapper<String, WebPage, String, WebPage>
-    implements Tool, NutchTool {
+public class ParserJob extends NutchTool implements Tool {
 
   public static final Logger LOG = LoggerFactory.getLogger(ParserJob.class);
 
@@ -50,66 +50,70 @@ public class ParserJob extends GoraMappe
     FIELDS.add(WebPage.Field.METADATA);
   }
 
-  private ParseUtil parseUtil;
 
-  private boolean shouldResume;
+  public static class ParserMapper 
+      extends GoraMapper<String, WebPage, String, WebPage> {
+    private ParseUtil parseUtil;
 
-  private boolean force;
+    private boolean shouldResume;
 
-  private Utf8 batchId;
+    private boolean force;
 
-  public ParserJob() {
-
-  }
-
-  public ParserJob(Configuration conf) {
-    setConf(conf);
-  }
-
-  @Override
-  public void setup(Context context) throws IOException {
-    Configuration conf = context.getConfiguration();
-    parseUtil = new ParseUtil(conf);
-    shouldResume = conf.getBoolean(RESUME_KEY, false);
-    force = conf.getBoolean(FORCE_KEY, false);
-    batchId = new Utf8(conf.get(GeneratorJob.BATCH_ID, Nutch.ALL_BATCH_ID_STR));
-  }
-
-  @Override
-  public void map(String key, WebPage page, Context context)
-      throws IOException, InterruptedException {
-    Utf8 mark = Mark.FETCH_MARK.checkMark(page);
-    if (!NutchJob.shouldProcess(mark, batchId)) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Skipping " + TableUtil.unreverseUrl(key) + "; different batch id");
-      }
-      return;
+    private Utf8 batchId;
+    
+    @Override
+    public void setup(Context context) throws IOException {
+      Configuration conf = context.getConfiguration();
+      parseUtil = new ParseUtil(conf);
+      shouldResume = conf.getBoolean(RESUME_KEY, false);
+      force = conf.getBoolean(FORCE_KEY, false);
+      batchId = new Utf8(conf.get(GeneratorJob.BATCH_ID, Nutch.ALL_BATCH_ID_STR));
     }
-    if (shouldResume && Mark.PARSE_MARK.checkMark(page) != null) {
-      if (force) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Forced parsing " + TableUtil.unreverseUrl(key) + "; already parsed");
-        }
-      } else {
+
+    @Override
+    public void map(String key, WebPage page, Context context)
+        throws IOException, InterruptedException {
+      Utf8 mark = Mark.FETCH_MARK.checkMark(page);
+      if (!NutchJob.shouldProcess(mark, batchId)) {
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Skipping " + TableUtil.unreverseUrl(key) + "; already parsed");
+          LOG.debug("Skipping " + TableUtil.unreverseUrl(key) + "; different batch id");
         }
         return;
       }
-    }
+      if (shouldResume && Mark.PARSE_MARK.checkMark(page) != null) {
+        if (force) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Forced parsing " + TableUtil.unreverseUrl(key) + "; already parsed");
+          }
+        } else {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Skipping " + TableUtil.unreverseUrl(key) + "; already parsed");
+          }
+          return;
+        }
+      }
 
-    URLWebPage redirectedPage = parseUtil.process(key, page);
-    ParseStatus pstatus = page.getParseStatus();
-    if (pstatus != null) {
-      context.getCounter("ParserStatus",
-          ParseStatusCodes.majorCodes[pstatus.getMajorCode()]).increment(1);
-    }
+      URLWebPage redirectedPage = parseUtil.process(key, page);
+      ParseStatus pstatus = page.getParseStatus();
+      if (pstatus != null) {
+        context.getCounter("ParserStatus",
+            ParseStatusCodes.majorCodes[pstatus.getMajorCode()]).increment(1);
+      }
+
+      if (redirectedPage != null) {
+        context.write(TableUtil.reverseUrl(redirectedPage.getUrl()),
+                      redirectedPage.getDatum());
+      }
+      context.write(key, page);
+    }    
+  }
+  
+  public ParserJob() {
 
-    if (redirectedPage != null) {
-      context.write(TableUtil.reverseUrl(redirectedPage.getUrl()),
-                    redirectedPage.getDatum());
-    }
-    context.write(key, page);
+  }
+
+  public ParserJob(Configuration conf) {
+    setConf(conf);
   }
 
   public Collection<WebPage.Field> getFields(Job job) {
@@ -146,42 +150,35 @@ public class ParserJob extends GoraMappe
     this.conf = conf;
   }
 
-  public Map<String,Object> prepare() throws Exception {
-    return null;
-  }
-  
-  public Map<String,Object> postJob(int jobIndex, Job job) throws Exception {
-    return null;
-  }
- 
-  public Map<String,Object> finish() throws Exception {
-    return null;
-  }
-  
-  public Job[] createJobs(Object... args) throws Exception {
-    String batchId = (String)args[0];
-    boolean shouldResume = (Boolean)args[1];
-    boolean force = (Boolean)args[2];
+  @Override
+  public Map<String,Object> run(Map<String,Object> args) throws Exception {
+    String batchId = (String)args.get(Nutch.ARG_BATCH);
+    Boolean shouldResume = (Boolean)args.get(Nutch.ARG_RESUME);
+    Boolean force = (Boolean)args.get(Nutch.ARG_FORCE);
     
     if (batchId != null) {
       getConf().set(GeneratorJob.BATCH_ID, batchId);
     }
-    getConf().setBoolean(RESUME_KEY, shouldResume);
-    getConf().setBoolean(FORCE_KEY, force);
-    final Job job = new NutchJob(getConf(), "parse");
+    if (shouldResume != null) {
+      getConf().setBoolean(RESUME_KEY, shouldResume);
+    }
+    if (force != null) {
+      getConf().setBoolean(FORCE_KEY, force);
+    }
+    currentJob = new NutchJob(getConf(), "parse");
     
-    Collection<WebPage.Field> fields = getFields(job);
-    StorageUtils.initMapperJob(job, fields, String.class, WebPage.class,
-        ParserJob.class);
-    StorageUtils.initReducerJob(job, IdentityPageReducer.class);
-    job.setNumReduceTasks(0);
-
-    return new Job[]{job};
+    Collection<WebPage.Field> fields = getFields(currentJob);
+    StorageUtils.initMapperJob(currentJob, fields, String.class, WebPage.class,
+        ParserMapper.class);
+    StorageUtils.initReducerJob(currentJob, IdentityPageReducer.class);
+    currentJob.setNumReduceTasks(0);
+
+    currentJob.waitForCompletion(true);
+    ToolUtil.recordJobStatus(null, currentJob, results);
+    return results;
   }
 
-  public int parse(String crawlId, boolean shouldResume, boolean force) throws Exception {
-    Job[] jobs = createJobs(crawlId, shouldResume, force);
-
+  public int parse(String batchId, boolean shouldResume, boolean force) throws Exception {
     LOG.info("ParserJob: starting");
 
     LOG.info("ParserJob: resuming:\t" + getConf().getBoolean(RESUME_KEY, false));
@@ -191,11 +188,10 @@ public class ParserJob extends GoraMappe
     } else {
       LOG.info("ParserJob: batchId:\t" + batchId);
     }
-    boolean success = jobs[0].waitForCompletion(true);
-    if (!success){
-      LOG.info("ParserJob: failed");
-      return -1;
-    }
+    run(ToolUtil.toArgMap(
+        Nutch.ARG_BATCH, batchId,
+        Nutch.ARG_RESUME, shouldResume,
+        Nutch.ARG_FORCE, force));
     LOG.info("ParserJob: success");
     return 0;
   }
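All of the tools in this diff funnel job state into the shared results map
through ToolUtil.recordJobStatus(null, currentJob, results). The method is
added elsewhere in this commit; the following is a rough sketch consistent
with its call sites and the STAT_* constants in Nutch.java, and the exact
per-job structure recorded is an assumption.

  /** Records the status and counters of a job under results, keyed by
   *  label (or by a generated name when label is null). Sketch only. */
  @SuppressWarnings("unchecked")
  public static void recordJobStatus(String label, Job job,
      Map<String,Object> results) throws IOException {
    if (label == null) {
      label = job.getJobName() + "-" + job.getJobID();
    }
    Map<String,Object> jobs = (Map<String,Object>)results.get(Nutch.STAT_JOBS);
    if (jobs == null) {
      jobs = new HashMap<String,Object>();
      results.put(Nutch.STAT_JOBS, jobs);
    }
    // Copy all counter groups into a nested map, ready for JSON output.
    Map<String,Object> counters = new HashMap<String,Object>();
    for (CounterGroup group : job.getCounters()) {
      Map<String,Object> g = new HashMap<String,Object>();
      for (Counter counter : group) {
        g.put(counter.getName(), counter.getValue());
      }
      counters.put(group.getName(), g);
    }
    Map<String,Object> jobStatus = new HashMap<String,Object>();
    jobStatus.put(Nutch.STAT_COUNTERS, counters);
    jobs.put(label, jobStatus);
  }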