You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@nutch.apache.org by ab...@apache.org on 2010/11/25 13:05:58 UTC
svn commit: r1039014 [2/2] - in /nutch/trunk: ./
src/java/org/apache/nutch/api/ src/java/org/apache/nutch/api/impl/
src/java/org/apache/nutch/crawl/ src/java/org/apache/nutch/fetcher/
src/java/org/apache/nutch/indexer/ src/java/org/apache/nutch/indexer...
Modified: nutch/trunk/src/java/org/apache/nutch/util/NutchTool.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/util/NutchTool.java?rev=1039014&r1=1039013&r2=1039014&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/util/NutchTool.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/util/NutchTool.java Thu Nov 25 12:05:57 2010
@@ -1,20 +1,80 @@
package org.apache.nutch.util;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
+import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.nutch.metadata.Nutch;
-public interface NutchTool {
+/**
+ * Base class for Nutch tools that are run with a map of arguments via
+ * {@link #run(Map)}. It keeps a handle on the Hadoop job currently being
+ * executed so that callers can query progress and status, and can stop or
+ * kill the running tool.
+ */
+public abstract class NutchTool extends Configured {
- /** Prepares the tool. May return additional info, or null. */
- public Map<String,Object> prepare() throws Exception;
-
- /** Create jobs to be executed in sequence. */
- public Job[] createJobs(Object... args) throws Exception;
+ /** Results collected by the tool; subclasses may return this map from {@link #run(Map)}. */
+ protected HashMap<String,Object> results = new HashMap<String,Object>();
+ /**
+ * Status returned by {@link #getStatus()}; {@link #getProgress()} also
+ * stores the latest progress here under {@link Nutch#STAT_PROGRESS}.
+ * Wrapped in a synchronized map, presumably because it is read by other
+ * threads while the tool runs.
+ */
+ protected Map<String,Object> status =
+ Collections.synchronizedMap(new HashMap<String,Object>());
+ /** The Hadoop job currently executing, or null if none; read by getProgress() and killJob(). */
+ protected Job currentJob;
+ /** Total number of jobs the tool runs; getProgress() scales by it when greater than 1. */
+ protected int numJobs;
+ /**
+ * Index of the job currently executing among {@link #numJobs}; presumably
+ * 0-based, since getProgress() computes (currentJobNum + jobProgress) / numJobs.
+ */
+ protected int currentJobNum;
+
+ /** Runs the tool, using a map of arguments.
+ * May return results, or null.
+ * @param args tool arguments, interpreted by the concrete subclass
+ * @return results of the run, or null
+ * @throws Exception any exception raised while running the tool
+ */
+ public abstract Map<String,Object> run(Map<String,Object> args) throws Exception;
+
+ /** Returns relative progress of the tool, a float in range [0,1]
+ * (assuming currentJobNum < numJobs).
+ * <p>Progress of the current job is the average of its map and reduce
+ * progress, or 0 if there is no current job or its progress cannot be read.
+ * When numJobs is greater than 1 the result is
+ * (currentJobNum + jobProgress) / numJobs. As a side effect the value is
+ * stored in {@link #status} under {@link Nutch#STAT_PROGRESS}.
+ */
+ public float getProgress() {
+ float res = 0;
+ if (currentJob != null) {
+ try {
+ res = (currentJob.mapProgress() + currentJob.reduceProgress()) / 2.0f;
+ } catch (IOException e) {
+ e.printStackTrace();
+ res = 0;
+ } catch (IllegalStateException ile) {
+ // presumably thrown by Job when queried in a state that does not allow it
+ ile.printStackTrace();
+ res = 0;
+ }
+ }
+ // take into account multiple jobs
+ if (numJobs > 1) {
+ res = (currentJobNum + res) / (float)numJobs;
+ }
+ status.put(Nutch.STAT_PROGRESS, res);
+ return res;
+ }
+
+
+ /** Returns current status of the running tool.
+ * <p>The returned map is the live, synchronized {@link #status} map, not a copy.
+ */
+ public Map<String,Object> getStatus() {
+ return status;
+ }
- /** Post-process results of a job. */
- public Map<String,Object> postJob(int jobIndex, Job job) throws Exception;
+ /** Stop the job with the possibility to resume. Subclasses should
+ * override this, since by default it calls {@link #killJob()}.
+ * @return true if succeeded, false otherwise
+ * @throws Exception whatever {@link #killJob()} throws
+ */
+ public boolean stopJob() throws Exception {
+ return killJob();
+ }
- /** Finish processing and optionally return results. */
- public Map<String,Object> finish() throws Exception;
+ /**
+ * Kill the job immediately. Clients should assume that any results
+ * that the job produced so far are in inconsistent state or missing.
+ * <p>The default implementation kills {@link #currentJob} if it is set and
+ * not yet complete. It returns false if there is no such job, or if
+ * {@code Job.killJob()} throws (that exception is printed, not propagated).
+ * @return true if succeeded, false otherwise.
+ * @throws Exception declared so overriding implementations may throw; in the
+ * default implementation, exceptions from {@code Job.isComplete()} (called
+ * outside the try block) propagate.
+ */
+ public boolean killJob() throws Exception {
+ if (currentJob != null && !currentJob.isComplete()) {
+ try {
+ currentJob.killJob();
+ return true;
+ } catch (Exception e) {
+ e.printStackTrace();
+ return false;
+ }
+ }
+ return false;
+ }
}
\ No newline at end of file
Added: nutch/trunk/src/java/org/apache/nutch/util/ToolUtil.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/util/ToolUtil.java?rev=1039014&view=auto
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/util/ToolUtil.java (added)
+++ nutch/trunk/src/java/org/apache/nutch/util/ToolUtil.java Thu Nov 25 12:05:57 2010
@@ -0,0 +1,62 @@
+package org.apache.nutch.util;
+
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.CounterGroup;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.nutch.metadata.Nutch;
+
+/**
+ * Static helpers for {@link NutchTool} implementations and their callers:
+ * building argument maps from name/value pairs, and recording a Hadoop
+ * job's name, ID and counters into a tool's result map.
+ */
+public class ToolUtil {
+
+  /**
+   * Builds an argument map from alternating name/value pairs, e.g.
+   * {@code toArgMap("a", 1, "b", 2)}. Names are converted with
+   * {@link String#valueOf(Object)}; pairs whose value is null are skipped.
+   *
+   * @param args alternating argument names and values
+   * @return a new mutable map of the non-null arguments, or null if
+   *         {@code args} itself is null
+   * @throws IllegalArgumentException if {@code args} has an odd length
+   */
+  public static final Map<String,Object> toArgMap(Object... args) {
+    if (args == null) {
+      return null;
+    }
+    if (args.length % 2 != 0) {
+      // IllegalArgumentException extends RuntimeException, so existing
+      // callers catching RuntimeException are unaffected.
+      throw new IllegalArgumentException(
+          "expected pairs of argName argValue, got " + args.length + " arguments");
+    }
+    Map<String,Object> res = new HashMap<String,Object>();
+    for (int i = 0; i < args.length; i += 2) {
+      Object value = args[i + 1];
+      if (value != null) {
+        res.put(String.valueOf(args[i]), value);
+      }
+    }
+    return res;
+  }
+
+  /**
+   * Records the name, ID and counters of a Hadoop job into {@code results}.
+   * Entries live in the map stored under {@link Nutch#STAT_JOBS} (created as
+   * an insertion-ordered map if absent) and are keyed by {@code label}, or,
+   * when {@code label} is null, by the job name followed by "-" and the job
+   * ID (just the name if the ID is null). If the counters cannot be read,
+   * the reason is stored under "error" in the counter map instead.
+   *
+   * @param label key for this job's entry, or null to derive one from the job
+   * @param job the job whose status to record
+   * @param results the tool's result map to update
+   * @throws ClassCastException if {@code results} already holds a non-Map
+   *         value under {@link Nutch#STAT_JOBS}
+   */
+  public static final void recordJobStatus(String label, Job job, Map<String,Object> results) {
+    // the value under STAT_JOBS is expected to be the map created below
+    @SuppressWarnings("unchecked")
+    Map<String,Object> jobs = (Map<String,Object>)results.get(Nutch.STAT_JOBS);
+    if (jobs == null) {
+      // insertion-ordered so jobs are listed in the order they were recorded
+      jobs = new LinkedHashMap<String,Object>();
+      results.put(Nutch.STAT_JOBS, jobs);
+    }
+    Map<String,Object> stats = new HashMap<String,Object>();
+    stats.put(Nutch.STAT_COUNTERS, collectCounters(job));
+    stats.put("jobName", job.getJobName());
+    stats.put("jobID", job.getJobID());
+    if (label == null) {
+      label = job.getJobName();
+      if (job.getJobID() != null) {
+        label = label + "-" + job.getJobID();
+      }
+    }
+    jobs.put(label, stats);
+  }
+
+  /** Converts a job's counters into group display name -> (counter name -> value). */
+  private static Map<String,Object> collectCounters(Job job) {
+    Map<String,Object> countStats = new HashMap<String,Object>();
+    try {
+      Counters counters = job.getCounters();
+      if (counters == null) {
+        // Job.getCounters() may return null (e.g. for a retired job);
+        // report that explicitly rather than relying on a caught NPE
+        countStats.put("error", "counters not available");
+        return countStats;
+      }
+      for (CounterGroup cg : counters) {
+        Map<String,Object> cnts = new HashMap<String,Object>();
+        countStats.put(cg.getDisplayName(), cnts);
+        for (Counter c : cg) {
+          cnts.put(c.getName(), c.getValue());
+        }
+      }
+    } catch (Exception e) {
+      // best effort: record why counters are missing instead of failing
+      countStats.put("error", e.toString());
+    }
+    return countStats;
+  }
+}
Propchange: nutch/trunk/src/java/org/apache/nutch/util/ToolUtil.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: nutch/trunk/src/test/org/apache/nutch/api/TestAPI.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/test/org/apache/nutch/api/TestAPI.java?rev=1039014&r1=1039013&r2=1039014&view=diff
==============================================================================
--- nutch/trunk/src/test/org/apache/nutch/api/TestAPI.java (original)
+++ nutch/trunk/src/test/org/apache/nutch/api/TestAPI.java Thu Nov 25 12:05:57 2010
@@ -1,28 +1,35 @@
package org.apache.nutch.api;
+import static org.junit.Assert.*;
+
import java.util.HashMap;
import java.util.Map;
import org.apache.nutch.api.JobManager.JobType;
+import org.apache.nutch.metadata.Nutch;
+import org.apache.nutch.util.NutchTool;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
import org.restlet.ext.jackson.JacksonRepresentation;
import org.restlet.representation.Representation;
import org.restlet.resource.ClientResource;
-import junit.framework.TestCase;
-
-public class TestAPI extends TestCase {
+/**
+ * Integration tests for the Nutch REST API. A {@link NutchServer} is started
+ * on port 8192 once for the whole class, and the tests talk to it over HTTP
+ * through Restlet {@link ClientResource}s.
+ */
+public class TestAPI {
- NutchServer server;
+ /** Server shared by all tests; started in before() and stopped in after(). */
+ private static NutchServer server;
+ // NOTE(review): this field appears unused in the visible code; each test declares its own local 'cli'.
ClientResource cli;
- String baseUrl = "http://localhost:8192/nutch/";
+ /** Root URL of the API served by {@link #server}. */
+ private static String baseUrl = "http://localhost:8192/nutch/";
- public void setUp() throws Exception {
+ /** Starts the REST server on port 8192 once, before any test in this class runs. */
+ @BeforeClass
+ public static void before() throws Exception {
server = new NutchServer(8192);
server.start();
}
- public void tearDown() throws Exception {
+ /**
+ * Stops the server once, after all tests. If {@code server.stop(false)}
+ * returns false, it apparently waits for running jobs, printing a message
+ * on each of up to 10 iterations.
+ */
+ @AfterClass
+ public static void after() throws Exception {
if (!server.stop(false)) {
for (int i = 1; i < 11; i++) {
System.err.println("Waiting for jobs to complete - " + i + "s");
@@ -41,13 +48,16 @@ public class TestAPI extends TestCase {
}
}
+ /** Checks that the root resource lists the available API resources and their descriptions. */
+ @Test
public void testInfoAPI() throws Exception {
ClientResource cli = new ClientResource(baseUrl);
- String expected = "[[\"confs\",\"Configuration manager\"],[\"jobs\",\"Job manager\"]]";
+ String expected = "[[\"admin\",\"Service admin actions\"],[\"confs\",\"Configuration manager\"],[\"db\",\"DB data streaming\"],[\"jobs\",\"Job manager\"]]";
String got = cli.get().getText();
assertEquals(expected, got);
}
+ /** Exercises the configuration resource, which initially lists only the "default" configuration. */
+ @SuppressWarnings("rawtypes")
+ @Test
public void testConfsAPI() throws Exception {
ClientResource cli = new ClientResource(baseUrl + ConfResource.PATH);
assertEquals("[\"default\"]", cli.get().getText());
@@ -70,6 +80,8 @@ public class TestAPI extends TestCase {
assertEquals("[\"default\"]", cli.get().getText());
}
+ /**
+ * Submits a READDB job with the "default" configuration, checks it is listed
+ * as RUNNING, then polls until it leaves the RUNNING state.
+ */
+ @SuppressWarnings("rawtypes")
+ @Test
public void testJobsAPI() throws Exception {
ClientResource cli = new ClientResource(baseUrl + JobResource.PATH);
assertEquals("[]", cli.get().getText());
@@ -77,8 +89,6 @@ public class TestAPI extends TestCase {
Map<String,Object> map = new HashMap<String,Object>();
map.put(Params.JOB_TYPE, JobType.READDB.toString());
map.put(Params.CONF_ID, "default");
- JacksonRepresentation<Map<String,Object>> jr =
- new JacksonRepresentation<Map<String,Object>>(map);
Representation r = cli.put(map);
String jobId = r.getText();
assertNotNull(jobId);
@@ -90,5 +100,101 @@ public class TestAPI extends TestCase {
String state = (String)list[0].get("state");
assertEquals(jobId, id);
assertEquals(state, "RUNNING");
+ // poll every 2s, at most 10 times, until the job is no longer RUNNING
+ int cnt = 10;
+ do {
+ // NOTE(review): this swallows InterruptedException without restoring the interrupt flag
+ try {
+ Thread.sleep(2000);
+ } catch (Exception e) {};
+ list = cli.get(Map[].class);
+ state = (String)list[0].get("state");
+ if (!state.equals("RUNNING")) {
+ break;
+ }
+ } while (--cnt > 0);
+ // cnt reaches 0 only if the job was still RUNNING after all 10 polls
+ assertTrue(cnt > 0);
+ // NOTE(review): list[0] was already dereferenced above, so this check can never trigger
+ if (list == null) return;
+ for (Map m : list) {
+ System.out.println(m);
+ }
+ }
+
+ /**
+ * Submits a CLASS job running {@link SpinningJob}, stops it with
+ * {@code cmd=stop} and expects result "stopped"; then submits another one,
+ * aborts it with {@code cmd=abort} and expects result "killed".
+ */
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testStopKill() throws Exception {
+ ClientResource cli = new ClientResource(baseUrl + JobResource.PATH);
+ // create
+ Map<String,Object> map = new HashMap<String,Object>();
+ map.put(Params.JOB_TYPE, JobType.CLASS.toString());
+ Map<String,Object> args = new HashMap<String,Object>();
+ map.put(Params.ARGS, args);
+ args.put(Nutch.ARG_CLASS, SpinningJob.class.getName());
+ map.put(Params.CONF_ID, "default");
+ Representation r = cli.put(map);
+ String jobId = r.getText();
+ cli.release();
+ assertNotNull(jobId);
+ System.out.println(jobId);
+ assertTrue(jobId.startsWith("default-CLASS-"));
+ ClientResource stopCli = new ClientResource(baseUrl + JobResource.PATH +
+ "?job=" + jobId + "&cmd=stop");
+ r = stopCli.get();
+ assertEquals("true", r.getText());
+ stopCli.release();
+ // NOTE(review): fixed sleeps make this test timing-dependent
+ Thread.sleep(2000); // wait for the job to finish
+ ClientResource jobCli = new ClientResource(baseUrl + JobResource.PATH + "/" + jobId);
+ Map<String,Object> res = jobCli.get(Map.class);
+ res = (Map<String,Object>)res.get("result");
+ assertEquals("stopped", res.get("res"));
+ jobCli.release();
+ // restart and kill
+ r = cli.put(map);
+ jobId = r.getText();
+ cli.release();
+ assertNotNull(jobId);
+ System.out.println(jobId);
+ assertTrue(jobId.startsWith("default-CLASS-"));
+ ClientResource killCli = new ClientResource(baseUrl + JobResource.PATH +
+ "?job=" + jobId + "&cmd=abort");
+ r = killCli.get();
+ assertEquals("true", r.getText());
+ killCli.release();
+ Thread.sleep(2000); // wait for the job to finish
+ jobCli = new ClientResource(baseUrl + JobResource.PATH + "/" + jobId);
+ res = jobCli.get(Map.class);
+ res = (Map<String,Object>)res.get("result");
+ assertEquals("killed", res.get("res"));
+ jobCli.release();
+ }
+
+  /**
+   * Test tool that does no real work: it sleeps in one-second steps for up to
+   * {@link #MAX_SECONDS} seconds, or until {@link #stopJob()} or
+   * {@link #killJob()} is called. Its result map records under key "res" how
+   * it ended: "stopped", "killed", or "failed" if it ran the full time.
+   */
+  public static class SpinningJob extends NutchTool {
+    /** Number of one-second sleeps before the job gives up and reports "failed". */
+    private static final int MAX_SECONDS = 60;
+
+    /** Set by stopJob()/killJob(); volatile since presumably set from a thread other than run()'s. */
+    volatile boolean shouldStop = false;
+
+    @Override
+    public Map<String, Object> run(Map<String, Object> args) throws Exception {
+      status.put(Nutch.STAT_MESSAGE, "running");
+      for (int i = 0; i < MAX_SECONDS && !shouldStop; i++) {
+        Thread.sleep(1000);
+      }
+      // The loop only ends early on a stop/kill request, so no request means
+      // it timed out. (The old 'cnt == 0' test never matched: the
+      // post-decrement in 'cnt-- > 0' left cnt at -1 on timeout.)
+      if (!shouldStop) {
+        results.put("res", "failed");
+      }
+      return results;
+    }
+
+    /** Records "stopped" as the result and asks run() to return. */
+    @Override
+    public boolean stopJob() throws Exception {
+      results.put("res", "stopped");
+      shouldStop = true;
+      return true;
+    }
+
+    /** Records "killed" as the result and asks run() to return. */
+    @Override
+    public boolean killJob() throws Exception {
+      results.put("res", "killed");
+      shouldStop = true;
+      return true;
+    }
+
}
}