You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by ab...@apache.org on 2010/11/25 13:05:58 UTC

svn commit: r1039014 [2/2] - in /nutch/trunk: ./ src/java/org/apache/nutch/api/ src/java/org/apache/nutch/api/impl/ src/java/org/apache/nutch/crawl/ src/java/org/apache/nutch/fetcher/ src/java/org/apache/nutch/indexer/ src/java/org/apache/nutch/indexer...

Modified: nutch/trunk/src/java/org/apache/nutch/util/NutchTool.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/util/NutchTool.java?rev=1039014&r1=1039013&r2=1039014&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/util/NutchTool.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/util/NutchTool.java Thu Nov 25 12:05:57 2010
@@ -1,20 +1,80 @@
 package org.apache.nutch.util;
 
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.nutch.metadata.Nutch;
 
-public interface NutchTool {
+public abstract class NutchTool extends Configured {
   
-  /** Prepares the tool. May return additional info, or null. */
-  public Map<String,Object> prepare() throws Exception;
-
-  /** Create jobs to be executed in sequence. */
-  public Job[] createJobs(Object... args) throws Exception;
+  protected HashMap<String,Object> results = new HashMap<String,Object>(); // result holder; unlike status, not a synchronized map
+  protected Map<String,Object> status =
+    Collections.synchronizedMap(new HashMap<String,Object>()); // handed out as-is by getStatus()
+  protected Job currentJob; // job polled by getProgress() and killJob(); null means "no job"
+  protected int numJobs; // job count; getProgress() scales by it only when > 1
+  protected int currentJobNum; // added to the job's own progress in getProgress(); presumably the 0-based index of currentJob -- confirm
+  
+  /** Runs the tool, using a map of arguments.
+   * @param args arguments for this run, keyed by name
+   * @return a map of results; may be null */
+  public abstract Map<String,Object> run(Map<String,Object> args) throws Exception;
+  
+  /** Computes the tool's overall progress, a float in range [0,1], and stores it in the status map. */
+  public float getProgress() {
+    float jobFraction = currentJobProgress();
+    // with several jobs, currentJobNum counts as that many whole jobs out of numJobs
+    float overall = numJobs > 1 ? (currentJobNum + jobFraction) / (float)numJobs : jobFraction;
+    status.put(Nutch.STAT_PROGRESS, overall);
+    return overall;
+  }
+  
+  /** Average of the current job's map and reduce progress; 0 without a job or on error. */
+  private float currentJobProgress() {
+    if (currentJob == null) return 0;
+    try {
+      return (currentJob.mapProgress() + currentJob.reduceProgress()) / 2.0f;
+    } catch (IOException e) {
+      e.printStackTrace();
+    } catch (IllegalStateException ile) {
+      ile.printStackTrace();
+    }
+    return 0;
+  }
+  
+  
+  /** Returns the status map itself (the synchronized map, not a copy); getProgress() stores STAT_PROGRESS in it. */
+  public Map<String,Object> getStatus() {
+    return status;
+  }
   
-  /** Post-process results of a job. */
-  public Map<String,Object> postJob(int jobIndex, Job job) throws Exception;
+  /** Stops the job in a way that allows it to be resumed later. This default
+   * merely delegates to {@link #killJob()}, so subclasses should override it.
+   * @return true if succeeded, false otherwise
+   */
+  public boolean stopJob() throws Exception {
+    return killJob();
+  }
   
-  /** Finish processing and optionally return results. */
-  public Map<String,Object> finish() throws Exception;
+  /**
+   * Aborts the current job right away. Whatever output the job has written
+   * up to this point must be treated as incomplete or inconsistent.
+   * @return true if a running job was killed; false if there was none or the kill failed
+   * @throws Exception if querying the job's completion state fails
+   */
+  public boolean killJob() throws Exception {
+    if (currentJob == null || currentJob.isComplete()) { // nothing left to kill
+      return false;
+    }
+    try {
+      currentJob.killJob();
+    } catch (Exception e) {
+      e.printStackTrace();
+      return false;
+    }
+    return true;
+  }
 }
\ No newline at end of file

Added: nutch/trunk/src/java/org/apache/nutch/util/ToolUtil.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/util/ToolUtil.java?rev=1039014&view=auto
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/util/ToolUtil.java (added)
+++ nutch/trunk/src/java/org/apache/nutch/util/ToolUtil.java Thu Nov 25 12:05:57 2010
@@ -0,0 +1,62 @@
+package org.apache.nutch.util;
+
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.CounterGroup;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.nutch.metadata.Nutch;
+
+public class ToolUtil {
+
+  /** Turns alternating name/value arguments into a map; names pass through String.valueOf and pairs
+   * with a null value are skipped. A null array yields null; an odd count is an IllegalArgumentException. */
+  public static final Map<String,Object> toArgMap(Object... args) {
+    if (args == null) {
+      return null;
+    }
+    if (args.length % 2 != 0) {
+      throw new IllegalArgumentException("expected pairs of argName argValue");
+    }
+    HashMap<String,Object> res = new HashMap<String,Object>();
+    for (int i = 0; i < args.length; i += 2) {
+      if (args[i + 1] != null) {
+        res.put(String.valueOf(args[i]), args[i + 1]);
+      }
+    }
+    return res;
+  }
+  
+  /** Files the job's counters, name and ID under label (default: job name plus "-" and ID) in results[STAT_JOBS]. */
+  public static final void recordJobStatus(String label, Job job, Map<String,Object> results) {
+    @SuppressWarnings("unchecked") // the STAT_JOBS entry is stored as a Map a few lines below
+    Map<String,Object> jobs = (Map<String,Object>)results.get(Nutch.STAT_JOBS);
+    if (jobs == null) {
+      jobs = new LinkedHashMap<String,Object>();
+      results.put(Nutch.STAT_JOBS, jobs);
+    }
+    Map<String,Object> stats = new HashMap<String,Object>();
+    Map<String,Object> countStats = new HashMap<String,Object>();
+    try {
+      for (CounterGroup cg : job.getCounters()) {
+        Map<String,Object> cnts = new HashMap<String,Object>();
+        countStats.put(cg.getDisplayName(), cnts);
+        for (Counter c : cg) {
+          cnts.put(c.getName(), c.getValue());
+        }
+      }
+    } catch (Exception e) {
+      countStats.put("error", e.toString()); // best effort: record the failure instead of propagating it
+    }
+    stats.put(Nutch.STAT_COUNTERS, countStats);
+    stats.put("jobName", job.getJobName());
+    stats.put("jobID", job.getJobID());
+    if (label == null) {
+      label = job.getJobID() == null ? job.getJobName() : job.getJobName() + "-" + job.getJobID();
+    }
+    jobs.put(label, stats);
+  }
+}

Propchange: nutch/trunk/src/java/org/apache/nutch/util/ToolUtil.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: nutch/trunk/src/test/org/apache/nutch/api/TestAPI.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/test/org/apache/nutch/api/TestAPI.java?rev=1039014&r1=1039013&r2=1039014&view=diff
==============================================================================
--- nutch/trunk/src/test/org/apache/nutch/api/TestAPI.java (original)
+++ nutch/trunk/src/test/org/apache/nutch/api/TestAPI.java Thu Nov 25 12:05:57 2010
@@ -1,28 +1,35 @@
 package org.apache.nutch.api;
 
+import static org.junit.Assert.*;
+
 import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.nutch.api.JobManager.JobType;
+import org.apache.nutch.metadata.Nutch;
+import org.apache.nutch.util.NutchTool;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
 import org.restlet.ext.jackson.JacksonRepresentation;
 import org.restlet.representation.Representation;
 import org.restlet.resource.ClientResource;
 
-import junit.framework.TestCase;
-
-public class TestAPI extends TestCase {
+public class TestAPI {
   
-  NutchServer server;
+  private static NutchServer server;
   ClientResource cli;
   
-  String baseUrl = "http://localhost:8192/nutch/";
+  private static String baseUrl = "http://localhost:8192/nutch/";
   
-  public void setUp() throws Exception {
+  @BeforeClass
+  public static void before() throws Exception { // starts one server for the whole class; 8192 is the port baseUrl points at
     server = new NutchServer(8192);
     server.start();
   }
   
-  public void tearDown() throws Exception {
+  @AfterClass
+  public static void after() throws Exception {
     if (!server.stop(false)) {
       for (int i = 1; i < 11; i++) {
         System.err.println("Waiting for jobs to complete - " + i + "s");
@@ -41,13 +48,16 @@ public class TestAPI extends TestCase {
     }
   }
   
+  @Test // a GET on baseUrl must return exactly the [name, description] pairs in the expected string
   public void testInfoAPI() throws Exception {
     ClientResource cli = new ClientResource(baseUrl);
-    String expected = "[[\"confs\",\"Configuration manager\"],[\"jobs\",\"Job manager\"]]";
+    String expected = "[[\"admin\",\"Service admin actions\"],[\"confs\",\"Configuration manager\"],[\"db\",\"DB data streaming\"],[\"jobs\",\"Job manager\"]]";
     String got = cli.get().getText();
     assertEquals(expected, got);
   }
   
+  @SuppressWarnings("rawtypes")
+  @Test
   public void testConfsAPI() throws Exception {
     ClientResource cli = new ClientResource(baseUrl + ConfResource.PATH);
     assertEquals("[\"default\"]", cli.get().getText());
@@ -70,6 +80,8 @@ public class TestAPI extends TestCase {
     assertEquals("[\"default\"]", cli.get().getText());
   }
   
+  @SuppressWarnings("rawtypes")
+  @Test
   public void testJobsAPI() throws Exception {
     ClientResource cli = new ClientResource(baseUrl + JobResource.PATH);
     assertEquals("[]", cli.get().getText());
@@ -77,8 +89,6 @@ public class TestAPI extends TestCase {
     Map<String,Object> map = new HashMap<String,Object>();
     map.put(Params.JOB_TYPE, JobType.READDB.toString());
     map.put(Params.CONF_ID, "default");
-    JacksonRepresentation<Map<String,Object>> jr =
-      new JacksonRepresentation<Map<String,Object>>(map);
     Representation r = cli.put(map);
     String jobId = r.getText();
     assertNotNull(jobId);
@@ -90,5 +100,101 @@ public class TestAPI extends TestCase {
     String state = (String)list[0].get("state");
     assertEquals(jobId, id);
     assertEquals(state, "RUNNING");
+    int cnt = 10;
+    do {
+      try {
+        Thread.sleep(2000);
+      } catch (Exception e) {};
+      list = cli.get(Map[].class);
+      state = (String)list[0].get("state");
+      if (!state.equals("RUNNING")) {
+        break;
+      }
+    } while (--cnt > 0);
+    assertTrue(cnt > 0);
+    if (list == null) return;
+    for (Map m : list) {
+      System.out.println(m);
+    }
+  }
+  
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testStopKill() throws Exception { // exercises cmd=stop and cmd=abort against a SpinningJob
+    ClientResource cli = new ClientResource(baseUrl + JobResource.PATH);
+    // create a CLASS-type job that runs SpinningJob under the "default" configuration
+    Map<String,Object> map = new HashMap<String,Object>();
+    map.put(Params.JOB_TYPE, JobType.CLASS.toString());
+    Map<String,Object> args = new HashMap<String,Object>();
+    map.put(Params.ARGS, args);
+    args.put(Nutch.ARG_CLASS, SpinningJob.class.getName());
+    map.put(Params.CONF_ID, "default");
+    Representation r = cli.put(map); // the response text is the ID of the new job
+    String jobId = r.getText();
+    cli.release();
+    assertNotNull(jobId);
+    System.out.println(jobId);
+    assertTrue(jobId.startsWith("default-CLASS-"));
+    ClientResource stopCli = new ClientResource(baseUrl + JobResource.PATH +
+        "?job=" + jobId + "&cmd=stop");
+    r = stopCli.get();
+    assertEquals("true", r.getText());
+    stopCli.release();
+    Thread.sleep(2000); // SpinningJob checks its stop flag once per second, so 2s lets it finish
+    ClientResource jobCli = new ClientResource(baseUrl + JobResource.PATH + "/" + jobId);
+    Map<String,Object> res = jobCli.get(Map.class);
+    res = (Map<String,Object>)res.get("result"); // presumably the map returned by SpinningJob.run() -- confirm in JobResource
+    assertEquals("stopped", res.get("res")); // value stored by SpinningJob.stopJob()
+    jobCli.release();
+    // submit the same job again, this time aborting it with cmd=abort
+    r = cli.put(map);
+    jobId = r.getText();
+    cli.release();
+    assertNotNull(jobId);
+    System.out.println(jobId);
+    assertTrue(jobId.startsWith("default-CLASS-"));
+    ClientResource killCli = new ClientResource(baseUrl + JobResource.PATH +
+        "?job=" + jobId + "&cmd=abort");
+    r = killCli.get();
+    assertEquals("true", r.getText());
+    killCli.release();
+    Thread.sleep(2000); // same 2s grace period as after the stop command
+    jobCli = new ClientResource(baseUrl + JobResource.PATH + "/" + jobId);
+    res = jobCli.get(Map.class);
+    res = (Map<String,Object>)res.get("result");
+    assertEquals("killed", res.get("res")); // value stored by SpinningJob.killJob()
+    jobCli.release();
+  }
+  
+  /** Test tool that idles for up to 60 seconds, until stopJob() or killJob() raises the stop flag. */
+  public static class SpinningJob extends NutchTool {
+    volatile boolean shouldStop = false;
+
+    @Override
+    public Map<String, Object> run(Map<String, Object> args) throws Exception {
+      status.put(Nutch.STAT_MESSAGE, "running");
+      for (int i = 0; i < 60 && !shouldStop; i++) {
+        Thread.sleep(1000);
+      }
+      if (!shouldStop) { // timed out; a stop/kill request has already stored its own "res"
+        results.put("res", "failed");
+      }
+      return results;
+    }
+
+    @Override
+    public boolean stopJob() throws Exception {
+      results.put("res", "stopped");
+      shouldStop = true;
+      return true;
+    }
+
+    @Override
+    public boolean killJob() throws Exception {
+      results.put("res", "killed");
+      shouldStop = true;
+      return true;
+    }
+    
   }
 }