You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by ma...@apache.org on 2015/04/22 05:50:53 UTC

svn commit: r1675251 - in /nutch/trunk/src/java/org/apache/nutch/service: impl/JobFactory.java impl/JobManagerImpl.java impl/JobWorker.java impl/NutchServerPoolExecutor.java model/request/DbQuery.java resources/DbResource.java

Author: mattmann
Date: Wed Apr 22 03:50:52 2015
New Revision: 1675251

URL: http://svn.apache.org/r1675251
Log:
Remainder of NUTCH-1973: Job Administration endpoint for the REST service, contributed by Sujen Shah <su...@gmail.com>

Added:
    nutch/trunk/src/java/org/apache/nutch/service/impl/JobFactory.java
    nutch/trunk/src/java/org/apache/nutch/service/impl/JobManagerImpl.java
    nutch/trunk/src/java/org/apache/nutch/service/impl/JobWorker.java
    nutch/trunk/src/java/org/apache/nutch/service/impl/NutchServerPoolExecutor.java
    nutch/trunk/src/java/org/apache/nutch/service/model/request/DbQuery.java
    nutch/trunk/src/java/org/apache/nutch/service/resources/DbResource.java

Added: nutch/trunk/src/java/org/apache/nutch/service/impl/JobFactory.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/service/impl/JobFactory.java?rev=1675251&view=auto
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/service/impl/JobFactory.java (added)
+++ nutch/trunk/src/java/org/apache/nutch/service/impl/JobFactory.java Wed Apr 22 03:50:52 2015
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.service.impl;
+
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.nutch.service.JobManager.JobType;
+import org.apache.nutch.crawl.CrawlDb;
+import org.apache.nutch.crawl.DeduplicationJob;
+import org.apache.nutch.crawl.Generator;
+import org.apache.nutch.crawl.Injector;
+import org.apache.nutch.crawl.LinkDb;
+import org.apache.nutch.fetcher.Fetcher;
+import org.apache.nutch.parse.ParseSegment;
+import org.apache.nutch.util.NutchTool;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Instantiates the {@link NutchTool} that implements a job, either from a
+ * built-in {@link JobType} or from a fully qualified class name.
+ */
+public class JobFactory {
+  /** Built-in job types and the tool class implementing each of them. */
+  private static final Map<JobType, Class<? extends NutchTool>> typeToClass;
+
+  static {
+    typeToClass = Maps.newEnumMap(JobType.class);
+    typeToClass.put(JobType.INJECT, Injector.class);
+    typeToClass.put(JobType.GENERATE, Generator.class);
+    typeToClass.put(JobType.FETCH, Fetcher.class);
+    typeToClass.put(JobType.PARSE, ParseSegment.class);
+    typeToClass.put(JobType.UPDATEDB, CrawlDb.class);
+    typeToClass.put(JobType.INVERTLINKS, LinkDb.class);
+    typeToClass.put(JobType.DEDUP, DeduplicationJob.class);
+  }
+
+  /**
+   * Creates the tool registered for a built-in job type.
+   *
+   * @param type the job type; may be null
+   * @param conf configuration handed to the new tool
+   * @return a new tool instance, or null if no tool is registered for type
+   */
+  public NutchTool createToolByType(JobType type, Configuration conf) {
+    Class<? extends NutchTool> clz = typeToClass.get(type);
+    if (clz == null) {
+      return null;
+    }
+    return createTool(clz, conf);
+  }
+
+  /**
+   * Creates a tool from a fully qualified class name.
+   *
+   * The class is checked to be a {@link NutchTool} subclass before it is
+   * instantiated. The name may come from a REST client, so this check keeps
+   * the constructor of an arbitrary class from running.
+   *
+   * @param className fully qualified name of a {@link NutchTool} subclass
+   * @param conf configuration handed to the new tool
+   * @return a new tool instance
+   * @throws IllegalStateException if the class cannot be found
+   * @throws IllegalArgumentException if the class is not a {@link NutchTool}
+   */
+  public NutchTool createToolByClassName(String className, Configuration conf) {
+    Class<? extends NutchTool> clz;
+    try {
+      clz = Class.forName(className).asSubclass(NutchTool.class);
+    } catch (ClassNotFoundException e) {
+      throw new IllegalStateException(e);
+    } catch (ClassCastException e) {
+      throw new IllegalArgumentException(className + " is not a "
+          + NutchTool.class.getName(), e);
+    }
+    return createTool(clz, conf);
+  }
+
+  /** Instantiates clz through Hadoop's ReflectionUtils, which also applies conf. */
+  private NutchTool createTool(Class<? extends NutchTool> clz,
+      Configuration conf) {
+    return ReflectionUtils.newInstance(clz, conf);
+  }
+
+}
\ No newline at end of file

Added: nutch/trunk/src/java/org/apache/nutch/service/impl/JobManagerImpl.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/service/impl/JobManagerImpl.java?rev=1675251&view=auto
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/service/impl/JobManagerImpl.java (added)
+++ nutch/trunk/src/java/org/apache/nutch/service/impl/JobManagerImpl.java Wed Apr 22 03:50:52 2015
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.service.impl;
+
+import java.util.Collection;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nutch.service.ConfManager;
+import org.apache.nutch.service.JobManager;
+import org.apache.nutch.service.model.request.JobConfig;
+import org.apache.nutch.service.model.response.JobInfo;
+import org.apache.nutch.service.model.response.JobInfo.State;
+import org.apache.nutch.util.NutchTool;
+
+public class JobManagerImpl implements JobManager {
+
+  private JobFactory jobFactory;
+  private NutchServerPoolExecutor executor;
+  private ConfManager configManager;
+
+  public JobManagerImpl(JobFactory jobFactory, ConfManager configManager, NutchServerPoolExecutor executor) {
+    this.jobFactory = jobFactory;
+    this.configManager = configManager;		
+    this.executor = executor;
+  }
+
+  @Override
+  public JobInfo create(JobConfig jobConfig) {
+    if (jobConfig.getArgs() == null) {
+      throw new IllegalArgumentException("Arguments cannot be null!");
+    }
+    Configuration conf = cloneConfiguration(jobConfig.getConfId());
+    NutchTool tool = createTool(jobConfig, conf);
+    JobWorker worker = new JobWorker(jobConfig, conf, tool);
+    executor.execute(worker);
+    executor.purge();		
+    return worker.getInfo();
+  }
+
+  private Configuration cloneConfiguration(String confId) {
+    Configuration conf = configManager.get(confId);
+    if (conf == null) {
+      throw new IllegalArgumentException("Unknown confId " + confId);
+    }
+    return new Configuration(conf);
+  }
+
+  @Override
+  public Collection<JobInfo> list(String crawlId, State state) {
+    if (state == null || state == State.ANY) {
+      return executor.getAllJobs();
+    }
+    if (state == State.RUNNING || state == State.IDLE) {
+      return executor.getJobRunning();
+    }
+    return executor.getJobHistory();
+  }
+
+  @Override
+  public JobInfo get(String crawlId, String jobId) {
+    return executor.getInfo(jobId);
+  }
+
+  @Override
+  public boolean abort(String crawlId, String id) {
+    return executor.findWorker(id).killJob();
+  }
+
+  @Override
+  public boolean stop(String crawlId, String id) {
+    return executor.findWorker(id).stopJob();
+  }
+
+  private NutchTool createTool(JobConfig jobConfig, Configuration conf){
+    if(StringUtils.isNotBlank(jobConfig.getJobClassName())){
+      return jobFactory.createToolByClassName(jobConfig.getJobClassName(), conf);
+    }
+    return jobFactory.createToolByType(jobConfig.getType(), conf);
+  }
+}

Added: nutch/trunk/src/java/org/apache/nutch/service/impl/JobWorker.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/service/impl/JobWorker.java?rev=1675251&view=auto
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/service/impl/JobWorker.java (added)
+++ nutch/trunk/src/java/org/apache/nutch/service/impl/JobWorker.java Wed Apr 22 03:50:52 2015
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.service.impl;
+
+import java.text.MessageFormat;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nutch.metadata.Nutch;
+import org.apache.nutch.service.model.request.JobConfig;
+import org.apache.nutch.service.model.response.JobInfo;
+import org.apache.nutch.service.model.response.JobInfo.State;
+import org.apache.nutch.service.resources.ConfigResource;
+import org.apache.nutch.util.NutchTool;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JobWorker implements Runnable{
+
+  private JobInfo jobInfo;
+  private JobConfig jobConfig;
+  private static final Logger LOG = LoggerFactory.getLogger(JobWorker.class);
+  private NutchTool tool;
+
+  /**
+   * To initialize JobWorker thread with the Job Configurations provided by user.
+   * @param jobConfig
+   * @param conf
+   * @param tool - NutchTool to run 
+   */
+  public JobWorker(JobConfig jobConfig, Configuration conf, NutchTool tool) {
+    this.jobConfig = jobConfig;
+    this.tool = tool;
+    if (jobConfig.getConfId() == null) {
+      jobConfig.setConfId(ConfigResource.DEFAULT);
+    }
+
+    jobInfo = new JobInfo(generateId(), jobConfig, State.IDLE, "idle");
+    if (jobConfig.getCrawlId() != null) {
+      conf.set(Nutch.CRAWL_ID_KEY, jobConfig.getCrawlId());
+    }
+  }
+
+  private String generateId() {
+    if (jobConfig.getCrawlId() == null) {
+      return MessageFormat.format("{0}-{1}-{2}", jobConfig.getConfId(),
+          jobConfig.getType(), String.valueOf(hashCode()));
+    }
+    return MessageFormat.format("{0}-{1}-{2}-{3}", jobConfig.getCrawlId(),
+        jobConfig.getConfId(), jobConfig.getType(), String.valueOf(hashCode()));
+  }
+
+  @Override
+  public void run() {
+    try {
+      getInfo().setState(State.RUNNING);
+      getInfo().setMsg("OK");
+      getInfo().setResult(tool.run(getInfo().getArgs(), getInfo().getCrawlId()));
+      getInfo().setState(State.FINISHED);
+    } catch (Exception e) {
+      LOG.error("Cannot run job worker!", e);
+      getInfo().setMsg("ERROR: " + e.toString());
+      getInfo().setState(State.FAILED);
+    }
+  }
+
+  public JobInfo getInfo() {
+    return jobInfo;
+  }
+
+  /**
+   * To stop the executing job
+   * @return boolean true/false
+   */
+  public boolean stopJob() {
+    getInfo().setState(State.STOPPING);
+    try {
+      return tool.stopJob();
+    } catch (Exception e) {
+      throw new RuntimeException(
+          "Cannot stop job with id " + getInfo().getId(), e);
+    }
+  }
+
+  public boolean killJob() {
+    getInfo().setState(State.KILLING);
+    try {
+      boolean result = tool.killJob();
+      getInfo().setState(State.KILLED);
+      return result;
+    } catch (Exception e) {
+      throw new RuntimeException(
+          "Cannot kill job with id " + getInfo().getId(), e);
+    }
+  }
+
+  public void setInfo(JobInfo jobInfo) {
+    this.jobInfo = jobInfo;
+  }
+
+}

Added: nutch/trunk/src/java/org/apache/nutch/service/impl/NutchServerPoolExecutor.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/service/impl/NutchServerPoolExecutor.java?rev=1675251&view=auto
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/service/impl/NutchServerPoolExecutor.java (added)
+++ nutch/trunk/src/java/org/apache/nutch/service/impl/NutchServerPoolExecutor.java Wed Apr 22 03:50:52 2015
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.service.impl;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.nutch.service.model.response.JobInfo;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Queues;
+
+
+
+/**
+ * Thread pool that runs {@link JobWorker}s. It tracks which workers are
+ * running and keeps a bounded history of finished ones.
+ */
+public class NutchServerPoolExecutor extends ThreadPoolExecutor{
+
+  /** Most recent finished workers; the oldest entry is evicted when full. */
+  private final Queue<JobWorker> workersHistory;
+  /** Workers currently executing on a pool thread. */
+  private final Queue<JobWorker> runningWorkers;
+
+  public NutchServerPoolExecutor(int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue){
+    super(corePoolSize, maxPoolSize, keepAliveTime, unit, workQueue);
+    workersHistory = Queues.newArrayBlockingQueue(maxPoolSize);
+    runningWorkers = Queues.newArrayBlockingQueue(maxPoolSize);
+  }
+
+  @Override
+  protected void beforeExecute(Thread thread, Runnable runnable) {
+    super.beforeExecute(thread, runnable);
+    if (runnable instanceof JobWorker) {
+      synchronized (runningWorkers) {
+        runningWorkers.offer((JobWorker) runnable);
+      }
+    }
+  }
+
+  @Override
+  protected void afterExecute(Runnable runnable, Throwable throwable) {
+    super.afterExecute(runnable, throwable);
+    if (!(runnable instanceof JobWorker)) {
+      return;
+    }
+    JobWorker worker = (JobWorker) runnable;
+    // Record the history entry before removing the running entry, so that
+    // getAllJobs() never misses the job in between. union() removes the
+    // duplicate if a caller briefly sees it in both collections.
+    addStatusToHistory(worker);
+    synchronized (runningWorkers) {
+      // The queue holds JobWorker instances, so remove the worker itself.
+      runningWorkers.remove(worker);
+    }
+  }
+
+  /** Appends worker to the history, evicting the oldest entry when full. */
+  private void addStatusToHistory(JobWorker worker) {
+    synchronized (workersHistory) {
+      if (!workersHistory.offer(worker)) {
+        workersHistory.poll();
+        workersHistory.add(worker);
+      }
+    }
+  }
+
+  /**
+   * Finds a running job worker.
+   * @param jobId the job id
+   * @return the running worker with that id, or null if none is running
+   */
+  public JobWorker findWorker(String jobId) {
+    synchronized (runningWorkers) {
+      for (JobWorker worker : runningWorkers) {
+        if (StringUtils.equals(worker.getInfo().getId(), jobId)) {
+          return worker;
+        }
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Gives the job history.
+   * @return infos of the most recent finished jobs
+   */
+  public Collection<JobInfo> getJobHistory() {
+    return getJobsInfo(workersHistory);
+  }
+
+  /**
+   * Gives the list of currently running jobs.
+   * @return infos of the jobs currently executing
+   */
+  public Collection<JobInfo> getJobRunning() {
+    return getJobsInfo(runningWorkers);
+  }
+
+  /**
+   * Gives all jobs (currently running and completed).
+   * @return union of the running jobs and the job history
+   */
+  @SuppressWarnings("unchecked")
+  public Collection<JobInfo> getAllJobs() {
+    return CollectionUtils.union(getJobRunning(), getJobHistory());
+  }
+
+  /** Snapshots the infos of the given workers into a new list. */
+  private Collection<JobInfo> getJobsInfo(Collection<JobWorker> workers) {
+    List<JobInfo> jobsInfo = Lists.newLinkedList();
+    for (JobWorker worker : workers) {
+      jobsInfo.add(worker.getInfo());
+    }
+    return jobsInfo;
+  }
+
+  /**
+   * Looks a job up among the running and finished jobs.
+   * @param jobId the job id
+   * @return the job's info, or null if it is not tracked
+   */
+  public JobInfo getInfo(String jobId) {
+    for (JobInfo jobInfo : getAllJobs()) {
+      if (StringUtils.equals(jobId, jobInfo.getId())) {
+        return jobInfo;
+      }
+    }
+    return null;
+  }
+
+}

Added: nutch/trunk/src/java/org/apache/nutch/service/model/request/DbQuery.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/service/model/request/DbQuery.java?rev=1675251&view=auto
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/service/model/request/DbQuery.java (added)
+++ nutch/trunk/src/java/org/apache/nutch/service/model/request/DbQuery.java Wed Apr 22 03:50:52 2015
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.service.model.request;
+
+import java.util.Map;
+
+/**
+ * Request body for a database query: which configuration to use, the kind
+ * of query, the crawl it targets and free-form query arguments.
+ */
+public class DbQuery {
+
+  private String confId;
+  private String type;
+  private Map<String, String> args;
+  private String crawlId;
+
+  /** @return the configuration id, or null if unset */
+  public String getConfId() {
+    return confId;
+  }
+
+  /** @param confId the configuration id */
+  public void setConfId(String confId) {
+    this.confId = confId;
+  }
+
+  /** @return the query type (for example "stats" or "dump"), or null if unset */
+  public String getType() {
+    return type;
+  }
+
+  /** @param type the query type */
+  public void setType(String type) {
+    this.type = type;
+  }
+
+  /** @return the query arguments as stored (not copied), or null if unset */
+  public Map<String, String> getArgs() {
+    return args;
+  }
+
+  /** @param args the query arguments; stored by reference */
+  public void setArgs(Map<String, String> args) {
+    this.args = args;
+  }
+
+  /** @return the crawl id, or null if unset */
+  public String getCrawlId() {
+    return crawlId;
+  }
+
+  /** @param crawlId the crawl id */
+  public void setCrawlId(String crawlId) {
+    this.crawlId = crawlId;
+  }
+
+}

Added: nutch/trunk/src/java/org/apache/nutch/service/resources/DbResource.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/service/resources/DbResource.java?rev=1675251&view=auto
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/service/resources/DbResource.java (added)
+++ nutch/trunk/src/java/org/apache/nutch/service/resources/DbResource.java Wed Apr 22 03:50:52 2015
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.service.resources;
+
+
+import java.util.Map;
+
+import javax.ws.rs.Consumes;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nutch.crawl.CrawlDbReader;
+import org.apache.nutch.service.model.request.DbQuery;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Path(value = "/db")
+public class DbResource extends AbstractResource {
+
+  @POST
+  @Path(value = "/crawldb")
+  @Consumes(MediaType.APPLICATION_JSON)
+  public Object readdb(DbQuery dbQuery){
+    Configuration conf = configManager.get(dbQuery.getConfId());
+    String type = dbQuery.getType();
+
+    if(type.equalsIgnoreCase("stats")){
+      return crawlDbStats(conf, dbQuery.getArgs(), dbQuery.getCrawlId());
+    }
+    if(type.equalsIgnoreCase("dump")){
+      return crawlDbDump(conf, dbQuery.getArgs(), dbQuery.getCrawlId());
+    }
+    if(type.equalsIgnoreCase("topN")){
+      return crawlDbTopN(conf, dbQuery.getArgs(), dbQuery.getCrawlId());
+    }
+    if(type.equalsIgnoreCase("url")){
+      return crawlDbUrl(conf, dbQuery.getArgs(), dbQuery.getCrawlId());
+    }
+    return null;
+
+  }	
+
+  @SuppressWarnings("resource")
+  private Response crawlDbStats(Configuration conf, Map<String, String> args, String crawlId){
+    CrawlDbReader dbr = new CrawlDbReader();
+    try{
+      return Response.ok(dbr.query(args, conf, "stats", crawlId)).build();
+    }catch(Exception e){
+      e.printStackTrace();
+      return Response.serverError().entity(e.getMessage()).build();
+    }
+  }
+
+  @Produces(MediaType.APPLICATION_OCTET_STREAM)
+  private Response crawlDbDump(Configuration conf, Map<String, String> args, String crawlId){
+    CrawlDbReader dbr = new CrawlDbReader();
+    try{
+      return Response.ok(dbr.query(args, conf, "dump", crawlId), MediaType.APPLICATION_OCTET_STREAM).build();
+    }catch(Exception e){
+      e.printStackTrace();
+      return Response.serverError().entity(e.getMessage()).build();
+    }
+  }
+
+  @Produces(MediaType.APPLICATION_OCTET_STREAM)
+  private Response crawlDbTopN(Configuration conf, Map<String, String> args, String crawlId) {
+    CrawlDbReader dbr = new CrawlDbReader();
+    try{
+      return Response.ok(dbr.query(args, conf, "topN", crawlId), MediaType.APPLICATION_OCTET_STREAM).build();
+    }catch(Exception e){
+      e.printStackTrace();
+      return Response.serverError().entity(e.getMessage()).build();
+    }		
+  }
+
+  private Response crawlDbUrl(Configuration conf, Map<String, String> args, String crawlId){
+    CrawlDbReader dbr = new CrawlDbReader();
+    try{
+      return Response.ok(dbr.query(args, conf, "url", crawlId)).build();
+    }catch(Exception e){
+      e.printStackTrace();
+      return Response.serverError().entity(e.getMessage()).build();
+    }
+  }
+}