You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by ma...@apache.org on 2015/04/22 05:50:53 UTC
svn commit: r1675251 - in /nutch/trunk/src/java/org/apache/nutch/service:
impl/JobFactory.java impl/JobManagerImpl.java impl/JobWorker.java
impl/NutchServerPoolExecutor.java model/request/DbQuery.java
resources/DbResource.java
Author: mattmann
Date: Wed Apr 22 03:50:52 2015
New Revision: 1675251
URL: http://svn.apache.org/r1675251
Log:
More remainder of NUTCH-1973: Job Administration end point for the REST service contributed by Sujen Shah <su...@gmail.com>
Added:
nutch/trunk/src/java/org/apache/nutch/service/impl/JobFactory.java
nutch/trunk/src/java/org/apache/nutch/service/impl/JobManagerImpl.java
nutch/trunk/src/java/org/apache/nutch/service/impl/JobWorker.java
nutch/trunk/src/java/org/apache/nutch/service/impl/NutchServerPoolExecutor.java
nutch/trunk/src/java/org/apache/nutch/service/model/request/DbQuery.java
nutch/trunk/src/java/org/apache/nutch/service/resources/DbResource.java
Added: nutch/trunk/src/java/org/apache/nutch/service/impl/JobFactory.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/service/impl/JobFactory.java?rev=1675251&view=auto
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/service/impl/JobFactory.java (added)
+++ nutch/trunk/src/java/org/apache/nutch/service/impl/JobFactory.java Wed Apr 22 03:50:52 2015
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.service.impl;
+
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.nutch.service.JobManager.JobType;
+import org.apache.nutch.crawl.CrawlDb;
+import org.apache.nutch.crawl.DeduplicationJob;
+import org.apache.nutch.crawl.Generator;
+import org.apache.nutch.crawl.Injector;
+import org.apache.nutch.crawl.LinkDb;
+import org.apache.nutch.fetcher.Fetcher;
+import org.apache.nutch.parse.ParseSegment;
+import org.apache.nutch.util.NutchTool;
+
+import com.google.common.collect.Maps;
+
+public class JobFactory {
+ private static Map<JobType, Class<? extends NutchTool>> typeToClass;
+
+ static {
+ typeToClass = Maps.newHashMap();
+ typeToClass.put(JobType.INJECT, Injector.class);
+ typeToClass.put(JobType.GENERATE, Generator.class);
+ typeToClass.put(JobType.FETCH, Fetcher.class);
+ typeToClass.put(JobType.PARSE, ParseSegment.class);
+ typeToClass.put(JobType.UPDATEDB, CrawlDb.class);
+ typeToClass.put(JobType.INVERTLINKS, LinkDb.class);
+ typeToClass.put(JobType.DEDUP, DeduplicationJob.class);
+ }
+
+ public NutchTool createToolByType(JobType type, Configuration conf) {
+ if (!typeToClass.containsKey(type)) {
+ return null;
+ }
+ Class<? extends NutchTool> clz = typeToClass.get(type);
+ return createTool(clz, conf);
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public NutchTool createToolByClassName(String className, Configuration conf) {
+ try {
+ Class clz = Class.forName(className);
+ return createTool(clz, conf);
+ } catch (ClassNotFoundException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ private NutchTool createTool(Class<? extends NutchTool> clz,
+ Configuration conf) {
+ return ReflectionUtils.newInstance(clz, conf);
+ }
+
+}
\ No newline at end of file
Added: nutch/trunk/src/java/org/apache/nutch/service/impl/JobManagerImpl.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/service/impl/JobManagerImpl.java?rev=1675251&view=auto
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/service/impl/JobManagerImpl.java (added)
+++ nutch/trunk/src/java/org/apache/nutch/service/impl/JobManagerImpl.java Wed Apr 22 03:50:52 2015
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.service.impl;
+
+import java.util.Collection;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nutch.service.ConfManager;
+import org.apache.nutch.service.JobManager;
+import org.apache.nutch.service.model.request.JobConfig;
+import org.apache.nutch.service.model.response.JobInfo;
+import org.apache.nutch.service.model.response.JobInfo.State;
+import org.apache.nutch.util.NutchTool;
+
+public class JobManagerImpl implements JobManager {
+
+ private JobFactory jobFactory;
+ private NutchServerPoolExecutor executor;
+ private ConfManager configManager;
+
+ public JobManagerImpl(JobFactory jobFactory, ConfManager configManager, NutchServerPoolExecutor executor) {
+ this.jobFactory = jobFactory;
+ this.configManager = configManager;
+ this.executor = executor;
+ }
+
+ @Override
+ public JobInfo create(JobConfig jobConfig) {
+ if (jobConfig.getArgs() == null) {
+ throw new IllegalArgumentException("Arguments cannot be null!");
+ }
+ Configuration conf = cloneConfiguration(jobConfig.getConfId());
+ NutchTool tool = createTool(jobConfig, conf);
+ JobWorker worker = new JobWorker(jobConfig, conf, tool);
+ executor.execute(worker);
+ executor.purge();
+ return worker.getInfo();
+ }
+
+ private Configuration cloneConfiguration(String confId) {
+ Configuration conf = configManager.get(confId);
+ if (conf == null) {
+ throw new IllegalArgumentException("Unknown confId " + confId);
+ }
+ return new Configuration(conf);
+ }
+
+ @Override
+ public Collection<JobInfo> list(String crawlId, State state) {
+ if (state == null || state == State.ANY) {
+ return executor.getAllJobs();
+ }
+ if (state == State.RUNNING || state == State.IDLE) {
+ return executor.getJobRunning();
+ }
+ return executor.getJobHistory();
+ }
+
+ @Override
+ public JobInfo get(String crawlId, String jobId) {
+ return executor.getInfo(jobId);
+ }
+
+ @Override
+ public boolean abort(String crawlId, String id) {
+ return executor.findWorker(id).killJob();
+ }
+
+ @Override
+ public boolean stop(String crawlId, String id) {
+ return executor.findWorker(id).stopJob();
+ }
+
+ private NutchTool createTool(JobConfig jobConfig, Configuration conf){
+ if(StringUtils.isNotBlank(jobConfig.getJobClassName())){
+ return jobFactory.createToolByClassName(jobConfig.getJobClassName(), conf);
+ }
+ return jobFactory.createToolByType(jobConfig.getType(), conf);
+ }
+}
Added: nutch/trunk/src/java/org/apache/nutch/service/impl/JobWorker.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/service/impl/JobWorker.java?rev=1675251&view=auto
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/service/impl/JobWorker.java (added)
+++ nutch/trunk/src/java/org/apache/nutch/service/impl/JobWorker.java Wed Apr 22 03:50:52 2015
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.service.impl;
+
+import java.text.MessageFormat;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nutch.metadata.Nutch;
+import org.apache.nutch.service.model.request.JobConfig;
+import org.apache.nutch.service.model.response.JobInfo;
+import org.apache.nutch.service.model.response.JobInfo.State;
+import org.apache.nutch.service.resources.ConfigResource;
+import org.apache.nutch.util.NutchTool;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JobWorker implements Runnable{
+
+ private JobInfo jobInfo;
+ private JobConfig jobConfig;
+ private static final Logger LOG = LoggerFactory.getLogger(JobWorker.class);
+ private NutchTool tool;
+
+ /**
+ * To initialize JobWorker thread with the Job Configurations provided by user.
+ * @param jobConfig
+ * @param conf
+ * @param tool - NutchTool to run
+ */
+ public JobWorker(JobConfig jobConfig, Configuration conf, NutchTool tool) {
+ this.jobConfig = jobConfig;
+ this.tool = tool;
+ if (jobConfig.getConfId() == null) {
+ jobConfig.setConfId(ConfigResource.DEFAULT);
+ }
+
+ jobInfo = new JobInfo(generateId(), jobConfig, State.IDLE, "idle");
+ if (jobConfig.getCrawlId() != null) {
+ conf.set(Nutch.CRAWL_ID_KEY, jobConfig.getCrawlId());
+ }
+ }
+
+ private String generateId() {
+ if (jobConfig.getCrawlId() == null) {
+ return MessageFormat.format("{0}-{1}-{2}", jobConfig.getConfId(),
+ jobConfig.getType(), String.valueOf(hashCode()));
+ }
+ return MessageFormat.format("{0}-{1}-{2}-{3}", jobConfig.getCrawlId(),
+ jobConfig.getConfId(), jobConfig.getType(), String.valueOf(hashCode()));
+ }
+
+ @Override
+ public void run() {
+ try {
+ getInfo().setState(State.RUNNING);
+ getInfo().setMsg("OK");
+ getInfo().setResult(tool.run(getInfo().getArgs(), getInfo().getCrawlId()));
+ getInfo().setState(State.FINISHED);
+ } catch (Exception e) {
+ LOG.error("Cannot run job worker!", e);
+ getInfo().setMsg("ERROR: " + e.toString());
+ getInfo().setState(State.FAILED);
+ }
+ }
+
+ public JobInfo getInfo() {
+ return jobInfo;
+ }
+
+ /**
+ * To stop the executing job
+ * @return boolean true/false
+ */
+ public boolean stopJob() {
+ getInfo().setState(State.STOPPING);
+ try {
+ return tool.stopJob();
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Cannot stop job with id " + getInfo().getId(), e);
+ }
+ }
+
+ public boolean killJob() {
+ getInfo().setState(State.KILLING);
+ try {
+ boolean result = tool.killJob();
+ getInfo().setState(State.KILLED);
+ return result;
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Cannot kill job with id " + getInfo().getId(), e);
+ }
+ }
+
+ public void setInfo(JobInfo jobInfo) {
+ this.jobInfo = jobInfo;
+ }
+
+}
Added: nutch/trunk/src/java/org/apache/nutch/service/impl/NutchServerPoolExecutor.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/service/impl/NutchServerPoolExecutor.java?rev=1675251&view=auto
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/service/impl/NutchServerPoolExecutor.java (added)
+++ nutch/trunk/src/java/org/apache/nutch/service/impl/NutchServerPoolExecutor.java Wed Apr 22 03:50:52 2015
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.service.impl;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.nutch.service.model.response.JobInfo;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Queues;
+
+
+
+public class NutchServerPoolExecutor extends ThreadPoolExecutor{
+
+ private Queue<JobWorker> workersHistory;
+ private Queue<JobWorker> runningWorkers;
+
+ public NutchServerPoolExecutor(int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue){
+ super(corePoolSize, maxPoolSize, keepAliveTime, unit, workQueue);
+ workersHistory = Queues.newArrayBlockingQueue(maxPoolSize);
+ runningWorkers = Queues.newArrayBlockingQueue(maxPoolSize);
+ }
+
+ @Override
+ protected void beforeExecute(Thread thread, Runnable runnable) {
+ super.beforeExecute(thread, runnable);
+ synchronized (runningWorkers) {
+ runningWorkers.offer(((JobWorker) runnable));
+ }
+ }
+ @Override
+ protected void afterExecute(Runnable runnable, Throwable throwable) {
+ super.afterExecute(runnable, throwable);
+ synchronized (runningWorkers) {
+ runningWorkers.remove(((JobWorker) runnable).getInfo());
+ }
+ JobWorker worker = ((JobWorker) runnable);
+ addStatusToHistory(worker);
+ }
+
+ private void addStatusToHistory(JobWorker worker) {
+ synchronized (workersHistory) {
+ if (!workersHistory.offer(worker)) {
+ workersHistory.poll();
+ workersHistory.add(worker);
+ }
+ }
+ }
+
+ /**
+ * Find the Job Worker Thread
+ * @param jobId
+ * @return
+ */
+ public JobWorker findWorker(String jobId) {
+ synchronized (runningWorkers) {
+ for (JobWorker worker : runningWorkers) {
+ if (StringUtils.equals(worker.getInfo().getId(), jobId)) {
+ return worker;
+ }
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Gives the Job history
+ * @return
+ */
+ public Collection<JobInfo> getJobHistory() {
+ return getJobsInfo(workersHistory);
+ }
+
+ /**
+ * Gives the list of currently running jobs
+ * @return
+ */
+ public Collection<JobInfo> getJobRunning() {
+ return getJobsInfo(runningWorkers);
+ }
+
+ /**
+ * Gives all jobs(currently running and completed)
+ * @return
+ */
+ @SuppressWarnings("unchecked")
+ public Collection<JobInfo> getAllJobs() {
+ return CollectionUtils.union(getJobRunning(), getJobHistory());
+ }
+
+ private Collection<JobInfo> getJobsInfo(Collection<JobWorker> workers) {
+ List<JobInfo> jobsInfo = Lists.newLinkedList();
+ for (JobWorker worker : workers) {
+ jobsInfo.add(worker.getInfo());
+ }
+ return jobsInfo;
+ }
+
+
+ public JobInfo getInfo(String jobId) {
+ for (JobInfo jobInfo : getAllJobs()) {
+ if (StringUtils.equals(jobId, jobInfo.getId())) {
+ return jobInfo;
+ }
+ }
+ return null;
+ }
+
+}
Added: nutch/trunk/src/java/org/apache/nutch/service/model/request/DbQuery.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/service/model/request/DbQuery.java?rev=1675251&view=auto
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/service/model/request/DbQuery.java (added)
+++ nutch/trunk/src/java/org/apache/nutch/service/model/request/DbQuery.java Wed Apr 22 03:50:52 2015
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.service.model.request;
+
+import java.util.Map;
+
+/**
+ * Request bean describing a CrawlDb query: which configuration and crawl to
+ * use, the kind of query, and its arguments.
+ */
+public class DbQuery {
+
+  /** Id of the configuration to run the query with. */
+  private String confId;
+  /** Id of the crawl the query refers to. */
+  private String crawlId;
+  /** Kind of query to run. */
+  private String type;
+  /** Arguments of the query. */
+  private Map<String, String> args;
+
+  /** @return the configuration id */
+  public String getConfId() {
+    return this.confId;
+  }
+
+  /** @param confId the configuration id */
+  public void setConfId(String confId) {
+    this.confId = confId;
+  }
+
+  /** @return the crawl id */
+  public String getCrawlId() {
+    return this.crawlId;
+  }
+
+  /** @param crawlId the crawl id */
+  public void setCrawlId(String crawlId) {
+    this.crawlId = crawlId;
+  }
+
+  /** @return the query type */
+  public String getType() {
+    return this.type;
+  }
+
+  /** @param type the query type */
+  public void setType(String type) {
+    this.type = type;
+  }
+
+  /** @return the query arguments */
+  public Map<String, String> getArgs() {
+    return this.args;
+  }
+
+  /** @param args the query arguments */
+  public void setArgs(Map<String, String> args) {
+    this.args = args;
+  }
+
+}
Added: nutch/trunk/src/java/org/apache/nutch/service/resources/DbResource.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/service/resources/DbResource.java?rev=1675251&view=auto
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/service/resources/DbResource.java (added)
+++ nutch/trunk/src/java/org/apache/nutch/service/resources/DbResource.java Wed Apr 22 03:50:52 2015
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.service.resources;
+
+
+import java.util.Map;
+
+import javax.ws.rs.Consumes;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nutch.crawl.CrawlDbReader;
+import org.apache.nutch.service.model.request.DbQuery;
+
+@Path(value = "/db")
+public class DbResource extends AbstractResource {
+
+ @POST
+ @Path(value = "/crawldb")
+ @Consumes(MediaType.APPLICATION_JSON)
+ public Object readdb(DbQuery dbQuery){
+ Configuration conf = configManager.get(dbQuery.getConfId());
+ String type = dbQuery.getType();
+
+ if(type.equalsIgnoreCase("stats")){
+ return crawlDbStats(conf, dbQuery.getArgs(), dbQuery.getCrawlId());
+ }
+ if(type.equalsIgnoreCase("dump")){
+ return crawlDbDump(conf, dbQuery.getArgs(), dbQuery.getCrawlId());
+ }
+ if(type.equalsIgnoreCase("topN")){
+ return crawlDbTopN(conf, dbQuery.getArgs(), dbQuery.getCrawlId());
+ }
+ if(type.equalsIgnoreCase("url")){
+ return crawlDbUrl(conf, dbQuery.getArgs(), dbQuery.getCrawlId());
+ }
+ return null;
+
+ }
+
+ @SuppressWarnings("resource")
+ private Response crawlDbStats(Configuration conf, Map<String, String> args, String crawlId){
+ CrawlDbReader dbr = new CrawlDbReader();
+ try{
+ return Response.ok(dbr.query(args, conf, "stats", crawlId)).build();
+ }catch(Exception e){
+ e.printStackTrace();
+ return Response.serverError().entity(e.getMessage()).build();
+ }
+ }
+
+ @Produces(MediaType.APPLICATION_OCTET_STREAM)
+ private Response crawlDbDump(Configuration conf, Map<String, String> args, String crawlId){
+ CrawlDbReader dbr = new CrawlDbReader();
+ try{
+ return Response.ok(dbr.query(args, conf, "dump", crawlId), MediaType.APPLICATION_OCTET_STREAM).build();
+ }catch(Exception e){
+ e.printStackTrace();
+ return Response.serverError().entity(e.getMessage()).build();
+ }
+ }
+
+ @Produces(MediaType.APPLICATION_OCTET_STREAM)
+ private Response crawlDbTopN(Configuration conf, Map<String, String> args, String crawlId) {
+ CrawlDbReader dbr = new CrawlDbReader();
+ try{
+ return Response.ok(dbr.query(args, conf, "topN", crawlId), MediaType.APPLICATION_OCTET_STREAM).build();
+ }catch(Exception e){
+ e.printStackTrace();
+ return Response.serverError().entity(e.getMessage()).build();
+ }
+ }
+
+ private Response crawlDbUrl(Configuration conf, Map<String, String> args, String crawlId){
+ CrawlDbReader dbr = new CrawlDbReader();
+ try{
+ return Response.ok(dbr.query(args, conf, "url", crawlId)).build();
+ }catch(Exception e){
+ e.printStackTrace();
+ return Response.serverError().entity(e.getMessage()).build();
+ }
+ }
+}