You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@synapse.apache.org by su...@apache.org on 2010/03/13 07:22:15 UTC

svn commit: r922486 - in /synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors: ./ queues/

Author: supun
Date: Sat Mar 13 06:22:14 2010
New Revision: 922486

URL: http://svn.apache.org/viewvc?rev=922486&view=rev
Log:
improving documentation and fixing a minor bug

Modified:
    synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/MultiPriorityBlockingQueue.java
    synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/NextQueueAlgorithm.java
    synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/PRRNextQueueAlgorithm.java
    synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/PriorityExecutor.java
    synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/queues/FixedSizeQueue.java
    synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/queues/UnboundedQueue.java

Modified: synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/MultiPriorityBlockingQueue.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/MultiPriorityBlockingQueue.java?rev=922486&r1=922485&r2=922486&view=diff
==============================================================================
--- synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/MultiPriorityBlockingQueue.java (original)
+++ synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/MultiPriorityBlockingQueue.java Sat Mar 13 06:22:14 2010
@@ -408,11 +408,19 @@ public class MultiPriorityBlockingQueue<
     }
 
     public void clear() {
-        while (true) {
-            if (poll() == null)  break;
-        }
+        final ReentrantLock lock = this.lock;
+        lock.lock();
+        try {
+            for (InternalQueue<E> intQueue : queues) {
+                intQueue.clear();
+            }
+            count = 0;
+        } finally {
+            lock.unlock();
+        }                
     }
 
+    @SuppressWarnings({"SuspiciousToArrayCall"})
     public <T> T[] toArray(T[] a) {
         final ReentrantLock lock = this.lock;
         lock.lock();

Modified: synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/NextQueueAlgorithm.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/NextQueueAlgorithm.java?rev=922486&r1=922485&r2=922486&view=diff
==============================================================================
--- synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/NextQueueAlgorithm.java (original)
+++ synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/NextQueueAlgorithm.java Sat Mar 13 06:22:14 2010
@@ -22,8 +22,8 @@ package org.apache.synapse.commons.execu
 import java.util.List;
 
 /**
- * This interface implements the algorith for determining the next internal
- * queue for picking up the message. This class is created onece and initialized.
+ * This interface abstracts the algorithm for determining the next internal
+ * queue for picking up the message. This class is created once and initialized.
  * This class should capture any runtime information about the queues since the
  * MultiPriorityBlockingQueue doesn't hold any runtime state information about
  * the queues.

Modified: synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/PRRNextQueueAlgorithm.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/PRRNextQueueAlgorithm.java?rev=922486&r1=922485&r2=922486&view=diff
==============================================================================
--- synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/PRRNextQueueAlgorithm.java (original)
+++ synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/PRRNextQueueAlgorithm.java Sat Mar 13 06:22:14 2010
@@ -26,10 +26,10 @@ import java.util.List;
  *
  * <p>This algorithm works in cycles. Lets say we have queues with following priorities.
  * 7, 5, 2 and assume we name the queues as 1, 2, 3 in the order. </p>
- * <p>Here is how messages are picked in a single cycle
- * 1, 1, 1, 1, 1, 1, 1 // all the messages for the queue with priority 1 are sent for this cycle
- * 2, 2, 2, 2, 2,  // all the messages for the queue with priority 2 are sent for this cycle
- * 3, 3 // all the messages with priority 2 are sent for this cycle</p>
+ * <p>Here is how messages are picked in a single cycle </p>
+ * <p> 1, 1, 1, 1, 1, 1, 1 all the messages for queue 1 (priority 7) are sent for this cycle
+ * 2, 2, 2, 2, 2 all the messages for queue 2 (priority 5) are sent for this cycle
+ * 3, 3 all the messages for queue 3 (priority 2) are sent for this cycle</p>
  *
  * <p>This algorithm choose the queues in the above order if all the queues have messages at the
  * point of selection. If a queue doesn't have messages it will skip the queue and move to the
@@ -37,25 +37,16 @@ import java.util.List;
  */
 public class PRRNextQueueAlgorithm<E> implements NextQueueAlgorithm<E> {
     
-    /**
-     * We hold a reference to the actual queue
-     */
+    /** Reference to the actual queue */
     private List<InternalQueue<E>> queues;
 
-    /**
-     * Number of queues, this is just to avoid the overhead of calculating
-     * this again and again
-     */
+    /** Number of queues, we keep this to avoid the overhead of calculating this again and again */
     private int size = 0;
 
-    /**
-     * Current queue we are operating on
-     */
+    /** Current queue we are operating on */
     private int currentQueue = 0;
 
-    /**
-     * Number of messages sent from the current queue
-     */
+    /** Number of messages sent from the current queue */
     private int currentCount = 0;
 
     public InternalQueue<E> getNextQueue() {
@@ -80,6 +71,7 @@ public class PRRNextQueueAlgorithm<E> im
                 // we move forward until we find a non empty queue or everything is empty
             } while (internalQueue.size() == 0 && c < size);
 
+            // if we come to the initial queue, that means all the queues are empty.
             if (internalQueue.size() == 0) {
                 currentQueue = 0;
                 return null;
@@ -88,9 +80,6 @@ public class PRRNextQueueAlgorithm<E> im
 
         currentCount++;
 
-        /*log.info("Get the queue with the priority: " +
-                        internalQueue.getPriority());*/
-
         return internalQueue;
     }
 

Modified: synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/PriorityExecutor.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/PriorityExecutor.java?rev=922486&r1=922485&r2=922486&view=diff
==============================================================================
--- synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/PriorityExecutor.java (original)
+++ synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/PriorityExecutor.java Sat Mar 13 06:22:14 2010
@@ -47,6 +47,7 @@ public class PriorityExecutor {
     private int max = ExecutorConstants.DEFAULT_MAX;
     /** Keep alive time for spare threads */
     private int keepAlive = ExecutorConstants.DEFAULT_KEEP_ALIVE;
+
     /** This will be executed before the Task is submitted  */
     private BeforeExecuteHandler beforeExecuteHandler;
     /** Queue used by the executor */
@@ -56,12 +57,14 @@ public class PriorityExecutor {
     private String fileName;
 
     /**
-     * Execute a given task with the priority specified.
+     * Execute a given task with the priority specified. If the task throws an exception,
+     * it will be captured and logged to prevent the threads from dying. 
      *
      * @param task task to be executed
      * @param priority priority of the tast
      */
     public void execute(final Runnable task, int priority) {
+        // create a dummy worker to execute the task
         Worker w = new Worker(task, priority);
 
         if (beforeExecuteHandler != null) {

Modified: synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/queues/FixedSizeQueue.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/queues/FixedSizeQueue.java?rev=922486&r1=922485&r2=922486&view=diff
==============================================================================
--- synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/queues/FixedSizeQueue.java (original)
+++ synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/queues/FixedSizeQueue.java Sat Mar 13 06:22:14 2010
@@ -168,6 +168,14 @@ public class FixedSizeQueue<E> extends A
         return capacity;
     }
 
+    @Override
+    public boolean contains(Object o) {
+        for (E e : array) { // scans every slot of the backing array, occupied or not
+            if (e != null && e.equals(o)) return true; // null guard: unused slots are presumably null
+        }
+        return false;
+    }
+
     private int increment(int i) {
         return (++i == array.length)? 0 : i;
     }

Modified: synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/queues/UnboundedQueue.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/queues/UnboundedQueue.java?rev=922486&r1=922485&r2=922486&view=diff
==============================================================================
--- synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/queues/UnboundedQueue.java (original)
+++ synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/queues/UnboundedQueue.java Sat Mar 13 06:22:14 2010
@@ -59,11 +59,17 @@ public class UnboundedQueue<E> extends A
     }
 
     public E poll() {
-        return elements.remove(elements.size() - 1);
+        if (elements.size() > 0) {
+            return elements.remove(elements.size() - 1);
+        }
+        return null;
     }
 
     public E peek() {
-        return elements.get(elements.size() - 1);
+        if (elements.size() > 0) {
+            return elements.get(elements.size() - 1);
+        }
+        return null;
     }
 
     public int getPriority() {
@@ -105,4 +111,9 @@ public class UnboundedQueue<E> extends A
     public int getCapacity() {
         return Integer.MAX_VALUE;
     }
+
+    @Override
+    public boolean contains(Object o) {
+        return elements.contains(o);
+    }
 }