You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by le...@apache.org on 2014/06/26 02:44:44 UTC
svn commit: r1605644 [1/2] - in /nutch/branches/2.x: ./ ivy/
src/java/org/apache/nutch/api/ src/java/org/apache/nutch/api/impl/
src/java/org/apache/nutch/api/impl/db/ src/java/org/apache/nutch/api/misc/
src/java/org/apache/nutch/api/model/ src/java/org...
Author: lewismc
Date: Thu Jun 26 00:44:43 2014
New Revision: 1605644
URL: http://svn.apache.org/r1605644
Log:
NUTCH-1769 REST API refactoring
Added:
nutch/branches/2.x/src/java/org/apache/nutch/api/impl/JobFactory.java
nutch/branches/2.x/src/java/org/apache/nutch/api/impl/JobWorker.java
nutch/branches/2.x/src/java/org/apache/nutch/api/impl/NutchServerPoolExecutor.java
nutch/branches/2.x/src/java/org/apache/nutch/api/impl/db/
nutch/branches/2.x/src/java/org/apache/nutch/api/impl/db/DbIterator.java
nutch/branches/2.x/src/java/org/apache/nutch/api/impl/db/DbPageConverter.java
nutch/branches/2.x/src/java/org/apache/nutch/api/impl/db/DbReader.java
nutch/branches/2.x/src/java/org/apache/nutch/api/misc/
nutch/branches/2.x/src/java/org/apache/nutch/api/misc/ErrorStatusService.java
nutch/branches/2.x/src/java/org/apache/nutch/api/model/
nutch/branches/2.x/src/java/org/apache/nutch/api/model/request/
nutch/branches/2.x/src/java/org/apache/nutch/api/model/request/DbFilter.java
nutch/branches/2.x/src/java/org/apache/nutch/api/model/request/JobConfig.java
nutch/branches/2.x/src/java/org/apache/nutch/api/model/request/NutchConfig.java
nutch/branches/2.x/src/java/org/apache/nutch/api/model/response/
nutch/branches/2.x/src/java/org/apache/nutch/api/model/response/DbQueryResult.java
nutch/branches/2.x/src/java/org/apache/nutch/api/model/response/ErrorResponse.java
nutch/branches/2.x/src/java/org/apache/nutch/api/model/response/JobInfo.java
nutch/branches/2.x/src/java/org/apache/nutch/api/model/response/NutchStatus.java
nutch/branches/2.x/src/java/org/apache/nutch/api/resources/
nutch/branches/2.x/src/java/org/apache/nutch/api/resources/AbstractResource.java
nutch/branches/2.x/src/java/org/apache/nutch/api/resources/AdminResource.java
nutch/branches/2.x/src/java/org/apache/nutch/api/resources/ConfigResource.java
nutch/branches/2.x/src/java/org/apache/nutch/api/resources/DbResource.java
nutch/branches/2.x/src/java/org/apache/nutch/api/resources/JobResource.java
Removed:
nutch/branches/2.x/src/java/org/apache/nutch/api/APIInfoResource.java
nutch/branches/2.x/src/java/org/apache/nutch/api/AdminResource.java
nutch/branches/2.x/src/java/org/apache/nutch/api/ConfResource.java
nutch/branches/2.x/src/java/org/apache/nutch/api/DbReader.java
nutch/branches/2.x/src/java/org/apache/nutch/api/DbResource.java
nutch/branches/2.x/src/java/org/apache/nutch/api/JobResource.java
nutch/branches/2.x/src/java/org/apache/nutch/api/JobStatus.java
nutch/branches/2.x/src/java/org/apache/nutch/api/NutchApp.java
nutch/branches/2.x/src/java/org/apache/nutch/api/Params.java
Modified:
nutch/branches/2.x/CHANGES.txt
nutch/branches/2.x/ivy/ivy.xml
nutch/branches/2.x/src/java/org/apache/nutch/api/ConfManager.java
nutch/branches/2.x/src/java/org/apache/nutch/api/JobManager.java
nutch/branches/2.x/src/java/org/apache/nutch/api/NutchServer.java
nutch/branches/2.x/src/java/org/apache/nutch/api/impl/RAMConfManager.java
nutch/branches/2.x/src/java/org/apache/nutch/api/impl/RAMJobManager.java
Modified: nutch/branches/2.x/CHANGES.txt
URL: http://svn.apache.org/viewvc/nutch/branches/2.x/CHANGES.txt?rev=1605644&r1=1605643&r2=1605644&view=diff
==============================================================================
--- nutch/branches/2.x/CHANGES.txt (original)
+++ nutch/branches/2.x/CHANGES.txt Thu Jun 26 00:44:43 2014
@@ -2,6 +2,8 @@ Nutch Change Log
Current Development
+* NUTCH-1769 REST API refactoring (Fjodor Vershinin via lewismc)
+
* NUTCH-1633 slf4j is provided by hadoop and should not be included in the job file (kaveh minooie via jnioche)
* NUTCH-1787 update and complete API doc overview page (snagel)
Modified: nutch/branches/2.x/ivy/ivy.xml
URL: http://svn.apache.org/viewvc/nutch/branches/2.x/ivy/ivy.xml?rev=1605644&r1=1605643&r2=1605644&view=diff
==============================================================================
--- nutch/branches/2.x/ivy/ivy.xml (original)
+++ nutch/branches/2.x/ivy/ivy.xml Thu Jun 26 00:44:43 2014
@@ -89,8 +89,10 @@
<dependency org="org.hsqldb" name="hsqldb" rev="2.2.8" conf="*->default" />
<dependency org="org.jdom" name="jdom" rev="1.1" conf="test->default"/>
- <dependency org="org.restlet.jse" name="org.restlet" rev="2.0.5" conf="*->default" />
- <dependency org="org.restlet.jse" name="org.restlet.ext.jackson" rev="2.0.5"
+ <dependency org="org.restlet.jse" name="org.restlet" rev="2.2.1" conf="*->default" />
+ <dependency org="org.restlet.jse" name="org.restlet.ext.jackson" rev="2.2.1"
+ conf="*->default" />
+ <dependency org="org.restlet.jse" name="org.restlet.ext.jaxrs" rev="2.2.1"
conf="*->default" />
<!--================-->
Modified: nutch/branches/2.x/src/java/org/apache/nutch/api/ConfManager.java
URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/api/ConfManager.java?rev=1605644&r1=1605643&r2=1605644&view=diff
==============================================================================
--- nutch/branches/2.x/src/java/org/apache/nutch/api/ConfManager.java (original)
+++ nutch/branches/2.x/src/java/org/apache/nutch/api/ConfManager.java Thu Jun 26 00:44:43 2014
@@ -20,18 +20,19 @@ import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
+import org.apache.nutch.api.model.request.NutchConfig;
public interface ConfManager {
- public Set<String> list() throws Exception;
-
+ public Set<String> list();
+
public Configuration get(String confId);
-
- public Map<String,String> getAsMap(String confId);
-
+
+ public Map<String, String> getAsMap(String confId);
+
public void delete(String confId);
-
- public void create(String confId, Map<String,String> props, boolean force) throws Exception;
-
- public void setProperty(String confId, String propName, String propValue) throws Exception;
+
+ public void setProperty(String confId, String propName, String propValue);
+
+ public String create(NutchConfig nutchConfig);
}
Modified: nutch/branches/2.x/src/java/org/apache/nutch/api/JobManager.java
URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/api/JobManager.java?rev=1605644&r1=1605643&r2=1605644&view=diff
==============================================================================
--- nutch/branches/2.x/src/java/org/apache/nutch/api/JobManager.java (original)
+++ nutch/branches/2.x/src/java/org/apache/nutch/api/JobManager.java Thu Jun 26 00:44:43 2014
@@ -16,22 +16,25 @@
******************************************************************************/
package org.apache.nutch.api;
-import java.util.List;
-import java.util.Map;
-import org.apache.nutch.api.JobStatus.State;
+import java.util.Collection;
+
+import org.apache.nutch.api.model.request.JobConfig;
+import org.apache.nutch.api.model.response.JobInfo;
+import org.apache.nutch.api.model.response.JobInfo.State;
public interface JobManager {
-
- public static enum JobType {INJECT, GENERATE, FETCH, PARSE, UPDATEDB, INDEX, READDB, CLASS};
- public List<JobStatus> list(String crawlId, State state) throws Exception;
-
- public JobStatus get(String crawlId, String id) throws Exception;
-
- public String create(String crawlId, JobType type, String confId,
- Map<String,Object> args) throws Exception;
-
- public boolean abort(String crawlId, String id) throws Exception;
-
- public boolean stop(String crawlId, String id) throws Exception;
+ public static enum JobType {
+ INJECT, GENERATE, FETCH, PARSE, UPDATEDB, INDEX, READDB, CLASS
+ };
+
+ public Collection<JobInfo> list(String crawlId, State state);
+
+ public JobInfo get(String crawlId, String id);
+
+ public String create(JobConfig jobConfig);
+
+ public boolean abort(String crawlId, String id);
+
+ public boolean stop(String crawlId, String id);
}
Modified: nutch/branches/2.x/src/java/org/apache/nutch/api/NutchServer.java
URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/api/NutchServer.java?rev=1605644&r1=1605643&r2=1605644&view=diff
==============================================================================
--- nutch/branches/2.x/src/java/org/apache/nutch/api/NutchServer.java (original)
+++ nutch/branches/2.x/src/java/org/apache/nutch/api/NutchServer.java Thu Jun 26 00:44:43 2014
@@ -16,9 +16,14 @@
******************************************************************************/
package org.apache.nutch.api;
-import java.util.List;
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
+import javax.ws.rs.core.Application;
+
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.HelpFormatter;
@@ -26,33 +31,52 @@ import org.apache.commons.cli.OptionBuil
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.commons.lang.StringUtils;
-import org.apache.nutch.api.JobStatus.State;
+import org.apache.nutch.api.impl.JobFactory;
+import org.apache.nutch.api.impl.NutchServerPoolExecutor;
+import org.apache.nutch.api.impl.RAMConfManager;
+import org.apache.nutch.api.impl.RAMJobManager;
+import org.apache.nutch.api.misc.ErrorStatusService;
+import org.apache.nutch.api.model.response.JobInfo;
+import org.apache.nutch.api.model.response.JobInfo.State;
+import org.apache.nutch.api.resources.AdminResource;
+import org.apache.nutch.api.resources.ConfigResource;
+import org.apache.nutch.api.resources.DbResource;
+import org.apache.nutch.api.resources.JobResource;
import org.restlet.Component;
+import org.restlet.Context;
import org.restlet.data.Protocol;
import org.restlet.data.Reference;
-import org.restlet.representation.Representation;
+import org.restlet.ext.jaxrs.JaxRsApplication;
import org.restlet.resource.ClientResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class NutchServer {
+import com.google.common.collect.Queues;
+import com.google.common.collect.Sets;
+
+public class NutchServer extends Application {
+ public static final String NUTCH_SERVER = "NUTCH_SERVER";
+
private static final Logger LOG = LoggerFactory.getLogger(NutchServer.class);
private static final String LOCALHOST = "localhost";
private static final String DEFAULT_LOG_LEVEL = "INFO";
private static final Integer DEFAULT_PORT = 8081;
-
+ private static final int JOB_CAPACITY = 100;
+
private static String logLevel = DEFAULT_LOG_LEVEL;
private static Integer port = DEFAULT_PORT;
-
+
private static final String CMD_HELP = "help";
private static final String CMD_STOP = "stop";
private static final String CMD_PORT = "port";
private static final String CMD_LOG_LEVEL = "log";
-
private Component component;
- private NutchApp app;
+ private ConfManager configManager;
+ private JobManager jobMgr;
+ private long started;
+
private boolean running;
/**
@@ -63,6 +87,13 @@ public class NutchServer {
* level.
*/
public NutchServer() {
+ configManager = new RAMConfManager();
+ BlockingQueue<Runnable> runnables = Queues
+ .newArrayBlockingQueue(JOB_CAPACITY);
+ NutchServerPoolExecutor executor = new NutchServerPoolExecutor(10,
+ JOB_CAPACITY, 1, TimeUnit.HOURS, runnables);
+ jobMgr = new RAMJobManager(new JobFactory(), executor, configManager);
+
// Create a new Component.
component = new Component();
component.getLogger().setLevel(Level.parse(logLevel));
@@ -70,11 +101,36 @@ public class NutchServer {
// Add a new HTTP server listening on defined port.
component.getServers().add(Protocol.HTTP, port);
+ Context childContext = component.getContext().createChildContext();
+ JaxRsApplication application = new JaxRsApplication(childContext);
+ application.add(this);
+ application.setStatusService(new ErrorStatusService());
+ childContext.getAttributes().put(NUTCH_SERVER, this);
+
// Attach the application.
- app = new NutchApp();
- component.getDefaultHost().attach("/nutch", app);
+ component.getDefaultHost().attach(application);
+ }
+
+ @Override
+ public Set<Class<?>> getClasses() {
+ Set<Class<?>> resources = Sets.newHashSet();
+ resources.add(JobResource.class);
+ resources.add(AdminResource.class);
+ resources.add(ConfigResource.class);
+ resources.add(DbResource.class);
+ return resources;
+ }
+
+ public ConfManager getConfMgr() {
+ return configManager;
+ }
+
+ public JobManager getJobMgr() {
+ return jobMgr;
+ }
- NutchApp.server = this;
+ public long getStarted() {
+ return started;
}
/**
@@ -89,29 +145,38 @@ public class NutchServer {
/**
* Starts the Nutch server printing some logging to the log file.
*
- * @throws Exception
*/
- public void start() throws Exception {
+ public void start() {
LOG.info("Starting NutchServer on port: {} with logging level: {} ...",
port, logLevel);
- component.start();
+ try {
+ component.start();
+ } catch (Exception e) {
+ throw new IllegalStateException("Cannot start server!", e);
+ }
LOG.info("Started NutchServer on port {}", port);
running = true;
- NutchApp.started = System.currentTimeMillis();
+ started = System.currentTimeMillis();
}
/**
* Safety and convenience method to determine whether or not it is safe to
* shut down the server. We make this assertion by consulting the
- * {@link org.apache.nutch.api.NutchApp#jobMgr} for a list of jobs with
- * {@link org.apache.nutch.api.JobStatus#state} equal to 'RUNNING'.
+ * {@link org.apache.nutch.api.NutchServer#getJobMgr()} for a list of jobs with
+ * {@link org.apache.nutch.api.model.response.JobInfo#state} equal to 'RUNNING'.
+ *
+ * @param force
+ * ignore running tasks
*
* @return true if there are no jobs running or false if there are jobs with
* running state.
- * @throws Exception
*/
- public boolean canStop() throws Exception {
- List<JobStatus> jobs = NutchApp.jobMgr.list(null, State.RUNNING);
+ public boolean canStop(boolean force) {
+ if (force) {
+ return true;
+ }
+
+ Collection<JobInfo> jobs = getJobMgr().list(null, State.RUNNING);
return jobs.isEmpty();
}
@@ -123,18 +188,22 @@ public class NutchServer {
* @return true if no server is running or if the shutdown was successful.
* Return false if there are running jobs and the force switch has not
* been activated.
- * @throws Exception
*/
- public boolean stop(boolean force) throws Exception {
- if (!NutchApp.server.running) {
+ public boolean stop(boolean force) {
+ if (!running) {
return true;
}
- if (!NutchApp.server.canStop() && !force) {
+ if (!canStop(force)) {
LOG.warn("Running jobs - can't stop now.");
return false;
}
+
LOG.info("Stopping NutchServer on port {}...", port);
- component.stop();
+ try {
+ component.stop();
+ } catch (Exception e) {
+ throw new IllegalStateException("Cannot stop nutch server", e);
+ }
LOG.info("Stopped NutchServer on port {}", port);
running = false;
return true;
@@ -150,53 +219,54 @@ public class NutchServer {
formatter.printHelp("NutchServer", options, true);
return;
}
-
+
if (commandLine.hasOption(CMD_LOG_LEVEL)) {
logLevel = commandLine.getOptionValue(CMD_LOG_LEVEL);
}
-
+
if (commandLine.hasOption(CMD_PORT)) {
port = Integer.parseInt(commandLine.getOptionValue(CMD_PORT));
}
if (commandLine.hasOption(CMD_STOP)) {
String stopParameter = commandLine.getOptionValue(CMD_STOP);
- boolean force = StringUtils.equals(Params.FORCE, stopParameter);
+ boolean force = StringUtils.equals("force", stopParameter);
stopRemoteServer(force);
return;
}
-
+
startServer();
}
-
- private static void startServer() throws Exception {
+
+ private static void startServer() {
NutchServer server = new NutchServer();
server.start();
}
-
- private static void stopRemoteServer(boolean force) throws Exception {
+
+ private static void stopRemoteServer(boolean force) {
Reference reference = new Reference(Protocol.HTTP, LOCALHOST, port);
- reference.setPath("/nutch/admin/stop");
-
+ reference.setPath("/admin/stop");
+
if (force) {
- reference.addQueryParameter(Params.FORCE, Params.TRUE);
+ reference.addQueryParameter("force", "true");
}
-
+
ClientResource clientResource = new ClientResource(reference);
- Representation response = clientResource.get();
- LOG.info("Server response: {} ", response.getText());
+ clientResource.get();
}
private static Options createOptions() {
Options options = new Options();
OptionBuilder.hasArg();
OptionBuilder.withArgName("logging level");
- OptionBuilder.withDescription("Select a logging level for the NutchServer: \n"
- + "ALL|CONFIG|FINER|FINEST|INFO|OFF|SEVERE|WARNING");
+ OptionBuilder
+ .withDescription("Select a logging level for the NutchServer: \n"
+ + "ALL|CONFIG|FINER|FINEST|INFO|OFF|SEVERE|WARNING");
options.addOption(OptionBuilder.create(CMD_LOG_LEVEL));
- OptionBuilder.withDescription("Stop running NutchServer. "
- + "true value forces the Server to stop despite running jobs e.g. kills the tasks ");
+ OptionBuilder
+ .withDescription("Stop running NutchServer. "
+ + "true value forces the Server to stop despite running jobs e.g. kills the tasks ");
OptionBuilder.hasOptionalArg();
OptionBuilder.withArgName("force");
options.addOption(OptionBuilder.create(CMD_STOP));
Added: nutch/branches/2.x/src/java/org/apache/nutch/api/impl/JobFactory.java
URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/api/impl/JobFactory.java?rev=1605644&view=auto
==============================================================================
--- nutch/branches/2.x/src/java/org/apache/nutch/api/impl/JobFactory.java (added)
+++ nutch/branches/2.x/src/java/org/apache/nutch/api/impl/JobFactory.java Thu Jun 26 00:44:43 2014
@@ -0,0 +1,72 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.nutch.api.impl;
+
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.nutch.api.JobManager.JobType;
+import org.apache.nutch.crawl.DbUpdaterJob;
+import org.apache.nutch.crawl.GeneratorJob;
+import org.apache.nutch.crawl.InjectorJob;
+import org.apache.nutch.crawl.WebTableReader;
+import org.apache.nutch.fetcher.FetcherJob;
+import org.apache.nutch.indexer.IndexingJob;
+import org.apache.nutch.parse.ParserJob;
+import org.apache.nutch.util.NutchTool;
+
+import com.google.common.collect.Maps;
+
+public class JobFactory {
+ private static Map<JobType, Class<? extends NutchTool>> typeToClass;
+
+ static {
+ typeToClass = Maps.newHashMap();
+ typeToClass.put(JobType.FETCH, FetcherJob.class);
+ typeToClass.put(JobType.GENERATE, GeneratorJob.class);
+ typeToClass.put(JobType.INDEX, IndexingJob.class);
+ typeToClass.put(JobType.INJECT, InjectorJob.class);
+ typeToClass.put(JobType.PARSE, ParserJob.class);
+ typeToClass.put(JobType.UPDATEDB, DbUpdaterJob.class);
+ typeToClass.put(JobType.READDB, WebTableReader.class);
+ }
+
+ public NutchTool createToolByType(JobType type, Configuration conf) {
+ if (!typeToClass.containsKey(type)) {
+ return null;
+ }
+ Class<? extends NutchTool> clz = typeToClass.get(type);
+ return createTool(clz, conf);
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public NutchTool createToolByClassName(String className, Configuration conf) {
+ try {
+ Class clz = Class.forName(className);
+ return createTool(clz, conf);
+ } catch (ClassNotFoundException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ private NutchTool createTool(Class<? extends NutchTool> clz,
+ Configuration conf) {
+ return ReflectionUtils.newInstance(clz, conf);
+ }
+
+}
Added: nutch/branches/2.x/src/java/org/apache/nutch/api/impl/JobWorker.java
URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/api/impl/JobWorker.java?rev=1605644&view=auto
==============================================================================
--- nutch/branches/2.x/src/java/org/apache/nutch/api/impl/JobWorker.java (added)
+++ nutch/branches/2.x/src/java/org/apache/nutch/api/impl/JobWorker.java Thu Jun 26 00:44:43 2014
@@ -0,0 +1,103 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.nutch.api.impl;
+
+import java.text.MessageFormat;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nutch.api.model.request.JobConfig;
+import org.apache.nutch.api.model.response.JobInfo;
+import org.apache.nutch.api.model.response.JobInfo.State;
+import org.apache.nutch.api.resources.ConfigResource;
+import org.apache.nutch.metadata.Nutch;
+import org.apache.nutch.util.NutchTool;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JobWorker implements Runnable {
+ private static final Logger LOG = LoggerFactory.getLogger(JobWorker.class);
+ private NutchTool tool;
+ private JobInfo jobInfo;
+ private JobConfig jobConfig;
+
+ public JobWorker(JobConfig jobConfig, Configuration conf, NutchTool tool) {
+ this.tool = tool;
+ this.jobConfig = jobConfig;
+ if (jobConfig.getConfId() == null) {
+ jobConfig.setConfId(ConfigResource.DEFAULT);
+ }
+
+ jobInfo = new JobInfo(generateId(), jobConfig, State.IDLE, "idle");
+ if (jobConfig.getCrawlId() != null) {
+ conf.set(Nutch.CRAWL_ID_KEY, jobConfig.getCrawlId());
+ }
+ }
+
+ private String generateId() {
+ if (jobConfig.getCrawlId() == null) {
+ return MessageFormat.format("{0}-{1}-{2}", jobConfig.getConfId(),
+ jobConfig.getType(), String.valueOf(hashCode()));
+ }
+ return MessageFormat.format("{0}-{1}-{2}-{3}", jobConfig.getCrawlId(),
+ jobConfig.getConfId(), jobConfig.getType(), String.valueOf(hashCode()));
+ }
+
+ @Override
+ public void run() {
+ try {
+ getInfo().setState(State.RUNNING);
+ getInfo().setMsg("OK");
+ getInfo().setResult(tool.run(getInfo().getArgs()));
+ getInfo().setState(State.FINISHED);
+ } catch (Exception e) {
+ LOG.error("Cannot run job worker!", e);
+ getInfo().setMsg("ERROR: " + e.toString());
+ getInfo().setState(State.FAILED);
+ }
+ }
+
+ public boolean stopJob() {
+ getInfo().setState(State.STOPPING);
+ try {
+ return tool.stopJob();
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Cannot stop job with id " + getInfo().getId(), e);
+ }
+ }
+
+ public boolean killJob() {
+ getInfo().setState(State.KILLING);
+ try {
+ boolean result = tool.killJob();
+ getInfo().setState(State.KILLED);
+ return result;
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Cannot kill job with id " + getInfo().getId(), e);
+ }
+ }
+
+ public JobInfo getInfo() {
+ return jobInfo;
+ }
+
+ public void setInfo(JobInfo jobInfo) {
+ this.jobInfo = jobInfo;
+ }
+
+}
\ No newline at end of file
Added: nutch/branches/2.x/src/java/org/apache/nutch/api/impl/NutchServerPoolExecutor.java
URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/api/impl/NutchServerPoolExecutor.java?rev=1605644&view=auto
==============================================================================
--- nutch/branches/2.x/src/java/org/apache/nutch/api/impl/NutchServerPoolExecutor.java (added)
+++ nutch/branches/2.x/src/java/org/apache/nutch/api/impl/NutchServerPoolExecutor.java Thu Jun 26 00:44:43 2014
@@ -0,0 +1,103 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.nutch.api.impl;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.nutch.api.model.response.JobInfo;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Queues;
+
+public class NutchServerPoolExecutor extends ThreadPoolExecutor {
+
+ private Queue<JobWorker> workersHistory;
+ private Queue<JobWorker> runningWorkers;
+
+ public NutchServerPoolExecutor(int corePoolSize, int maximumPoolSize,
+ long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
+ super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
+ workersHistory = Queues.newArrayBlockingQueue(maximumPoolSize);
+ runningWorkers = Queues.newArrayBlockingQueue(maximumPoolSize);
+ }
+
+ @Override
+ protected void beforeExecute(Thread thread, Runnable runnable) {
+ super.beforeExecute(thread, runnable);
+ synchronized (runningWorkers) {
+ runningWorkers.offer(((JobWorker) runnable));
+ }
+ }
+
+ @Override
+ protected void afterExecute(Runnable runnable, Throwable throwable) {
+ super.afterExecute(runnable, throwable);
+ synchronized (runningWorkers) {
+ runningWorkers.remove(((JobWorker) runnable).getInfo());
+ }
+ JobWorker worker = ((JobWorker) runnable);
+ addStatusToHistory(worker);
+ }
+
+ private void addStatusToHistory(JobWorker worker) {
+ synchronized (workersHistory) {
+ if (!workersHistory.offer(worker)) {
+ workersHistory.poll();
+ workersHistory.add(worker);
+ }
+ }
+ }
+
+ public JobWorker findWorker(String jobId) {
+ synchronized (runningWorkers) {
+ for (JobWorker worker : runningWorkers) {
+ if (StringUtils.equals(worker.getInfo().getId(), jobId)) {
+ return worker;
+ }
+ }
+ }
+ return null;
+ }
+
+ public Collection<JobInfo> getJobHistory() {
+ return getJobsInfo(workersHistory);
+ }
+
+ public Collection<JobInfo> getJobRunning() {
+ return getJobsInfo(runningWorkers);
+ }
+
+ @SuppressWarnings("unchecked")
+ public Collection<JobInfo> getAllJobs() {
+ return CollectionUtils.union(getJobRunning(), getJobHistory());
+ }
+
+ private Collection<JobInfo> getJobsInfo(Collection<JobWorker> workers) {
+ List<JobInfo> jobsInfo = Lists.newLinkedList();
+ for (JobWorker worker : workers) {
+ jobsInfo.add(worker.getInfo());
+ }
+ return jobsInfo;
+ }
+}
Modified: nutch/branches/2.x/src/java/org/apache/nutch/api/impl/RAMConfManager.java
URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/api/impl/RAMConfManager.java?rev=1605644&r1=1605643&r2=1605644&view=diff
==============================================================================
--- nutch/branches/2.x/src/java/org/apache/nutch/api/impl/RAMConfManager.java (original)
+++ nutch/branches/2.x/src/java/org/apache/nutch/api/impl/RAMConfManager.java Thu Jun 26 00:44:43 2014
@@ -16,68 +16,104 @@
******************************************************************************/
package org.apache.nutch.api.impl;
-import java.util.HashMap;
+import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
-import java.util.Set;
import java.util.Map.Entry;
-import java.util.TreeMap;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.nutch.api.ConfManager;
-import org.apache.nutch.api.ConfResource;
+import org.apache.nutch.api.model.request.NutchConfig;
+import org.apache.nutch.api.resources.ConfigResource;
import org.apache.nutch.util.NutchConfiguration;
+import com.google.common.collect.Maps;
+
public class RAMConfManager implements ConfManager {
- Map<String,Configuration> configs = new HashMap<String,Configuration>();
-
+ private Map<String, Configuration> configurations = Maps.newConcurrentMap();
+
+ private AtomicInteger newConfigId = new AtomicInteger();
+
public RAMConfManager() {
- configs.put(ConfResource.DEFAULT_CONF, NutchConfiguration.create());
+ configurations.put(ConfigResource.DEFAULT, NutchConfiguration.create());
}
-
+
public Set<String> list() {
- return configs.keySet();
+ return configurations.keySet();
}
-
+
public Configuration get(String confId) {
- return configs.get(confId);
+ if (confId == null) {
+ return configurations.get(ConfigResource.DEFAULT);
+ }
+ return configurations.get(confId);
}
-
- public Map<String,String> getAsMap(String confId) {
- Configuration cfg = configs.get(confId);
- if (cfg == null) return null;
- Iterator<Entry<String,String>> it = cfg.iterator();
- TreeMap<String,String> res = new TreeMap<String,String>();
- while (it.hasNext()) {
- Entry<String,String> e = it.next();
- res.put(e.getKey(), e.getValue());
- }
- return res;
- }
-
- public void create(String confId, Map<String,String> props, boolean force) throws Exception {
- if (configs.containsKey(confId) && !force) {
- throw new Exception("Config name '" + confId + "' already exists.");
+
+ public Map<String, String> getAsMap(String confId) {
+ Configuration configuration = configurations.get(confId);
+ if (configuration == null) {
+ return Collections.emptyMap();
}
- Configuration conf = NutchConfiguration.create();
- // apply overrides
- if (props != null) {
- for (Entry<String,String> e : props.entrySet()) {
- conf.set(e.getKey(), e.getValue());
- }
- }
- configs.put(confId, conf);
- }
-
- public void setProperty(String confId, String propName, String propValue) throws Exception {
- if (!configs.containsKey(confId)) {
- throw new Exception("Unknown configId '" + confId + "'");
+
+ Iterator<Entry<String, String>> iterator = configuration.iterator();
+ Map<String, String> configMap = Maps.newTreeMap();
+ while (iterator.hasNext()) {
+ Entry<String, String> entry = iterator.next();
+ configMap.put(entry.getKey(), entry.getValue());
+ }
+ return configMap;
+ }
+
+ public void setProperty(String confId, String propName, String propValue) {
+ if (!configurations.containsKey(confId)) {
+ throw new IllegalArgumentException("Unknown configId '" + confId + "'");
}
- Configuration conf = configs.get(confId);
+ Configuration conf = configurations.get(confId);
conf.set(propName, propValue);
}
-
+
public void delete(String confId) {
- configs.remove(confId);
+ configurations.remove(confId);
}
+
+ @Override
+ public String create(NutchConfig nutchConfig) {
+ if (StringUtils.isBlank(nutchConfig.getConfigId())) {
+ nutchConfig.setConfigId(String.valueOf(newConfigId.incrementAndGet()));
+ }
+
+ if (!canCreate(nutchConfig)) {
+ throw new IllegalArgumentException("Config already exists.");
+ }
+
+ createHadoopConfig(nutchConfig);
+ return nutchConfig.getConfigId();
+ }
+
+ private boolean canCreate(NutchConfig nutchConfig) {
+ if (nutchConfig.isForce()) {
+ return true;
+ }
+ if (!configurations.containsKey(nutchConfig.getConfigId())) {
+ return true;
+ }
+ return false;
+ }
+
+ private void createHadoopConfig(NutchConfig nutchConfig) {
+ Configuration conf = NutchConfiguration.create();
+ configurations.put(nutchConfig.getConfigId(), conf);
+
+ if (MapUtils.isEmpty(nutchConfig.getParams())) {
+ return;
+ }
+ for (Entry<String, String> e : nutchConfig.getParams().entrySet()) {
+ conf.set(e.getKey(), e.getValue());
+ }
+ }
+
}
Modified: nutch/branches/2.x/src/java/org/apache/nutch/api/impl/RAMJobManager.java
URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/api/impl/RAMJobManager.java?rev=1605644&r1=1605643&r2=1605644&view=diff
==============================================================================
--- nutch/branches/2.x/src/java/org/apache/nutch/api/impl/RAMJobManager.java (original)
+++ nutch/branches/2.x/src/java/org/apache/nutch/api/impl/RAMJobManager.java Thu Jun 26 00:44:43 2014
@@ -16,237 +16,83 @@
******************************************************************************/
package org.apache.nutch.api.impl;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.Collection;
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.nutch.api.ConfResource;
+import org.apache.nutch.api.ConfManager;
import org.apache.nutch.api.JobManager;
-import org.apache.nutch.api.JobStatus;
-import org.apache.nutch.api.JobStatus.State;
-import org.apache.nutch.api.NutchApp;
-import org.apache.nutch.crawl.DbUpdaterJob;
-import org.apache.nutch.crawl.GeneratorJob;
-import org.apache.nutch.crawl.InjectorJob;
-import org.apache.nutch.crawl.WebTableReader;
-import org.apache.nutch.fetcher.FetcherJob;
-import org.apache.nutch.indexer.IndexingJob;
-import org.apache.nutch.metadata.Nutch;
-import org.apache.nutch.parse.ParserJob;
+import org.apache.nutch.api.model.request.JobConfig;
+import org.apache.nutch.api.model.response.JobInfo;
+import org.apache.nutch.api.model.response.JobInfo.State;
import org.apache.nutch.util.NutchTool;
public class RAMJobManager implements JobManager {
- int CAPACITY = 100;
- ThreadPoolExecutor exec = new MyPoolExecutor(10, CAPACITY, 1, TimeUnit.HOURS,
- new ArrayBlockingQueue<Runnable>(CAPACITY));
-
- private class MyPoolExecutor extends ThreadPoolExecutor {
-
- public MyPoolExecutor(int corePoolSize, int maximumPoolSize,
- long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
- super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
- }
-
- @Override
- protected void beforeExecute(Thread t, Runnable r) {
- // TODO Auto-generated method stub
- super.beforeExecute(t, r);
- synchronized(jobRunning) {
- jobRunning.offer(((JobWorker)r).jobStatus);
- }
- }
-
- @Override
- protected void afterExecute(Runnable r, Throwable t) {
- super.afterExecute(r, t);
- synchronized(jobRunning) {
- jobRunning.remove(((JobWorker)r).jobStatus);
- }
- JobStatus status = ((JobWorker)r).jobStatus;
- synchronized(jobHistory) {
- if (!jobHistory.offer(status)) {
- jobHistory.poll();
- jobHistory.add(status);
- }
- }
- }
- }
-
- ArrayBlockingQueue<JobStatus> jobHistory = new ArrayBlockingQueue<JobStatus>(CAPACITY);
- ArrayBlockingQueue<JobStatus> jobRunning = new ArrayBlockingQueue<JobStatus>(CAPACITY);
-
- private static Map<JobType,Class<? extends NutchTool>> typeToClass = new HashMap<JobType,Class<? extends NutchTool>>();
-
- static {
- typeToClass.put(JobType.FETCH, FetcherJob.class);
- typeToClass.put(JobType.GENERATE, GeneratorJob.class);
- typeToClass.put(JobType.INDEX, IndexingJob.class);
- typeToClass.put(JobType.INJECT, InjectorJob.class);
- typeToClass.put(JobType.PARSE, ParserJob.class);
- typeToClass.put(JobType.UPDATEDB, DbUpdaterJob.class);
- typeToClass.put(JobType.READDB, WebTableReader.class);
- }
-
- private void addFinishedStatus(JobStatus status) {
- synchronized(jobHistory) {
- if (!jobHistory.offer(status)) {
- jobHistory.poll();
- jobHistory.add(status);
- }
- }
- }
-
- @Override
- @SuppressWarnings("fallthrough")
- public List<JobStatus> list(String crawlId, State state) throws Exception {
- List<JobStatus> res = new ArrayList<JobStatus>();
- if (state == null) state = State.ANY;
- switch(state) {
- case ANY:
- res.addAll(jobHistory);
- /* FALLTHROUGH */
- case RUNNING:
- case IDLE:
- res.addAll(jobRunning);
- break;
- default:
- res.addAll(jobHistory);
- }
- return res;
+ private JobFactory jobFactory;
+ private NutchServerPoolExecutor executor;
+ private ConfManager configManager;
+
+ public RAMJobManager(JobFactory jobFactory, NutchServerPoolExecutor executor,
+ ConfManager configManager) {
+ this.jobFactory = jobFactory;
+ this.executor = executor;
+ this.configManager = configManager;
}
@Override
- public JobStatus get(String crawlId, String jobId) throws Exception {
- for (JobStatus job : jobRunning) {
- if (job.id.equals(jobId)) {
- return job;
- }
+ public Collection<JobInfo> list(String crawlId, State state) {
+ if (state == null || state == State.ANY) {
+ return executor.getAllJobs();
}
- for (JobStatus job : jobHistory) {
- if (job.id.equals(jobId)) {
- return job;
- }
+ if (state == State.RUNNING || state == State.IDLE) {
+ return executor.getJobRunning();
}
- return null;
+ return executor.getJobHistory();
}
@Override
- public String create(String crawlId, JobType type, String confId,
- Map<String,Object> args) throws Exception {
- if (args == null) args = Collections.emptyMap();
- JobWorker worker = new JobWorker(crawlId, type, confId, args);
- String id = worker.getId();
- exec.execute(worker);
- exec.purge();
- return id;
+ public JobInfo get(String crawlId, String jobId) {
+ return executor.findWorker(jobId).getInfo();
}
@Override
- public boolean abort(String crawlId, String id) throws Exception {
- // find running job
- for (JobStatus job : jobRunning) {
- if (job.id.equals(id)) {
- job.state = State.KILLING;
- boolean res = job.tool.killJob();
- job.state = State.KILLED;
- return res;
- }
+ public String create(JobConfig jobConfig) {
+ if (jobConfig.getArgs() == null) {
+ throw new IllegalArgumentException("Arguments cannot be null!");
}
- return false;
+
+ Configuration conf = cloneConfiguration(jobConfig.getConfId());
+ NutchTool tool = createTool(jobConfig, conf);
+ JobWorker worker = new JobWorker(jobConfig, conf, tool);
+
+ executor.execute(worker);
+ executor.purge();
+ return worker.getInfo().getId();
}
- @Override
- public boolean stop(String crawlId, String id) throws Exception {
- // find running job
- for (JobStatus job : jobRunning) {
- if (job.id.equals(id)) {
- job.state = State.STOPPING;
- boolean res = job.tool.stopJob();
- return res;
- }
+ private Configuration cloneConfiguration(String confId) {
+ Configuration conf = configManager.get(confId);
+ if (conf == null) {
+ throw new IllegalArgumentException("Unknown confId " + confId);
}
- return false;
+ return new Configuration(conf);
}
-
- private class JobWorker implements Runnable {
- String id;
- JobType type;
- String confId;
- NutchTool tool;
- Map<String,Object> args;
- JobStatus jobStatus;
-
- @SuppressWarnings("unchecked")
- JobWorker(String crawlId, JobType type, String confId, Map<String,Object> args) throws Exception {
- if (confId == null) {
- confId = ConfResource.DEFAULT_CONF;
- }
- Configuration conf = NutchApp.confMgr.get(confId);
- // clone it - we are going to modify it
- if (conf == null) {
- throw new Exception("Unknown confId " + confId);
- }
- this.id = confId + "-" + type + "-" + hashCode();
- this.type = type;
- this.confId = confId;
- this.args = args;
- conf = new Configuration(conf);
- if (crawlId != null) {
- conf.set(Nutch.CRAWL_ID_KEY, crawlId);
- this.id = crawlId + "-" + this.id;
- }
- Class<? extends NutchTool> clz = typeToClass.get(type);
- if (clz == null) {
- Class<?> c = Class.forName((String)args.get(Nutch.ARG_CLASS));
- if(c instanceof Class) {
- clz = (Class<? extends NutchTool>) c;
- }
- }
- tool = ReflectionUtils.newInstance(clz, conf);
- jobStatus = new JobStatus(id, type, confId, args, State.IDLE, "idle");
- jobStatus.tool = tool;
- }
-
- public String getId() {
- return id;
- }
-
- public float getProgress() {
- return tool.getProgress();
- }
-
- public State getState() {
- return jobStatus.state;
- }
-
- public Map<String,Object> getResult() {
- return jobStatus.result;
+
+ private NutchTool createTool(JobConfig jobConfig, Configuration conf) {
+ if (StringUtils.isNotBlank(jobConfig.getJobClassName())) {
+ return jobFactory.createToolByClassName(jobConfig.getJobClassName(), conf);
}
- public Map<String,Object> getStatus() {
- return tool.getStatus();
- }
+ return jobFactory.createToolByType(jobConfig.getType(), conf);
+ }
- @Override
- public void run() {
- try {
- jobStatus.state = State.RUNNING;
- jobStatus.msg = "OK";
- jobStatus.result = tool.run(args);
- jobStatus.state = State.FINISHED;
- } catch (Exception e) {
- e.printStackTrace();
- jobStatus.msg = "ERROR: " + e.toString();
- jobStatus.state = State.FAILED;
- }
- }
+ @Override
+ public boolean abort(String crawlId, String id) {
+ return executor.findWorker(id).killJob();
+ }
+
+ @Override
+ public boolean stop(String crawlId, String id) {
+ return executor.findWorker(id).stopJob();
}
}
Added: nutch/branches/2.x/src/java/org/apache/nutch/api/impl/db/DbIterator.java
URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/api/impl/db/DbIterator.java?rev=1605644&view=auto
==============================================================================
--- nutch/branches/2.x/src/java/org/apache/nutch/api/impl/db/DbIterator.java (added)
+++ nutch/branches/2.x/src/java/org/apache/nutch/api/impl/db/DbIterator.java Thu Jun 26 00:44:43 2014
@@ -0,0 +1,111 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.nutch.api.impl.db;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.avro.util.Utf8;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.gora.query.Result;
+import org.apache.nutch.storage.Mark;
+import org.apache.nutch.storage.WebPage;
+import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.TableUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Sets;
+import com.google.common.collect.UnmodifiableIterator;
+
+public class DbIterator extends UnmodifiableIterator<Map<String, Object>> {
+ private static final Logger LOG = LoggerFactory.getLogger(DbIterator.class);
+
+ private Result<String, WebPage> result;
+ private boolean hasNext;
+ private String url;
+ private WebPage page;
+ private Utf8 batchId;
+ private Set<String> commonFields;
+
+ DbIterator(Result<String, WebPage> res, Set<String> fields, String batchId) {
+ this.result = res;
+ if (batchId != null) {
+ this.batchId = new Utf8(batchId);
+ }
+ if (fields != null) {
+ this.commonFields = Sets.newTreeSet(fields);
+ }
+ try {
+ skipNonRelevant();
+ } catch (Exception e) {
+ LOG.error("Cannot create db iterator!", e);
+ }
+ }
+
+ private void skipNonRelevant() throws Exception, IOException {
+ hasNext = result.next();
+ if (!hasNext) {
+ return;
+ }
+ if (batchId == null) {
+ return;
+ }
+
+ while (hasNext) {
+ WebPage page = result.get();
+ Utf8 mark = Mark.UPDATEDB_MARK.checkMark(page);
+ if (NutchJob.shouldProcess(mark, batchId)) {
+ return;
+ }
+
+ LOG.debug("Skipping {}; different batch id", result.getKey());
+ hasNext = result.next();
+ }
+ }
+
+ public boolean hasNext() {
+ return hasNext;
+ }
+
+ public Map<String, Object> next() {
+ url = result.getKey();
+ page = WebPage.newBuilder(result.get()).build();
+ try {
+ skipNonRelevant();
+ if (!hasNext) {
+ result.close();
+ }
+ } catch (Exception e) {
+ LOG.error("Cannot get next result!", e);
+ hasNext = false;
+ return null;
+ }
+ return pageAsMap(url, page);
+ }
+
+ private Map<String, Object> pageAsMap(String url, WebPage page) {
+ Map<String, Object> result = DbPageConverter.convertPage(page, commonFields);
+
+ if (CollectionUtils.isEmpty(commonFields) || commonFields.contains("url")) {
+ result.put("url", TableUtil.unreverseUrl(url));
+ }
+ return result;
+ }
+
+}
\ No newline at end of file
Added: nutch/branches/2.x/src/java/org/apache/nutch/api/impl/db/DbPageConverter.java
URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/api/impl/db/DbPageConverter.java?rev=1605644&view=auto
==============================================================================
--- nutch/branches/2.x/src/java/org/apache/nutch/api/impl/db/DbPageConverter.java (added)
+++ nutch/branches/2.x/src/java/org/apache/nutch/api/impl/db/DbPageConverter.java Thu Jun 26 00:44:43 2014
@@ -0,0 +1,135 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.nutch.api.impl.db;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.avro.Schema.Field;
+import org.apache.avro.util.Utf8;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.nutch.parse.ParseStatusUtils;
+import org.apache.nutch.protocol.ProtocolStatusUtils;
+import org.apache.nutch.storage.WebPage;
+import org.apache.nutch.util.Bytes;
+import org.apache.nutch.util.StringUtil;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+public class DbPageConverter {
+
+ public static Map<String, Object> convertPage(WebPage page, Set<String> fields) {
+ Map<String, Object> result = Maps.newHashMap();
+ for (Field field : filterFields(page, fields)) {
+ Object value = convertField(page, field);
+ if (value != null) {
+ result.put(field.name(), value);
+ }
+ }
+ return result;
+ }
+
+ private static Object convertField(WebPage page, Field field) {
+ int index = field.pos();
+ if (index < 0) {
+ return null;
+ }
+
+ Object value = page.get(index);
+ if (value == null) {
+ return null;
+ }
+
+ String fieldName = field.name();
+ if (StringUtils.equals(fieldName, "metadata")) {
+ return getSimpleMetadata(page);
+ }
+ if (StringUtils.equals(fieldName, "protocolStatus")) {
+ return ProtocolStatusUtils.toString(page.getProtocolStatus());
+ }
+ if (StringUtils.equals(fieldName, "parseStatus")) {
+ return ParseStatusUtils.toString(page.getParseStatus());
+ }
+ if (StringUtils.equals(fieldName, "signature")) {
+ return StringUtil.toHexString(page.getSignature());
+ }
+ if (StringUtils.equals(fieldName, "content")) {
+ return Bytes.toStringBinary(page.getContent());
+ }
+ if (StringUtils.equals(fieldName, "markers")) {
+ return convertToStringsMap(page.getMarkers());
+ }
+ if (StringUtils.equals(fieldName, "inlinks")) {
+ return convertToStringsMap(page.getInlinks());
+ }
+ if (StringUtils.equals(fieldName, "outlinks")) {
+ return convertToStringsMap(page.getOutlinks());
+ }
+
+ if (value instanceof Utf8) {
+ return value.toString();
+ }
+
+ if (value instanceof ByteBuffer) {
+ return Bytes.toStringBinary((ByteBuffer) value);
+ }
+
+ return value;
+ }
+
+ private static Set<Field> filterFields(WebPage page, Set<String> queryFields) {
+ List<Field> pageFields = page.getSchema().getFields();
+ if (CollectionUtils.isEmpty(queryFields)) {
+ return Sets.newHashSet(pageFields);
+ }
+
+ Set<Field> filteredFields = Sets.newLinkedHashSet();
+ for (Field field : pageFields) {
+ if (queryFields.contains(field.name())) {
+ filteredFields.add(field);
+ }
+ }
+ return filteredFields;
+ }
+
+ private static Map<String, String> getSimpleMetadata(WebPage page) {
+ Map<CharSequence, ByteBuffer> metadata = page.getMetadata();
+ if (MapUtils.isEmpty(metadata)) {
+ return Collections.emptyMap();
+ }
+ Map<String, String> simpleMeta = Maps.newHashMap();
+ for (CharSequence key : metadata.keySet()) {
+ simpleMeta.put(key.toString(), Bytes.toStringBinary(metadata.get(key)));
+ }
+ return simpleMeta;
+ }
+
+ private static Map<String, String> convertToStringsMap(Map<?, ?> map) {
+ Map<String, String> res = Maps.newHashMap();
+ for (Entry<?, ?> entry : map.entrySet()) {
+ res.put(entry.getKey().toString(), entry.getValue().toString());
+ }
+ return res;
+ }
+}
Added: nutch/branches/2.x/src/java/org/apache/nutch/api/impl/db/DbReader.java
URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/api/impl/db/DbReader.java?rev=1605644&view=auto
==============================================================================
--- nutch/branches/2.x/src/java/org/apache/nutch/api/impl/db/DbReader.java (added)
+++ nutch/branches/2.x/src/java/org/apache/nutch/api/impl/db/DbReader.java Thu Jun 26 00:44:43 2014
@@ -0,0 +1,91 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.nutch.api.impl.db;
+
+import java.net.MalformedURLException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.gora.query.Query;
+import org.apache.gora.query.Result;
+import org.apache.gora.store.DataStore;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nutch.api.model.request.DbFilter;
+import org.apache.nutch.metadata.Nutch;
+import org.apache.nutch.storage.StorageUtils;
+import org.apache.nutch.storage.WebPage;
+import org.apache.nutch.util.TableUtil;
+
+public class DbReader {
+ private DataStore<String, WebPage> store;
+
+ public DbReader(Configuration conf, String crawlId) {
+ conf = new Configuration(conf);
+ if (crawlId != null) {
+ conf.set(Nutch.CRAWL_ID_KEY, crawlId);
+ }
+ try {
+ store = StorageUtils.createWebStore(conf, String.class, WebPage.class);
+ } catch (Exception e) {
+ throw new IllegalStateException("Cannot create webstore!", e);
+ }
+ }
+
+ public Iterator<Map<String, Object>> runQuery(DbFilter filter) {
+ String startKey = filter.getStartKey();
+ String endKey = filter.getEndKey();
+
+ if (!filter.isKeysReversed()) {
+ startKey = reverseKey(filter.getStartKey());
+ endKey = reverseKey(filter.getEndKey());
+ }
+
+ Query<String, WebPage> query = store.newQuery();
+ query.setFields(prepareFields(filter.getFields()));
+ if (startKey != null) {
+ query.setStartKey(startKey);
+ if (endKey != null) {
+ query.setEndKey(endKey);
+ }
+ }
+ Result<String, WebPage> result = store.execute(query);
+ return new DbIterator(result, filter.getFields(), filter.getBatchId());
+ }
+
+ private String reverseKey(String key) {
+ if (StringUtils.isEmpty(key)) {
+ return null;
+ }
+
+ try {
+ return TableUtil.reverseUrl(key);
+ } catch (MalformedURLException e) {
+ throw new IllegalArgumentException("Wrong url format!", e);
+ }
+ }
+
+ private String[] prepareFields(Set<String> fields) {
+ if (CollectionUtils.isEmpty(fields)) {
+ return null;
+ }
+ fields.remove("url");
+ return fields.toArray(new String[fields.size()]);
+ }
+}
Added: nutch/branches/2.x/src/java/org/apache/nutch/api/misc/ErrorStatusService.java
URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/api/misc/ErrorStatusService.java?rev=1605644&view=auto
==============================================================================
--- nutch/branches/2.x/src/java/org/apache/nutch/api/misc/ErrorStatusService.java (added)
+++ nutch/branches/2.x/src/java/org/apache/nutch/api/misc/ErrorStatusService.java Thu Jun 26 00:44:43 2014
@@ -0,0 +1,40 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.nutch.api.misc;
+
+import org.apache.nutch.api.model.response.ErrorResponse;
+import org.restlet.Request;
+import org.restlet.Response;
+import org.restlet.data.Status;
+import org.restlet.ext.jackson.JacksonRepresentation;
+import org.restlet.representation.Representation;
+import org.restlet.service.StatusService;
+
+public class ErrorStatusService extends StatusService {
+ @Override
+ public Status getStatus(Throwable throwable, Request request,
+ Response response) {
+ return new Status(Status.SERVER_ERROR_INTERNAL, throwable);
+ }
+
+ @Override
+ public Representation getRepresentation(Status status, Request request,
+ Response response) {
+ ErrorResponse errorResponse = new ErrorResponse(status.getThrowable());
+ return new JacksonRepresentation<ErrorResponse>(errorResponse);
+ }
+}
Added: nutch/branches/2.x/src/java/org/apache/nutch/api/model/request/DbFilter.java
URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/api/model/request/DbFilter.java?rev=1605644&view=auto
==============================================================================
--- nutch/branches/2.x/src/java/org/apache/nutch/api/model/request/DbFilter.java (added)
+++ nutch/branches/2.x/src/java/org/apache/nutch/api/model/request/DbFilter.java Thu Jun 26 00:44:43 2014
@@ -0,0 +1,67 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.nutch.api.model.request;
+
+import java.util.Set;
+
+public class DbFilter {
+ private String batchId;
+ private String startKey;
+ private String endKey;
+ private boolean isKeysReversed = false;
+ private Set<String> fields;
+
+ public Set<String> getFields() {
+ return fields;
+ }
+
+ public void setFields(Set<String> fields) {
+ this.fields = fields;
+ }
+
+ public boolean isKeysReversed() {
+ return isKeysReversed;
+ }
+
+ public void setKeysReversed(boolean isKeysReversed) {
+ this.isKeysReversed = isKeysReversed;
+ }
+
+ public String getEndKey() {
+ return endKey;
+ }
+
+ public void setEndKey(String endKey) {
+ this.endKey = endKey;
+ }
+
+ public String getStartKey() {
+ return startKey;
+ }
+
+ public void setStartKey(String startKey) {
+ this.startKey = startKey;
+ }
+
+ public String getBatchId() {
+ return batchId;
+ }
+
+ public void setBatchId(String batchId) {
+ this.batchId = batchId;
+ }
+}
Added: nutch/branches/2.x/src/java/org/apache/nutch/api/model/request/JobConfig.java
URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/api/model/request/JobConfig.java?rev=1605644&view=auto
==============================================================================
--- nutch/branches/2.x/src/java/org/apache/nutch/api/model/request/JobConfig.java (added)
+++ nutch/branches/2.x/src/java/org/apache/nutch/api/model/request/JobConfig.java Thu Jun 26 00:44:43 2014
@@ -0,0 +1,69 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.nutch.api.model.request;
+
+import java.util.Map;
+
+import org.apache.nutch.api.JobManager.JobType;
+
+public class JobConfig {
+ private String crawlId;
+ private JobType type;
+ private String confId;
+ private String jobClassName;
+ private Map<String, Object> args;
+
+ public String getCrawlId() {
+ return crawlId;
+ }
+
+ public void setCrawlId(String crawlId) {
+ this.crawlId = crawlId;
+ }
+
+ public JobType getType() {
+ return type;
+ }
+
+ public void setType(JobType type) {
+ this.type = type;
+ }
+
+ public String getConfId() {
+ return confId;
+ }
+
+ public void setConfId(String confId) {
+ this.confId = confId;
+ }
+
+ public Map<String, Object> getArgs() {
+ return args;
+ }
+
+ public void setArgs(Map<String, Object> args) {
+ this.args = args;
+ }
+
+ public String getJobClassName() {
+ return jobClassName;
+ }
+
+ public void setJobClassName(String jobClass) {
+ this.jobClassName = jobClass;
+ }
+}
Added: nutch/branches/2.x/src/java/org/apache/nutch/api/model/request/NutchConfig.java
URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/api/model/request/NutchConfig.java?rev=1605644&view=auto
==============================================================================
--- nutch/branches/2.x/src/java/org/apache/nutch/api/model/request/NutchConfig.java (added)
+++ nutch/branches/2.x/src/java/org/apache/nutch/api/model/request/NutchConfig.java Thu Jun 26 00:44:43 2014
@@ -0,0 +1,51 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.nutch.api.model.request;
+
+import java.util.Map;
+
+import java.util.Collections;
+
+public class NutchConfig {
+ private String configId;
+ private boolean force = false;
+ private Map<String, String> params = Collections.emptyMap();
+
+ public Map<String, String> getParams() {
+ return params;
+ }
+
+ public void setParams(Map<String, String> params) {
+ this.params = params;
+ }
+
+ public String getConfigId() {
+ return configId;
+ }
+
+ public void setConfigId(String configId) {
+ this.configId = configId;
+ }
+
+ public boolean isForce() {
+ return force;
+ }
+
+ public void setForce(boolean force) {
+ this.force = force;
+ }
+}
Added: nutch/branches/2.x/src/java/org/apache/nutch/api/model/response/DbQueryResult.java
URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/api/model/response/DbQueryResult.java?rev=1605644&view=auto
==============================================================================
--- nutch/branches/2.x/src/java/org/apache/nutch/api/model/response/DbQueryResult.java (added)
+++ nutch/branches/2.x/src/java/org/apache/nutch/api/model/response/DbQueryResult.java Thu Jun 26 00:44:43 2014
@@ -0,0 +1,35 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.nutch.api.model.response;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Response model that accumulates the entries returned by a database query.
+ */
+public class DbQueryResult {
+  /** Entries in the order in which they were added. */
+  private final List<Map<String, Object>> values = Lists.newLinkedList();
+
+  /**
+   * Appends one entry to this result.
+   *
+   * @param next
+   *          the entry to append
+   */
+  public void addValue(Map<String, Object> next) {
+    values.add(next);
+  }
+
+  /**
+   * @return a read-only view of the entries added so far
+   */
+  public List<Map<String, Object>> getValues() {
+    return Collections.unmodifiableList(values);
+  }
+}
Added: nutch/branches/2.x/src/java/org/apache/nutch/api/model/response/ErrorResponse.java
URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/api/model/response/ErrorResponse.java?rev=1605644&view=auto
==============================================================================
--- nutch/branches/2.x/src/java/org/apache/nutch/api/model/response/ErrorResponse.java (added)
+++ nutch/branches/2.x/src/java/org/apache/nutch/api/model/response/ErrorResponse.java Thu Jun 26 00:44:43 2014
@@ -0,0 +1,47 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.nutch.api.model.response;
+
+import org.apache.commons.lang.exception.ExceptionUtils;
+
+/**
+ * Response model describing a failure: the exception class, a message and
+ * the full stack trace.
+ */
+public class ErrorResponse {
+  private final String exception;
+  private final String message;
+  private final String stackTrace;
+
+  /**
+   * Builds the response from the given throwable.
+   *
+   * @param throwable
+   *          the failure to describe; when {@code null} only a generic
+   *          message is set and the other fields stay {@code null}
+   */
+  public ErrorResponse(Throwable throwable) {
+    if (throwable != null) {
+      exception = throwable.getClass().toString();
+      message = ExceptionUtils.getMessage(throwable);
+      stackTrace = ExceptionUtils.getFullStackTrace(throwable);
+    } else {
+      exception = null;
+      message = "Unknown error!";
+      stackTrace = null;
+    }
+  }
+
+  /**
+   * @return the string form of the throwable's class, or {@code null} when no
+   *         throwable was supplied
+   */
+  public String getException() {
+    return exception;
+  }
+
+  /**
+   * @return the message derived from the throwable, or a generic message
+   *         when no throwable was supplied
+   */
+  public String getMessage() {
+    return message;
+  }
+
+  /**
+   * @return the full stack trace of the throwable, or {@code null} when no
+   *         throwable was supplied
+   */
+  public String getStackTrace() {
+    return stackTrace;
+  }
+}
Added: nutch/branches/2.x/src/java/org/apache/nutch/api/model/response/JobInfo.java
URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/api/model/response/JobInfo.java?rev=1605644&view=auto
==============================================================================
--- nutch/branches/2.x/src/java/org/apache/nutch/api/model/response/JobInfo.java (added)
+++ nutch/branches/2.x/src/java/org/apache/nutch/api/model/response/JobInfo.java Thu Jun 26 00:44:43 2014
@@ -0,0 +1,113 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.nutch.api.model.response;
+
+import java.util.Map;
+
+import org.apache.nutch.api.JobManager.JobType;
+import org.apache.nutch.api.model.request.JobConfig;
+
+/**
+ * Response model describing a single job: its identity, the configuration
+ * it was created from, its current state and its result.
+ */
+public class JobInfo {
+
+  /**
+   * States a job can be reported in. {@code ANY} is passed as a filter value
+   * when listing jobs (presumably matching every state; confirm in the job
+   * manager).
+   */
+  public enum State {
+    IDLE, RUNNING, FINISHED, FAILED, KILLED, STOPPING, KILLING, ANY
+  }
+
+  private String id;
+  private String crawlId;
+  private String confId;
+  private JobType type;
+  private State state;
+  private String msg;
+  private Map<String, Object> args;
+  private Map<String, Object> result;
+
+  /**
+   * Creates the description of a job from the configuration it was started
+   * with.
+   *
+   * @param id
+   *          identifier of the job
+   * @param config
+   *          job configuration supplying the crawl id, job type,
+   *          configuration id and arguments; must not be {@code null}
+   * @param state
+   *          initial state of the job
+   * @param msg
+   *          initial message of the job
+   */
+  public JobInfo(String id, JobConfig config, State state, String msg) {
+    this.id = id;
+    this.state = state;
+    this.msg = msg;
+    // Copy the request-side settings; the result stays null until it is set.
+    this.crawlId = config.getCrawlId();
+    this.type = config.getType();
+    this.confId = config.getConfId();
+    this.args = config.getArgs();
+  }
+
+  /** @return identifier of the job */
+  public String getId() {
+    return id;
+  }
+
+  /** @param id identifier of the job */
+  public void setId(String id) {
+    this.id = id;
+  }
+
+  /** @return identifier of the crawl this job belongs to */
+  public String getCrawlId() {
+    return crawlId;
+  }
+
+  /** @param crawlId identifier of the crawl this job belongs to */
+  public void setCrawlId(String crawlId) {
+    this.crawlId = crawlId;
+  }
+
+  /** @return identifier of the configuration used by the job */
+  public String getConfId() {
+    return confId;
+  }
+
+  /** @param confId identifier of the configuration used by the job */
+  public void setConfId(String confId) {
+    this.confId = confId;
+  }
+
+  /** @return type of the job */
+  public JobType getType() {
+    return type;
+  }
+
+  /** @param type type of the job */
+  public void setType(JobType type) {
+    this.type = type;
+  }
+
+  /** @return current state of the job */
+  public State getState() {
+    return state;
+  }
+
+  /** @param state current state of the job */
+  public void setState(State state) {
+    this.state = state;
+  }
+
+  /** @return message attached to the job */
+  public String getMsg() {
+    return msg;
+  }
+
+  /** @param msg message attached to the job */
+  public void setMsg(String msg) {
+    this.msg = msg;
+  }
+
+  /** @return arguments the job was configured with */
+  public Map<String, Object> getArgs() {
+    return args;
+  }
+
+  /** @param args arguments the job was configured with */
+  public void setArgs(Map<String, Object> args) {
+    this.args = args;
+  }
+
+  /** @return result of the job, {@code null} until one has been set */
+  public Map<String, Object> getResult() {
+    return result;
+  }
+
+  /** @param result result of the job */
+  public void setResult(Map<String, Object> result) {
+    this.result = result;
+  }
+
+}
Added: nutch/branches/2.x/src/java/org/apache/nutch/api/model/response/NutchStatus.java
URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/api/model/response/NutchStatus.java?rev=1605644&view=auto
==============================================================================
--- nutch/branches/2.x/src/java/org/apache/nutch/api/model/response/NutchStatus.java (added)
+++ nutch/branches/2.x/src/java/org/apache/nutch/api/model/response/NutchStatus.java Thu Jun 26 00:44:43 2014
@@ -0,0 +1,61 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.nutch.api.model.response;
+
+import java.util.Collection;
+import java.util.Date;
+import java.util.Set;
+
+/**
+ * Response model summarising the server: its start date, the known
+ * configurations and the jobs it holds.
+ */
+public class NutchStatus {
+
+  /** Moment the server was started. */
+  private Date startDate;
+  /** Identifiers of the available configurations. */
+  private Set<String> configuration;
+  /** All jobs reported by the server. */
+  private Collection<JobInfo> jobs;
+  /** Jobs reported as running. */
+  private Collection<JobInfo> runningJobs;
+
+  /** @return the moment the server was started */
+  public Date getStartDate() {
+    return startDate;
+  }
+
+  /** @return identifiers of the available configurations */
+  public Set<String> getConfiguration() {
+    return configuration;
+  }
+
+  /** @return all jobs reported by the server */
+  public Collection<JobInfo> getJobs() {
+    return jobs;
+  }
+
+  /** @return the jobs reported as running */
+  public Collection<JobInfo> getRunningJobs() {
+    return runningJobs;
+  }
+
+  /** @param startDate the moment the server was started */
+  public void setStartDate(Date startDate) {
+    this.startDate = startDate;
+  }
+
+  /** @param configuration identifiers of the available configurations */
+  public void setConfiguration(Set<String> configuration) {
+    this.configuration = configuration;
+  }
+
+  /** @param jobs all jobs reported by the server */
+  public void setJobs(Collection<JobInfo> jobs) {
+    this.jobs = jobs;
+  }
+
+  /** @param runningJobs the jobs reported as running */
+  public void setRunningJobs(Collection<JobInfo> runningJobs) {
+    this.runningJobs = runningJobs;
+  }
+}
Added: nutch/branches/2.x/src/java/org/apache/nutch/api/resources/AbstractResource.java
URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/api/resources/AbstractResource.java?rev=1605644&view=auto
==============================================================================
--- nutch/branches/2.x/src/java/org/apache/nutch/api/resources/AbstractResource.java (added)
+++ nutch/branches/2.x/src/java/org/apache/nutch/api/resources/AbstractResource.java Thu Jun 26 00:44:43 2014
@@ -0,0 +1,48 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.nutch.api.resources;
+
+import javax.ws.rs.Produces;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+
+import org.apache.nutch.api.ConfManager;
+import org.apache.nutch.api.JobManager;
+import org.apache.nutch.api.NutchServer;
+import org.restlet.Context;
+
+/**
+ * Base class of the REST resources. It looks up the running
+ * {@link NutchServer} in the current Restlet context and exposes the server
+ * together with its configuration and job managers to subclasses.
+ */
+@Produces({ MediaType.APPLICATION_JSON })
+public abstract class AbstractResource {
+
+  protected ConfManager configManager;
+  protected JobManager jobManager;
+  protected NutchServer server;
+
+  /**
+   * Resolves the server registered under {@link NutchServer#NUTCH_SERVER} in
+   * the current context and caches its managers.
+   */
+  public AbstractResource() {
+    Object registered = Context.getCurrent().getAttributes()
+        .get(NutchServer.NUTCH_SERVER);
+    server = (NutchServer) registered;
+    configManager = server.getConfMgr();
+    jobManager = server.getJobMgr();
+  }
+
+  /**
+   * Aborts the current request with an HTTP 400 (Bad Request) response.
+   *
+   * @param message
+   *          text sent back as the response entity
+   * @throws WebApplicationException
+   *           always
+   */
+  protected void throwBadRequestException(String message) {
+    Response badRequest = Response.status(Status.BAD_REQUEST).entity(message)
+        .build();
+    throw new WebApplicationException(badRequest);
+  }
+}
Added: nutch/branches/2.x/src/java/org/apache/nutch/api/resources/AdminResource.java
URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/api/resources/AdminResource.java?rev=1605644&view=auto
==============================================================================
--- nutch/branches/2.x/src/java/org/apache/nutch/api/resources/AdminResource.java (added)
+++ nutch/branches/2.x/src/java/org/apache/nutch/api/resources/AdminResource.java Thu Jun 26 00:44:43 2014
@@ -0,0 +1,83 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.nutch.api.resources;
+
+import java.text.MessageFormat;
+import java.util.Date;
+import java.util.concurrent.TimeUnit;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.QueryParam;
+
+import org.apache.nutch.api.model.response.NutchStatus;
+import org.apache.nutch.api.model.response.JobInfo.State;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Administrative REST endpoints: reports the status of the server and
+ * schedules a delayed shutdown.
+ */
+@Path(value = "/admin")
+public class AdminResource extends AbstractResource {
+  /** Delay, in seconds, between accepting a stop request and stopping. */
+  private static final int DELAY_SEC = 10;
+  private static final long DELAY_MILLIS = TimeUnit.SECONDS.toMillis(DELAY_SEC);
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(AdminResource.class);
+
+  /**
+   * Collects the start date, the configuration ids, all jobs and the running
+   * jobs of the server.
+   *
+   * @return a snapshot of the server status
+   */
+  @GET
+  @Path("/")
+  public NutchStatus getNutchStatus() {
+    NutchStatus status = new NutchStatus();
+
+    status.setStartDate(new Date(server.getStarted()));
+    status.setConfiguration(configManager.list());
+    status.setJobs(jobManager.list(null, State.ANY));
+    status.setRunningJobs(jobManager.list(null, State.RUNNING));
+
+    return status;
+  }
+
+  /**
+   * Schedules a shutdown of the server if the server reports that it can be
+   * stopped.
+   *
+   * @param force
+   *          value of the {@code force} query parameter; handed to the
+   *          server both for the permission check and for the stop itself
+   * @return a human-readable message telling whether the stop was scheduled
+   */
+  @GET
+  @Path("/stop")
+  public String stop(@QueryParam("force") boolean force) {
+    if (!server.canStop(force)) {
+      LOG.info("Command 'stop' denied due to unfinished jobs");
+      return "Can't stop now. There are jobs running. Try force option.";
+    }
+
+    scheduleServerStop(force);
+    return MessageFormat.format("Stopping in {0} seconds.", DELAY_SEC);
+  }
+
+  /**
+   * Starts a daemon thread that waits for the configured delay and then
+   * stops the server, so the HTTP response can be delivered first.
+   *
+   * @param force
+   *          flag the stop was approved with; forwarded unchanged
+   */
+  private void scheduleServerStop(final boolean force) {
+    LOG.info("Server shutdown scheduled in {} seconds", DELAY_SEC);
+    Thread thread = new Thread() {
+      @Override
+      public void run() {
+        try {
+          Thread.sleep(DELAY_MILLIS);
+        } catch (InterruptedException e) {
+          // Keep the interrupt status and carry on with the shutdown.
+          Thread.currentThread().interrupt();
+        }
+        // Use the same flag that canStop() was asked with; a hard-coded
+        // 'false' would drop a forced stop the caller was told is coming.
+        server.stop(force);
+        LOG.info("Service stopped.");
+      }
+    };
+    thread.setDaemon(true);
+    thread.start();
+    LOG.info("Service shutting down...");
+  }
+
+}
Added: nutch/branches/2.x/src/java/org/apache/nutch/api/resources/ConfigResource.java
URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/api/resources/ConfigResource.java?rev=1605644&view=auto
==============================================================================
--- nutch/branches/2.x/src/java/org/apache/nutch/api/resources/ConfigResource.java (added)
+++ nutch/branches/2.x/src/java/org/apache/nutch/api/resources/ConfigResource.java Thu Jun 26 00:44:43 2014
@@ -0,0 +1,87 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.nutch.api.resources;
+
+import java.util.Map;
+import java.util.Set;
+
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.FormParam;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+
+import org.apache.nutch.api.model.request.NutchConfig;
+
+/**
+ * REST endpoints for listing, reading, creating, updating and deleting Nutch
+ * configurations.
+ */
+@Path("/config")
+public class ConfigResource extends AbstractResource {
+  /** Identifier of the default configuration. */
+  public static final String DEFAULT = "default";
+
+  /**
+   * @return the identifiers of all known configurations
+   */
+  @GET
+  @Path("/")
+  public Set<String> getConfigs() {
+    return configManager.list();
+  }
+
+  /**
+   * @param configId
+   *          identifier of the configuration to read
+   * @return the properties of that configuration
+   */
+  @GET
+  @Path("/{configId}")
+  public Map<String, String> getConfig(@PathParam("configId") String configId) {
+    return configManager.getAsMap(configId);
+  }
+
+  /**
+   * @param configId
+   *          identifier of the configuration to read
+   * @param propertyId
+   *          name of the property to look up
+   * @return the value of the property, or {@code null} if the configuration
+   *         has no such property
+   */
+  @GET
+  @Path("/{configId}/{propertyId}")
+  public String getProperty(@PathParam("configId") String configId,
+      @PathParam("propertyId") String propertyId) {
+    Map<String, String> properties = configManager.getAsMap(configId);
+    return properties.get(propertyId);
+  }
+
+  /**
+   * @param configId
+   *          identifier of the configuration to delete
+   */
+  @DELETE
+  @Path("/{configId}")
+  public void deleteConfig(@PathParam("configId") String configId) {
+    configManager.delete(configId);
+  }
+
+  /**
+   * Creates a configuration from the JSON request body. The {@code configId}
+   * path segment is not bound to a parameter; only the body is used.
+   *
+   * @param newConfig
+   *          the configuration to create; a missing body is answered with
+   *          HTTP 400
+   * @return the value returned by the configuration manager for the created
+   *         configuration
+   */
+  @POST
+  @Path("/{configId}")
+  @Consumes(MediaType.APPLICATION_JSON)
+  public String createConfig(NutchConfig newConfig) {
+    if (newConfig == null) {
+      throwBadRequestException("Nutch configuration cannot be empty!");
+    }
+    return configManager.create(newConfig);
+  }
+
+  /**
+   * Sets a single property of a configuration.
+   *
+   * @param config
+   *          identifier of the configuration to change
+   * @param property
+   *          name of the property to set
+   * @param value
+   *          new value taken from the {@code value} form parameter; a
+   *          missing value is answered with HTTP 400
+   * @return an empty HTTP 200 response
+   */
+  @PUT
+  @Path("/{config}/{property}")
+  public Response update(@PathParam("config") String config,
+      @PathParam("property") String property, @FormParam("value") String value) {
+    if (value == null) {
+      throwBadRequestException("Missing property value!");
+    }
+    configManager.setProperty(config, property, value);
+    return Response.ok().build();
+  }
+}
Added: nutch/branches/2.x/src/java/org/apache/nutch/api/resources/DbResource.java
URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/api/resources/DbResource.java?rev=1605644&view=auto
==============================================================================
--- nutch/branches/2.x/src/java/org/apache/nutch/api/resources/DbResource.java (added)
+++ nutch/branches/2.x/src/java/org/apache/nutch/api/resources/DbResource.java Thu Jun 26 00:44:43 2014
@@ -0,0 +1,61 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.nutch.api.resources;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.WeakHashMap;
+
+import javax.ws.rs.Consumes;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.core.MediaType;
+
+import org.apache.nutch.api.impl.db.DbReader;
+import org.apache.nutch.api.model.request.DbFilter;
+import org.apache.nutch.api.model.response.DbQueryResult;
+
+/**
+ * REST endpoint that runs a filtered query against the database and returns
+ * all matching entries.
+ */
+@Path("/db")
+public class DbResource extends AbstractResource {
+
+  /** Readers created so far, keyed by configuration id. */
+  private Map<String, DbReader> readers = new WeakHashMap<String, DbReader>();
+
+  /**
+   * Runs the query described by the filter and collects every entry it
+   * yields.
+   *
+   * @param filter
+   *          the query description from the JSON request body; a missing
+   *          body is answered with HTTP 400
+   * @return the collected entries
+   */
+  @POST
+  @Consumes(MediaType.APPLICATION_JSON)
+  public DbQueryResult runQuery(DbFilter filter) {
+    if (filter == null) {
+      throwBadRequestException("Filter cannot be null!");
+    }
+
+    DbQueryResult result = new DbQueryResult();
+    for (Iterator<Map<String, Object>> rows = getReader().runQuery(filter); rows
+        .hasNext();) {
+      result.addValue(rows.next());
+    }
+    return result;
+  }
+
+  /**
+   * Returns the reader for the default configuration, creating and
+   * remembering it on first use.
+   */
+  private DbReader getReader() {
+    String confId = ConfigResource.DEFAULT;
+    synchronized (readers) {
+      DbReader reader = readers.get(confId);
+      if (reader == null) {
+        reader = new DbReader(configManager.get(confId), null);
+        readers.put(confId, reader);
+      }
+      return reader;
+    }
+  }
+}