You are viewing a plain-text version of this content. The link to the canonical version ("here") did not survive the conversion to plain text.
Posted to droids-commits@incubator.apache.org by to...@apache.org on 2013/01/29 09:50:17 UTC

svn commit: r1439804 - in /incubator/droids/branches/0.2.x-cleanup/droids-core: ./ src/main/java/org/apache/droids/core/ src/main/java/org/apache/droids/handle/ src/main/java/org/apache/droids/parse/ src/main/java/org/apache/droids/taskmaster/ src/test...

Author: tobr
Date: Tue Jan 29 09:50:17 2013
New Revision: 1439804

URL: http://svn.apache.org/viewvc?rev=1439804&view=rev
Log:
defined crawler defaults
added logging 
added link management
updated the API

Added:
    incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/parse/SimpleLinkParser.java   (with props)
    incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/taskmaster/
    incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/taskmaster/MultiThreadedTaskMaster.java   (with props)
    incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/taskmaster/SequentialTaskMaster.java   (with props)
Removed:
    incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/MultiThreadedTaskMaster.java
    incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/SequentialTaskMaster.java
Modified:
    incubator/droids/branches/0.2.x-cleanup/droids-core/pom.xml
    incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/AbstractDroid.java
    incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/ContentEntity.java
    incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/Droid.java
    incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/Task.java
    incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/handle/SysoutHandler.java
    incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/parse/Parser.java
    incubator/droids/branches/0.2.x-cleanup/droids-core/src/test/java/org/apache/droids/core/SimpleTask.java

Modified: incubator/droids/branches/0.2.x-cleanup/droids-core/pom.xml
URL: http://svn.apache.org/viewvc/incubator/droids/branches/0.2.x-cleanup/droids-core/pom.xml?rev=1439804&r1=1439803&r2=1439804&view=diff
==============================================================================
--- incubator/droids/branches/0.2.x-cleanup/droids-core/pom.xml (original)
+++ incubator/droids/branches/0.2.x-cleanup/droids-core/pom.xml Tue Jan 29 09:50:17 2013
@@ -35,7 +35,7 @@
         <relativePath>../pom.xml</relativePath>
     </parent>
     <artifactId>droids-core</artifactId>
-    <name>Apache Droids Core</name>
+    <name>APACHE DROIDS CORE</name>
     <inceptionYear>2007</inceptionYear>
     <description>
         Apache Droids API and core components

Modified: incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/AbstractDroid.java
URL: http://svn.apache.org/viewvc/incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/AbstractDroid.java?rev=1439804&r1=1439803&r2=1439804&view=diff
==============================================================================
--- incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/AbstractDroid.java (original)
+++ incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/AbstractDroid.java Tue Jan 29 09:50:17 2013
@@ -21,6 +21,7 @@ import org.apache.droids.helper.factorie
 import org.apache.droids.helper.factories.ParserFactory;
 import org.apache.droids.filter.Filter;
 import org.apache.droids.parse.Parser;
+import org.apache.droids.taskmaster.MultiThreadedTaskMaster;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,7 +35,7 @@ import java.util.concurrent.TimeUnit;
 public abstract class AbstractDroid<T extends Task> implements Droid<T> {
     protected final Queue<T> queue;
     protected final TaskMaster<T> taskMaster;
-    protected Fetcher fetcher;
+    protected Fetcher<T> fetcher;
     protected ParserFactory parserFactory;
     protected FilterFactory filterFactory;
     protected HandlerFactory handlerFactory;
@@ -42,12 +43,12 @@ public abstract class AbstractDroid<T ex
     protected final static Logger logger = LoggerFactory.getLogger(AbstractDroid.class);
 
     public AbstractDroid() {
-        this(new SimpleTaskQueueWithHistory<T>(), new MultiThreadedTaskMaster<T>());
+        this(null, null);
     }
 
     public AbstractDroid(Queue<T> queue, TaskMaster<T> taskMaster) {
-        this.queue = queue;
-        this.taskMaster = taskMaster;
+        this.queue = queue == null ? new SimpleTaskQueueWithHistory<T>() : queue;
+        this.taskMaster = queue == null ? new MultiThreadedTaskMaster<T>() : taskMaster;
         this.parserFactory = new ParserFactory();
         this.filterFactory = new FilterFactory();
         this.handlerFactory = new HandlerFactory();
@@ -71,11 +72,19 @@ public abstract class AbstractDroid<T ex
 
     @Override
     public void add(T task) {
+        logger.debug("add task: " + task.getURI());
         queue.add(task);
     }
 
     @Override
+    public T filter(T task) {
+        logger.debug("filter task: " + task.getURI());
+        return this.filterFactory.filter(task);
+    }
+
+    @Override
     public void load(T task) throws DroidsException, IOException {
+        logger.debug("load task: " + task.getURI());
         if (this.fetcher == null) {
             throw new DroidsException("Fetcher not set");
         } else {
@@ -85,17 +94,20 @@ public abstract class AbstractDroid<T ex
 
     @Override
     public void parse(T task) throws DroidsException, IOException {
+        logger.debug("parse task: " + task.getURI());
         this.parserFactory.parse(task);
     }
 
     @Override
     public void handle(T task) throws DroidsException, IOException {
+        logger.debug("handle task: " + task.getURI());
         this.handlerFactory.handle(task);
     }
 
     @Override
-    public T filter(T task) {
-        return this.filterFactory.filter(task);
+    public void finish(T task) throws DroidsException, IOException {
+        task.getContentEntity().close();
+        logger.debug("finished task: " + task.getURI());
     }
 
     public void setFetcher(Fetcher fetcher) {

Modified: incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/ContentEntity.java
URL: http://svn.apache.org/viewvc/incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/ContentEntity.java?rev=1439804&r1=1439803&r2=1439804&view=diff
==============================================================================
--- incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/ContentEntity.java (original)
+++ incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/ContentEntity.java Tue Jan 29 09:50:17 2013
@@ -1,8 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.droids.core;
 
+import java.io.IOException;
 import java.io.InputStream;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 
 /**
  *
@@ -13,7 +31,7 @@ public class ContentEntity {
     private Map<String, Object> data;
 
     public final static String CONTENT = "content";
-    public final static String MIME_TYPE = "mime";
+    public final static String CONTENT_TYPE = "content-type";
     public final static String CONTENT_LENGTH = "content-length";
     public final static String LINKS = "links";
 
@@ -33,15 +51,31 @@ public class ContentEntity {
         this.put(CONTENT, in);
     }
 
+    public <T extends Task> void setLinks(Set<T> links) {
+        this.put(LINKS, links);
+    }
+
     public InputStream getContent() throws DroidsException {
         if (this.getValue(CONTENT) != null) {
             if (this.getValue(CONTENT) instanceof InputStream) {
                 return (InputStream)this.getValue(CONTENT);
             } else {
-                throw new DroidsException("wrong type of content");
+                throw new DroidsException("no content available");
+            }
+        } else {
+            return null;
+        }
+    }
+
+    public <T extends Task> Set<T> getLinks() throws DroidsException {
+        if (this.getValue(LINKS) != null) {
+            if (this.getValue(LINKS) instanceof Set) {
+                return (Set<T>)this.getValue(LINKS);
+            } else {
+                throw new DroidsException("no set of links available");
             }
         } else {
-            throw new NullPointerException();
+            return null;
         }
     }
 
@@ -49,4 +83,10 @@ public class ContentEntity {
         return data;
     }
 
+    public void close() throws DroidsException, IOException {
+        if (this.getContent() != null) {
+            this.getContent().close();
+        }
+    }
+
 }

Modified: incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/Droid.java
URL: http://svn.apache.org/viewvc/incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/Droid.java?rev=1439804&r1=1439803&r2=1439804&view=diff
==============================================================================
--- incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/Droid.java (original)
+++ incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/Droid.java Tue Jan 29 09:50:17 2013
@@ -78,6 +78,15 @@ public interface Droid<T extends Task> {
     public void handle(T task) throws DroidsException, IOException;
 
     /**
+     * Finish the task.
+     * Close resources like InputStreams, perform monitoring and clean up the task.
+     *
+     * @param task the task to handle
+     */
+    public void finish(T task) throws DroidsException, IOException;
+
+
+    /**
      * Filter the task.
      *
      * @param task the task to filter

Modified: incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/Task.java
URL: http://svn.apache.org/viewvc/incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/Task.java?rev=1439804&r1=1439803&r2=1439804&view=diff
==============================================================================
--- incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/Task.java (original)
+++ incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/Task.java Tue Jan 29 09:50:17 2013
@@ -59,4 +59,6 @@ public interface Task extends Serializab
     public void abort();
 
     public boolean isAborted();
+
+    public Task createTask(URI uri);
 }

Modified: incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/handle/SysoutHandler.java
URL: http://svn.apache.org/viewvc/incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/handle/SysoutHandler.java?rev=1439804&r1=1439803&r2=1439804&view=diff
==============================================================================
--- incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/handle/SysoutHandler.java (original)
+++ incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/handle/SysoutHandler.java Tue Jan 29 09:50:17 2013
@@ -23,6 +23,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.*;
+import java.util.HashSet;
 import java.util.Set;
 
 /**
@@ -36,7 +37,7 @@ public class SysoutHandler extends Write
     private static final Logger logger = LoggerFactory.getLogger(SysoutHandler.class);
 
     public SysoutHandler() {
-        super();
+        this(new HashSet<String>());
     }
 
     public SysoutHandler(Set<String> attributes) {
@@ -53,7 +54,7 @@ public class SysoutHandler extends Write
     @Override
     public void handle(Task task) throws IOException, DroidsException {
         for (String key : task.getContentEntity().getData().keySet()) {
-            if (attributes.contains(key)) {
+            if (attributes.contains(key) || attributes.isEmpty()) {
                 if (task.getContentEntity().getValue(key) instanceof InputStream) {
                     InputStream instream = (InputStream) task.getContentEntity().getValue(key);
                     try {

Modified: incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/parse/Parser.java
URL: http://svn.apache.org/viewvc/incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/parse/Parser.java?rev=1439804&r1=1439803&r2=1439804&view=diff
==============================================================================
--- incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/parse/Parser.java (original)
+++ incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/parse/Parser.java Tue Jan 29 09:50:17 2013
@@ -26,12 +26,12 @@ import org.apache.droids.core.Task;
  *
  * @version 1.0
  */
-public interface Parser {
+public interface Parser<T extends Task> {
     /**
      * Creates the parse for some content.
      *
      * @param task the task that correspond to the stream
      * @return the parse object
      */
-    public void parse(Task task) throws DroidsException, IOException;
+    public void parse(T task) throws DroidsException, IOException;
 }

Added: incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/parse/SimpleLinkParser.java
URL: http://svn.apache.org/viewvc/incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/parse/SimpleLinkParser.java?rev=1439804&view=auto
==============================================================================
--- incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/parse/SimpleLinkParser.java (added)
+++ incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/parse/SimpleLinkParser.java Tue Jan 29 09:50:17 2013
@@ -0,0 +1,36 @@
+package org.apache.droids.parse;
+
+import org.apache.droids.core.DroidsException;
+import org.apache.droids.core.Task;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashSet;
+import java.util.Scanner;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * A minimal, regular-expression based link extractor for HTML-like content.
+ * <p>
+ * Reads the task's content stream completely, finds the {@code href} value of
+ * every opening {@code <a>} tag, resolves each value against the task URI and
+ * stores the resulting tasks (created with {@link Task#createTask}) as the
+ * content entity's links. Blank hrefs and hrefs that cannot be resolved into a
+ * URI are skipped, so one malformed link does not abort parsing of the page.
+ * <p>
+ * This is a heuristic, not a real HTML parser.
+ */
+public class SimpleLinkParser<T extends Task> implements Parser<T> {
+
+    /**
+     * Matches an opening anchor tag and captures its href value: group 1 for a
+     * double-quoted, group 2 for a single-quoted and group 3 for an unquoted value.
+     * Compiled once; {@link Pattern} instances are immutable and thread-safe.
+     */
+    private static final Pattern LINK_PATTERN = Pattern.compile(
+            "<a\\s(?:[^>]*?\\s)?href\\s*=\\s*(?:\"([^\"]*)\"|'([^']*)'|([^\\s\"'>]+))",
+            Pattern.CASE_INSENSITIVE);
+
+    /**
+     * Charset used to decode the content stream.
+     * NOTE(review): the entity's content-type charset is not consulted — confirm UTF-8 is acceptable.
+     */
+    private static final String CHARSET = "UTF-8";
+
+    /**
+     * Extracts the links from the task's content and stores them on its content entity.
+     * Does nothing when the entity has no content stream.
+     *
+     * @param task the task whose content is parsed; its URI is the base for relative links
+     * @throws DroidsException if the content entity holds something other than an InputStream
+     * @throws IOException if reading the content stream fails
+     */
+    @Override
+    public void parse(T task) throws DroidsException, IOException {
+        InputStream inStream = task.getContentEntity().getContent();
+        if (inStream == null) {
+            return;
+        }
+
+        // The Scanner is deliberately not closed: that would close the entity's stream,
+        // whose lifecycle belongs to the content entity.
+        // NOTE(review): reading consumes the stream, so later readers of getContent()
+        // see it exhausted — confirm intended.
+        Scanner scanner = new Scanner(inStream, CHARSET).useDelimiter("\\A");
+        String content = scanner.hasNext() ? scanner.next() : "";
+        // Scanner swallows I/O errors; surface them instead of parsing truncated content.
+        IOException readError = scanner.ioException();
+        if (readError != null) {
+            throw readError;
+        }
+
+        Set<Task> links = new HashSet<Task>();
+        Matcher matcher = LINK_PATTERN.matcher(content);
+        while (matcher.find()) {
+            String href = normalizeHref(matcher);
+            if (href.length() == 0) {
+                continue;
+            }
+            try {
+                links.add(task.createTask(task.getURI().resolve(href)));
+            } catch (IllegalArgumentException malformed) {
+                // URI.resolve rejects hrefs that are not valid URIs (e.g. unescaped
+                // spaces); skip such a link instead of failing the whole page.
+            }
+        }
+        task.getContentEntity().setLinks(links);
+    }
+
+    /**
+     * Returns the href captured by the current match, trimmed, with {@code &amp;}
+     * decoded to {@code &}; other character references are left untouched.
+     */
+    private static String normalizeHref(Matcher matcher) {
+        String raw = matcher.group(1);
+        if (raw == null) {
+            raw = matcher.group(2);
+        }
+        if (raw == null) {
+            raw = matcher.group(3);
+        }
+        return raw.trim().replace("&amp;", "&");
+    }
+}

Propchange: incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/parse/SimpleLinkParser.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/parse/SimpleLinkParser.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision

Propchange: incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/parse/SimpleLinkParser.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/taskmaster/MultiThreadedTaskMaster.java
URL: http://svn.apache.org/viewvc/incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/taskmaster/MultiThreadedTaskMaster.java?rev=1439804&view=auto
==============================================================================
--- incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/taskmaster/MultiThreadedTaskMaster.java (added)
+++ incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/taskmaster/MultiThreadedTaskMaster.java Tue Jan 29 09:50:17 2013
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.droids.taskmaster;
+
+import java.util.Date;
+import java.util.Queue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.droids.core.Droid;
+import org.apache.droids.core.Task;
+import org.apache.droids.core.TaskMaster;
+import org.apache.droids.core.Worker;
+import org.apache.droids.delay.DelayTimer;
+import org.apache.droids.exception.TaskExceptionHandler;
+import org.apache.droids.exception.TaskExceptionResult;
+import org.apache.droids.monitor.WorkMonitor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link TaskMaster} that processes a task queue with a fixed-size pool of threads.
+ * <p>
+ * {@link #start} launches {@link #getPoolSize()} task executors, pausing
+ * {@code TICKLE_TIME} ms before each one. An executor runs at most one task and is
+ * then re-submitted to the pool while the queue still holds tasks; otherwise it
+ * retires. The last executor to retire calls {@link Droid#finished()} and shuts the
+ * pool down.
+ */
+public class MultiThreadedTaskMaster<T extends Task> implements TaskMaster<T> {
+
+    protected static final Logger LOG = LoggerFactory.getLogger(MultiThreadedTaskMaster.class);
+    /**
+     * Pause in milliseconds used to stagger start-up and to wait for an empty queue to refill.
+     */
+    private static final long TICKLE_TIME = 1000L;
+
+    /**
+     * The execution state
+     */
+    protected volatile ExecutionState state = ExecutionState.STOPPED;
+    /**
+     * The delay timer; stored by {@link #setDelayTimer} but not consulted by this class
+     */
+    protected DelayTimer delayTimer;
+    /**
+     * The time of the last call to {@link #start}; {@code null} before the first start
+     */
+    protected volatile Date startTime;
+    /**
+     * The time the last run completed or was stopped; {@code null} while running
+     */
+    protected volatile Date endTime;
+    /**
+     * The last completed task; written by pool threads, hence volatile
+     */
+    protected volatile T lastCompletedTask;
+    /**
+     * The completed task counter
+     */
+    protected AtomicLong completedTasks = new AtomicLong();
+    /**
+     * The monitor that records the processing of tasks; may be {@code null}
+     */
+    protected WorkMonitor<T> monitor;
+    /**
+     * The task exception handler; when {@code null}, task exceptions are logged and
+     * processing continues
+     */
+    protected TaskExceptionHandler exceptionHandler;
+
+    /**
+     * The pool size, i.e. the number of threads and of task executors
+     */
+    private int poolSize = 1;
+    /**
+     * The pool; created by {@link #start}
+     */
+    private TaskExecutorPool pool;
+    /**
+     * Number of task executors that have been launched and have not retired yet.
+     * Exactly one executor decrements it to zero, and that one finishes the run.
+     */
+    private final AtomicLong liveExecutors = new AtomicLong();
+
+    /**
+     * Starts processing {@code queue} with {@link #getPoolSize()} task executors.
+     * Blocks for about {@code poolSize * TICKLE_TIME} ms while the executors are
+     * launched one after another.
+     *
+     * @param queue the queue to take tasks from
+     * @param droid the droid that supplies workers and is notified when the run completes
+     */
+    @Override
+    public void start(Queue<T> queue, Droid<T> droid) {
+        LOG.info("Start the executor service.");
+
+        state = ExecutionState.RUNNING;
+        startTime = new Date();
+        endTime = null;
+
+        // A shut-down pool silently discards new work (DiscardPolicy), so a restart needs a new one.
+        if (pool == null || pool.isShutdown()) {
+            this.pool = new TaskExecutorPool();
+            this.liveExecutors.set(0);
+        }
+
+        // Register all executors before launching any, so an executor that retires
+        // early cannot take itself for the last one and end the run prematurely.
+        liveExecutors.addAndGet(poolSize);
+
+        // Stagger startup
+        for (int i = 0; i < poolSize; i++) {
+            try {
+                Thread.sleep(TICKLE_TIME);
+            } catch (InterruptedException e) {
+                // Preserve the interrupt for the caller; the remaining executors start without a pause.
+                Thread.currentThread().interrupt();
+            }
+            pool.execute(new TaskExecutor(queue, droid));
+        }
+    }
+
+    /**
+     * Stops the TaskMaster. It marks the state {@link ExecutionState#STOPPED} and rejects new work.
+     * It then waits up to one second for running tasks, cancels them, and waits up to
+     * one more second. If the TaskMaster was never started, only the state is updated.
+     */
+    public void stop() {
+        LOG.info("Stop the executor service.");
+
+        state = ExecutionState.STOPPED;
+        if (endTime == null) {
+            endTime = new Date();
+        }
+
+        if (pool == null) {
+            return;
+        }
+
+        // Disable new tasks from being submitted
+        pool.shutdown();
+
+        try {
+            // Wait a while for existing tasks to terminate
+            if (!pool.awaitTermination(1, TimeUnit.SECONDS)) {
+
+                // Cancel currently executing tasks
+                pool.shutdownNow();
+
+                // Wait a while for tasks to respond to being cancelled
+                if (!pool.awaitTermination(1, TimeUnit.SECONDS)) {
+                    LOG.warn("Scheduler did not stop.");
+                }
+            }
+        } catch (InterruptedException ex) {
+            LOG.info("Force scheduler to stop.");
+
+            // (Re-)Cancel if current thread also interrupted
+            pool.shutdownNow();
+
+            // Preserve interrupt status
+            Thread.currentThread().interrupt();
+        }
+
+        LOG.info("Scheduler stopped.");
+    }
+
+    /**
+     * Blocks until the pool has terminated or the timeout elapses.
+     *
+     * @return {@code true} if the pool terminated (or was never started), {@code false} on timeout
+     */
+    @Override
+    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+        return pool == null || pool.awaitTermination(timeout, unit);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public ExecutionState getExecutionState() {
+        return state;
+    }
+
+    /**
+     * Returns the monitor notified before and after each task.
+     *
+     * @return the monitor, or {@code null} if none is set
+     */
+    public WorkMonitor<T> getMonitor() {
+        return monitor;
+    }
+
+    /**
+     * Sets the monitor notified before and after each task.
+     *
+     * @param monitor the monitor, or {@code null} for none
+     * @throws IllegalStateException if the TaskMaster is running
+     */
+    public void setMonitor(WorkMonitor<T> monitor) {
+        if (state == ExecutionState.RUNNING) {
+            throw new IllegalStateException("The TaskMaster must be stopped to set a Monitor.");
+        }
+        this.monitor = monitor;
+    }
+
+    @Override
+    public void setExceptionHandler(TaskExceptionHandler exceptionHandler) {
+        this.exceptionHandler = exceptionHandler;
+    }
+
+    @Override
+    public void setDelayTimer(DelayTimer delayTimer) {
+        this.delayTimer = delayTimer;
+    }
+
+    @Override
+    public Date getFinishedWorking() {
+        return endTime;
+    }
+
+    @Override
+    public T getLastCompletedTask() {
+        return lastCompletedTask;
+    }
+
+    @Override
+    public long getCompletedTasks() {
+        return completedTasks.get();
+    }
+
+    @Override
+    public Date getStartTime() {
+        return startTime;
+    }
+
+    /**
+     * Returns the pool size.
+     *
+     * @return the number of threads and task executors used by {@link #start}
+     */
+    public int getPoolSize() {
+        return poolSize;
+    }
+
+    /**
+     * Sets the pool size. An existing thread pool is updated immediately. The number of
+     * task executors changes on the next {@link #start}.
+     *
+     * @param poolSize the number of threads and task executors, at least 1
+     * @throws IllegalArgumentException if {@code poolSize < 1}
+     */
+    public void setPoolSize(int poolSize) {
+        if (poolSize < 1) {
+            throw new IllegalArgumentException("poolSize must be at least 1: " + poolSize);
+        }
+        this.poolSize = poolSize;
+        if (pool != null) {
+            // ThreadPoolExecutor rejects core > maximum, so the update order
+            // depends on whether the pool grows or shrinks.
+            if (poolSize > pool.getMaximumPoolSize()) {
+                pool.setMaximumPoolSize(poolSize);
+                pool.setCorePoolSize(poolSize);
+            } else {
+                pool.setCorePoolSize(poolSize);
+                pool.setMaximumPoolSize(poolSize);
+            }
+        }
+    }
+
+    /**
+     * Fixed-size thread pool that re-submits or retires task executors after each run.
+     */
+    private class TaskExecutorPool extends ThreadPoolExecutor {
+
+        private static final long KEEP_ALIVE = 50000L;
+
+        public TaskExecutorPool() {
+            super(poolSize, poolSize, KEEP_ALIVE, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
+            // Work submitted after shutdown is dropped silently.
+            this.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
+        }
+
+        /**
+         * Decides what a task executor does after a run. It re-submits itself while the
+         * task queue has work; otherwise it retires. The last executor to retire
+         * finishes the droid and shuts the pool down.
+         */
+        @Override
+        protected void afterExecute(Runnable r, Throwable thrwbl) {
+            super.afterExecute(r, thrwbl);
+
+            // After stop() or a fatal task exception the pool is already shut down, and the
+            // code that shut it down has set the state; do not overwrite it with COMPLETED
+            // or finish the droid a second time.
+            if (isShutdown()) {
+                return;
+            }
+
+            // Only TaskExecutors are submitted to this pool.
+            @SuppressWarnings("unchecked")
+            TaskExecutor taskExecutor = (TaskExecutor) r;
+
+            // Re-execute the task runner while the task queue is not empty. While other
+            // runners are still waiting in the pool, keep polling, because they may add tasks.
+            while (taskExecutor.getQueue().size() > 0 || getQueue().size() > 0) {
+                if (taskExecutor.getQueue().size() > 0) {
+                    execute(r);
+                    return;
+                }
+                try {
+                    Thread.sleep(TICKLE_TIME);
+                } catch (InterruptedException e) {
+                    // Presumably interrupted by shutdownNow(): stop waiting and let the thread exit.
+                    Thread.currentThread().interrupt();
+                    return;
+                }
+            }
+
+            // NOTE(review): the state becomes COMPLETED as soon as any executor retires,
+            // even while others may still be running tasks.
+            state = ExecutionState.COMPLETED;
+
+            // getActiveCount() is only an estimate and cannot tell concurrently retiring
+            // executors apart. The counter guarantees exactly one of them observes zero,
+            // so the droid is finished exactly once.
+            if (liveExecutors.decrementAndGet() == 0) {
+                endTime = new Date();
+                try {
+                    taskExecutor.getDroid().finished();
+                } finally {
+                    shutdown();
+                }
+            }
+        }
+    }
+
+    /**
+     * Runnable that takes one task from the queue and executes it with its own worker.
+     */
+    private class TaskExecutor implements Runnable {
+
+        private final Droid<T> droid;
+        private final Queue<T> queue;
+        private final Worker<T> worker;
+
+        public TaskExecutor(Queue<T> queue, Droid<T> droid) {
+            this.droid = droid;
+            this.queue = queue;
+            this.worker = droid.getNewWorker();
+        }
+
+        public Droid<T> getDroid() {
+            return droid;
+        }
+
+        public Queue<T> getQueue() {
+            return queue;
+        }
+
+        @SuppressWarnings("unused")
+        public Worker<? extends Task> getWorker() {
+            return worker;
+        }
+
+        @Override
+        public void run() {
+            T task = queue.poll();
+
+            if (task == null) {
+                // The queue may be refilled by another executor; wait once before giving up.
+                try {
+                    Thread.sleep(TICKLE_TIME);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    return;
+                }
+                task = queue.poll();
+            }
+
+            if (task != null) {
+                process(task);
+            }
+        }
+
+        /**
+         * Executes one task with this executor's worker, reports to the monitor and
+         * updates the completion statistics. Exceptions are logged and passed to the
+         * exception handler, if one is set. A FATAL result stops the whole run.
+         */
+        private void process(T task) {
+            try {
+                // monitor the execution of the task
+                if (monitor != null) {
+                    monitor.beforeExecute(task, worker);
+                }
+
+                LOG.debug("Worker [{}] execute task [{}].", worker, task);
+
+                // aborted tasks are skipped but still counted as completed
+                if (!task.isAborted()) {
+                    worker.execute(task);
+                }
+
+                LOG.debug("Worker [{}] executed task [{}] with success.", worker, task);
+
+                if (monitor != null) {
+                    monitor.afterExecute(task, worker, null);
+                }
+
+                // set the monitored variables
+                completedTasks.incrementAndGet();
+                lastCompletedTask = task;
+
+            } catch (Exception ex) {
+                LOG.debug("Worker [{}] executed task [{}] without success.", worker, task);
+                LOG.error("Worker [" + worker + "] failed to execute task [" + task + "].", ex);
+
+                // monitor the exception
+                if (monitor != null) {
+                    monitor.afterExecute(task, worker, ex);
+                }
+
+                // Without a handler, task exceptions are only logged and processing continues.
+                if (exceptionHandler != null) {
+                    TaskExceptionResult result = exceptionHandler.handleException(ex);
+
+                    // stop the execution in case of a fatal exception
+                    if (TaskExceptionResult.FATAL.equals(result)) {
+                        state = ExecutionState.STOPPED;
+                        endTime = new Date();
+                        try {
+                            droid.finished();
+                        } finally {
+                            pool.shutdownNow();
+                        }
+                    }
+                }
+            }
+        }
+    }
+}

Propchange: incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/taskmaster/MultiThreadedTaskMaster.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/taskmaster/MultiThreadedTaskMaster.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision

Propchange: incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/taskmaster/MultiThreadedTaskMaster.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/taskmaster/SequentialTaskMaster.java
URL: http://svn.apache.org/viewvc/incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/taskmaster/SequentialTaskMaster.java?rev=1439804&view=auto
==============================================================================
--- incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/taskmaster/SequentialTaskMaster.java (added)
+++ incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/taskmaster/SequentialTaskMaster.java Tue Jan 29 09:50:17 2013
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.droids.taskmaster;
+
+import java.util.Date;
+import java.util.Queue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.droids.core.Droid;
+import org.apache.droids.core.Task;
+import org.apache.droids.core.TaskMaster;
+import org.apache.droids.core.Worker;
+import org.apache.droids.delay.DelayTimer;
+import org.apache.droids.exception.TaskExceptionHandler;
+import org.apache.droids.exception.TaskExceptionResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link TaskMaster} that executes queued tasks one at a time on the thread that
+ * calls {@link #start(Queue, Droid)}.
+ * <p>
+ * Progress can be observed from other threads through the getters, and
+ * {@link #awaitTermination(long, TimeUnit)} lets another thread block until
+ * processing has finished.
+ *
+ * @param <T> the type of task processed
+ */
+public class SequentialTaskMaster<T extends Task> implements TaskMaster<T> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SequentialTaskMaster.class);
+    /** Monitor guarding {@code completed}; used to wake {@code awaitTermination} waiters. */
+    private final Object mutex;
+    private volatile boolean completed;
+    private volatile Date startedWorking = null;
+    private volatile Date finishedWorking = null;
+    // Written only by the thread running start() (which is synchronized), so the
+    // non-atomic increment is safe; volatile publishes the value to readers.
+    private volatile int completedTask = 0;
+    private volatile T lastCompletedTask = null;
+    private volatile ExecutionState state = ExecutionState.INITIALIZED;
+    private DelayTimer delayTimer = null;
+    private TaskExceptionHandler exHandler = null;
+
+    public SequentialTaskMaster() {
+        super();
+        this.mutex = new Object();
+    }
+
+    /**
+     * Processes the queue sequentially on the calling thread until it is empty,
+     * a task failure is classified as {@link TaskExceptionResult#FATAL}, or the
+     * thread is interrupted while sleeping for the configured delay.
+     * <p>
+     * The finishing bookkeeping (finish time, {@code STOPPED} state,
+     * {@link Droid#finished()} and waking {@code awaitTermination} waiters) runs in
+     * a {@code finally} block, so an unexpected runtime exception cannot leave
+     * waiters blocked forever.
+     *
+     * @param queue the tasks to process; drained with {@link Queue#poll()}
+     * @param droid supplies a new worker for every task and is notified when processing ends
+     */
+    @Override
+    public synchronized void start(final Queue<T> queue, final Droid<T> droid) {
+        this.completed = false;
+        this.startedWorking = new Date();
+        this.finishedWorking = null;
+        this.completedTask = 0;
+        this.state = ExecutionState.RUNNING;
+
+        try {
+            boolean terminated = false;
+            while (!terminated) {
+                T task = queue.poll();
+                if (task == null) {
+                    break;
+                }
+                if (!sleepForDelay()) {
+                    // Interrupted: still run the task that was already dequeued, then stop.
+                    terminated = true;
+                }
+                Worker<T> worker = droid.getNewWorker();
+                try {
+                    if (!task.isAborted()) {
+                        worker.execute(task);
+                    }
+                    completedTask++;
+                    lastCompletedTask = task;
+                } catch (Exception ex) {
+                    if (handleTaskException(task, ex)) {
+                        terminated = true;
+                    }
+                }
+            }
+        } finally {
+            finishedWorking = new Date();
+            this.state = ExecutionState.STOPPED;
+            try {
+                droid.finished();
+            } finally {
+                // Always release awaitTermination() waiters, even if finished() throws.
+                synchronized (mutex) {
+                    completed = true;
+                    mutex.notifyAll();
+                }
+            }
+        }
+    }
+
+    /**
+     * Sleeps for the delay reported by the configured {@link DelayTimer}, if any.
+     *
+     * @return {@code false} if the thread was interrupted while sleeping (the
+     *         interrupt status is restored), {@code true} otherwise
+     */
+    private boolean sleepForDelay() {
+        if (delayTimer == null) {
+            return true;
+        }
+        long delay = delayTimer.getDelayMillis();
+        if (delay <= 0) {
+            return true;
+        }
+        try {
+            Thread.sleep(delay);
+            return true;
+        } catch (InterruptedException e) {
+            // Preserve the interrupt for callers and treat it as a request to stop.
+            Thread.currentThread().interrupt();
+            LOG.warn("Interrupted while waiting for the task delay; stopping after the current task");
+            return false;
+        }
+    }
+
+    /**
+     * Classifies a task failure with the configured {@link TaskExceptionHandler}
+     * and logs it. Without a handler, or when the handler returns {@code null},
+     * the failure is treated as {@link TaskExceptionResult#WARN}.
+     *
+     * @param task the task whose execution failed
+     * @param ex the failure
+     * @return {@code true} if processing must stop because the failure is fatal
+     */
+    private boolean handleTaskException(final T task, final Exception ex) {
+        TaskExceptionResult result = null;
+        if (exHandler != null) {
+            result = exHandler.handleException(ex);
+        }
+        if (result == null) {
+            result = TaskExceptionResult.WARN;
+        }
+        switch (result) {
+            case WARN:
+                LOG.warn("{} {}", ex, task.getURI());
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug(ex.toString(), ex);
+                }
+                return false;
+            case FATAL:
+                LOG.error(ex.getMessage(), ex);
+                return true;
+            default:
+                return false;
+        }
+    }
+
+    /**
+     * Sets the handler consulted when a task throws; when unset, failures are
+     * logged as warnings and processing continues.
+     */
+    @Override
+    public final void setExceptionHandler(TaskExceptionHandler exHandler) {
+        this.exHandler = exHandler;
+    }
+
+    /**
+     * Sets the timer consulted before each task; when unset, tasks run without delay.
+     */
+    @Override
+    public final void setDelayTimer(DelayTimer delayTimer) {
+        this.delayTimer = delayTimer;
+    }
+
+    /**
+     * @return {@code true} after {@code start} has begun and before it has finished
+     */
+    public boolean isWorking() {
+        return startedWorking != null && finishedWorking == null;
+    }
+
+    /**
+     * @return when {@code start} was last invoked, or {@code null} if never started
+     */
+    @Override
+    public Date getStartTime() {
+        return startedWorking;
+    }
+
+    /**
+     * @return when the last run finished, or {@code null} while running or if never run
+     */
+    @Override
+    public Date getFinishedWorking() {
+        return finishedWorking;
+    }
+
+    /**
+     * @return the number of tasks in the current (or last) run that did not throw,
+     *         including aborted tasks that were skipped
+     */
+    @Override
+    public long getCompletedTasks() {
+        return completedTask;
+    }
+
+    /**
+     * @return the most recently completed task, or {@code null} if none has completed
+     */
+    @Override
+    public T getLastCompletedTask() {
+        return lastCompletedTask;
+    }
+
+    /**
+     * Blocks until {@link #start(Queue, Droid)} has finished, the timeout elapses,
+     * or the current thread is interrupted.
+     * <p>
+     * A zero or negative timeout does not wait at all; it only reports whether
+     * processing has already finished. Elapsed time is measured with
+     * {@link System#nanoTime()}, so wall-clock adjustments do not affect it.
+     *
+     * @param timeout the maximum time to wait
+     * @param unit the unit of {@code timeout}
+     * @return {@code true} if processing finished, {@code false} if the timeout elapsed first
+     * @throws InterruptedException if interrupted while waiting
+     */
+    @Override
+    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+        long remainingNanos = unit.toNanos(timeout);
+        // May overflow for huge timeouts; only ever compared via differences with
+        // System.nanoTime(), which stays correct under two's-complement wraparound.
+        final long deadline = System.nanoTime() + remainingNanos;
+        synchronized (this.mutex) {
+            // Check completion first so a finished run is never reported as a timeout.
+            while (!completed) {
+                if (remainingNanos <= 0) {
+                    return false; // Timeout elapsed before processing finished.
+                }
+                TimeUnit.NANOSECONDS.timedWait(this.mutex, remainingNanos);
+                remainingNanos = deadline - System.nanoTime();
+            }
+        }
+        return true;
+    }
+
+    /**
+     * @return the current execution state of this task master
+     */
+    @Override
+    public ExecutionState getExecutionState() {
+        return state;
+    }
+}

Propchange: incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/taskmaster/SequentialTaskMaster.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/taskmaster/SequentialTaskMaster.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision

Propchange: incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/taskmaster/SequentialTaskMaster.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: incubator/droids/branches/0.2.x-cleanup/droids-core/src/test/java/org/apache/droids/core/SimpleTask.java
URL: http://svn.apache.org/viewvc/incubator/droids/branches/0.2.x-cleanup/droids-core/src/test/java/org/apache/droids/core/SimpleTask.java?rev=1439804&r1=1439803&r2=1439804&view=diff
==============================================================================
--- incubator/droids/branches/0.2.x-cleanup/droids-core/src/test/java/org/apache/droids/core/SimpleTask.java (original)
+++ incubator/droids/branches/0.2.x-cleanup/droids-core/src/test/java/org/apache/droids/core/SimpleTask.java Tue Jan 29 09:50:17 2013
@@ -50,4 +50,9 @@ public class SimpleTask implements Task 
         return this.aborted;
     }
 
+    /**
+     * Creates a new {@code SimpleTask} for the given URI that carries this
+     * task's depth unchanged.
+     * <p>
+     * NOTE(review): the depth is copied, not incremented (the LinkTask variant
+     * discussed on the list uses getDepth() + 1); confirm this is intended for
+     * this test fixture.
+     *
+     * @param uri the URI of the new task
+     * @return a new task for {@code uri}
+     */
+    @Override
+    public Task createTask(final URI uri) {
+        final SimpleTask child = new SimpleTask(uri, getDepth());
+        return child;
+    }
+
 }



Re: svn commit: r1439804 - in /incubator/droids/branches/0.2.x-cleanup/droids-core: ./ src/main/java/org/apache/droids/core/ src/main/java/org/apache/droids/handle/ src/main/java/org/apache/droids/parse/ src/main/java/org/apache/droids/taskmaster/ src/test...

Posted by Thorsten Scherler <sc...@gmail.com>.
On 01/30/2013 10:42 AM, Tobias Rübner wrote:
> Hi Thorsten,
>
> actually while implementing the new HTTPClient Crawler I needed a simple
> and generic way for the parser to create new tasks.
> When the parser is used for extracting the links, he does not know anything
> about the kind of the task.
> I have an example in the SimpleLinkParser
> https://svn.apache.org/repos/asf/incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/parse/SimpleLinkParser.java
>
> So I thought this might be a good approach.

I understand your point about the parser. I worked around that by using
getOutlinks() from the LinkTask to store the links, and I do the
extraction via a c3 pipeline.

Collection<URI> linksTo = new HashSet<URI>();
        List<String> outLinks = pipeline.getOutLinks();
        for (String location : outLinks) {
            URI uri;
            try {
                uri = new URI(location);
                linksTo.add(uri);
            } catch (URISyntaxException ex) {
                logger.error("Invalid location: " + location, ex);
            }
        }
        ((LinkTask) task).setLinksTo(linksTo);

I will now try your implementation and start moving things like the
LinkTask back to droids-crawler. Let us see where I need to adapt.

>
> It is always good to discuss, so please share your thoughts and we can
> create a great Droids-API!

Yeah, I really appreciate your efforts!

salu2

>
> Tobias
>
>
> On Tue, Jan 29, 2013 at 4:31 PM, Thorsten Scherler <sc...@gmail.com>wrote:
>
>> On 01/29/2013 04:23 PM, Thorsten Scherler wrote:
>>> On 01/29/2013 10:50 AM, tobr@apache.org wrote:
>>>> Modified:
>> incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/Task.java
>>>> URL:
>> http://svn.apache.org/viewvc/incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/Task.java?rev=1439804&r1=1439803&r2=1439804&view=diff
>> ==============================================================================
>>>> ---
>> incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/Task.java
>> (original)
>>>> +++
>> incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/Task.java
>> Tue Jan 29 09:50:17 2013
>>>> @@ -59,4 +59,6 @@ public interface Task extends Serializab
>>>>      public void abort();
>>>>
>>>>      public boolean isAborted();
>>>> +
>>>> +    public Task createTask(URI uri);
>>>>  }
>>> Why did you added createTask to the interface?
>>>
>>> IMO it is not really generic since seeing your implementation and my
>>> current use case I would rather expected something like
>>>
>>> Link task = new LinkTask(link, uri, link.getDepth() + 1);
>>>
>>> /**
>>>      * Creates a new LinkTask.
>>>      *
>>>      * @param from Link
>>>      * @param uri URI
>>>      * @param depth int
>>>      */
>>>
>>> ...but I as well understand your approach.
>>>
>>> However I am doing the creation of tasks in my main CrawlingDroid but I
>>> am trying to understand why you have done it like that.
>>>
>>> salu2
>>>
>> Actually I just fixed my custom code for the linkTask with
>>
>>     @Override
>>     public Link createTask(URI uri) {
>>         return new LinkTask(this, uri, this.getDepth() + 1);
>>     }
>>
>> salu2
>>
>> --
>> Thorsten Scherler <scherler.at.gmail.com>
>> codeBusters S.L. - web based systems
>> <consulting, training and solutions>
>>
>> http://www.codebusters.es/
>>
>>


-- 
Thorsten Scherler <scherler.at.gmail.com>
codeBusters S.L. - web based systems
<consulting, training and solutions>

http://www.codebusters.es/


Re: svn commit: r1439804 - in /incubator/droids/branches/0.2.x-cleanup/droids-core: ./ src/main/java/org/apache/droids/core/ src/main/java/org/apache/droids/handle/ src/main/java/org/apache/droids/parse/ src/main/java/org/apache/droids/taskmaster/ src/test...

Posted by Tobias Rübner <to...@apache.org>.
Hi Thorsten,

actually, while implementing the new HTTPClient crawler, I needed a simple
and generic way for the parser to create new tasks.
When the parser is used to extract links, it does not know anything
about the concrete type of the task.
I have an example in the SimpleLinkParser
https://svn.apache.org/repos/asf/incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/parse/SimpleLinkParser.java

So I thought this might be a good approach.

It is always good to discuss, so please share your thoughts and we can
create a great Droids-API!

Tobias


On Tue, Jan 29, 2013 at 4:31 PM, Thorsten Scherler <sc...@gmail.com>wrote:

> On 01/29/2013 04:23 PM, Thorsten Scherler wrote:
> > On 01/29/2013 10:50 AM, tobr@apache.org wrote:
> >> Modified:
> incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/Task.java
> >> URL:
> http://svn.apache.org/viewvc/incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/Task.java?rev=1439804&r1=1439803&r2=1439804&view=diff
> >>
> ==============================================================================
> >> ---
> incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/Task.java
> (original)
> >> +++
> incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/Task.java
> Tue Jan 29 09:50:17 2013
> >> @@ -59,4 +59,6 @@ public interface Task extends Serializab
> >>      public void abort();
> >>
> >>      public boolean isAborted();
> >> +
> >> +    public Task createTask(URI uri);
> >>  }
> > Why did you added createTask to the interface?
> >
> > IMO it is not really generic since seeing your implementation and my
> > current use case I would rather expected something like
> >
> > Link task = new LinkTask(link, uri, link.getDepth() + 1);
> >
> > /**
> >      * Creates a new LinkTask.
> >      *
> >      * @param from Link
> >      * @param uri URI
> >      * @param depth int
> >      */
> >
> > ...but I as well understand your approach.
> >
> > However I am doing the creation of tasks in my main CrawlingDroid but I
> > am trying to understand why you have done it like that.
> >
> > salu2
> >
>
> Actually I just fixed my custom code for the linkTask with
>
>     @Override
>     public Link createTask(URI uri) {
>         return new LinkTask(this, uri, this.getDepth() + 1);
>     }
>
> salu2
>
> --
> Thorsten Scherler <scherler.at.gmail.com>
> codeBusters S.L. - web based systems
> <consulting, training and solutions>
>
> http://www.codebusters.es/
>
>

Re: svn commit: r1439804 - in /incubator/droids/branches/0.2.x-cleanup/droids-core: ./ src/main/java/org/apache/droids/core/ src/main/java/org/apache/droids/handle/ src/main/java/org/apache/droids/parse/ src/main/java/org/apache/droids/taskmaster/ src/test...

Posted by Thorsten Scherler <sc...@gmail.com>.
On 01/29/2013 04:23 PM, Thorsten Scherler wrote:
> On 01/29/2013 10:50 AM, tobr@apache.org wrote:
>> Modified: incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/Task.java
>> URL: http://svn.apache.org/viewvc/incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/Task.java?rev=1439804&r1=1439803&r2=1439804&view=diff
>> ==============================================================================
>> --- incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/Task.java (original)
>> +++ incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/Task.java Tue Jan 29 09:50:17 2013
>> @@ -59,4 +59,6 @@ public interface Task extends Serializab
>>      public void abort();
>>  
>>      public boolean isAborted();
>> +
>> +    public Task createTask(URI uri);
>>  }
> Why did you added createTask to the interface?
>
> IMO it is not really generic since seeing your implementation and my
> current use case I would rather expected something like
>
> Link task = new LinkTask(link, uri, link.getDepth() + 1);
>
> /**
>      * Creates a new LinkTask.
>      *
>      * @param from Link
>      * @param uri URI
>      * @param depth int
>      */
>
> ...but I as well understand your approach.
>
> However I am doing the creation of tasks in my main CrawlingDroid but I
> am trying to understand why you have done it like that.
>
> salu2
>

Actually, I just fixed my custom code for the LinkTask with:

    @Override
    public Link createTask(URI uri) {
        return new LinkTask(this, uri, this.getDepth() + 1);
    }

salu2

-- 
Thorsten Scherler <scherler.at.gmail.com>
codeBusters S.L. - web based systems
<consulting, training and solutions>

http://www.codebusters.es/


Re: svn commit: r1439804 - in /incubator/droids/branches/0.2.x-cleanup/droids-core: ./ src/main/java/org/apache/droids/core/ src/main/java/org/apache/droids/handle/ src/main/java/org/apache/droids/parse/ src/main/java/org/apache/droids/taskmaster/ src/test...

Posted by Thorsten Scherler <sc...@gmail.com>.
On 01/29/2013 10:50 AM, tobr@apache.org wrote:
> Modified: incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/Task.java
> URL: http://svn.apache.org/viewvc/incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/Task.java?rev=1439804&r1=1439803&r2=1439804&view=diff
> ==============================================================================
> --- incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/Task.java (original)
> +++ incubator/droids/branches/0.2.x-cleanup/droids-core/src/main/java/org/apache/droids/core/Task.java Tue Jan 29 09:50:17 2013
> @@ -59,4 +59,6 @@ public interface Task extends Serializab
>      public void abort();
>  
>      public boolean isAborted();
> +
> +    public Task createTask(URI uri);
>  }

Why did you add createTask to the interface?

IMO it is not really generic: looking at your implementation and my
current use case, I would rather have expected something like

Link task = new LinkTask(link, uri, link.getDepth() + 1);

/**
     * Creates a new LinkTask.
     *
     * @param from Link
     * @param uri URI
     * @param depth int
     */

...but I understand your approach as well.

However, I create the tasks in my main CrawlingDroid; still, I am
trying to understand why you have done it this way.

salu2

-- 
Thorsten Scherler <scherler.at.gmail.com>
codeBusters S.L. - web based systems
<consulting, training and solutions>

http://www.codebusters.es/