You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@synapse.apache.org by su...@apache.org on 2010/03/15 04:32:25 UTC

svn commit: r923045 - in /synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors: ./ config/ queues/

Author: supun
Date: Mon Mar 15 03:32:25 2010
New Revision: 923045

URL: http://svn.apache.org/viewvc?rev=923045&view=rev
Log:
Fixing a few minor bugs and improving the documentation

Modified:
    synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/MultiPriorityBlockingQueue.java
    synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/PriorityExecutor.java
    synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/config/PriorityExecutorFactory.java
    synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/config/PriorityExecutorSerializer.java
    synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/queues/FixedSizeQueue.java
    synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/queues/UnboundedQueue.java

Modified: synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/MultiPriorityBlockingQueue.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/MultiPriorityBlockingQueue.java?rev=923045&r1=923044&r2=923045&view=diff
==============================================================================
--- synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/MultiPriorityBlockingQueue.java (original)
+++ synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/MultiPriorityBlockingQueue.java Mon Mar 15 03:32:25 2010
@@ -53,19 +53,21 @@ public class MultiPriorityBlockingQueue<
     private int capacity = Integer.MAX_VALUE;
 
     /** Algorithm for determining next queue */
-    private NextQueueAlgorithm<E> nextQueue;
+    private NextQueueAlgorithm<E> nextQueueAlgorithm;
     
     /** whether fixed size queues are used */
     private boolean isFixedSizeQueues;
 
     /**
-     * Create a queue with the given queues
+     * <p>Create a queue with the given queues.</p>
      *
-     * This method will create a Queue that accepts objects with only the priorities specified.
-     * If a object is submitted with a different priority it will result in a null point
-     * exception.
-     * @param queues internal queues to be used
-     * @param isFixedQueues weather fixed queues are used
+     * <p> This method will create a Queue that accepts objects with only the priorities specified.
+     * If an object is submitted with a different priority it will result in an
+     * IllegalArgumentException. If the algorithm is null, this queue will use the
+     * PRRNextQueueAlgorithm.</p>
+     * 
+     * @param queues list of InternalQueue to be used
+     * @param isFixedQueues whether fixed-size queues are used
      * @param algorithm algorithm for calculating next queue
      */
     public MultiPriorityBlockingQueue(List<InternalQueue<E>> queues,
@@ -93,12 +95,12 @@ public class MultiPriorityBlockingQueue<
         }
 
         if (algorithm == null) {
-            nextQueue = new PRRNextQueueAlgorithm<E>();
+            nextQueueAlgorithm = new PRRNextQueueAlgorithm<E>();
         } else {
-            nextQueue = algorithm;
+            nextQueueAlgorithm = algorithm;
         }
         // initialize the algorithm
-        nextQueue.init(queues);
+        nextQueueAlgorithm.init(queues);
     }
 
     /**
@@ -207,11 +209,11 @@ public class MultiPriorityBlockingQueue<
         final ReentrantLock lock = this.lock;
         lock.lockInterruptibly();
         try {
-            InternalQueue<E> internalQueue = nextQueue.getNextQueue();
+            InternalQueue<E> internalQueue = nextQueueAlgorithm.getNextQueue();
             try {
                 while (internalQueue == null) {
                     notEmpty.await();
-                    internalQueue = nextQueue.getNextQueue();
+                    internalQueue = nextQueueAlgorithm.getNextQueue();
                 }
             } catch (InterruptedException ie) {
                 notEmpty.signal();
@@ -241,7 +243,7 @@ public class MultiPriorityBlockingQueue<
         lock.lockInterruptibly();
         try {
             for (;;) {
-                InternalQueue<E> internalQueue = nextQueue.getNextQueue();
+                InternalQueue<E> internalQueue = nextQueueAlgorithm.getNextQueue();
                 if (internalQueue != null) {
                     E e = internalQueue.poll();
                     count--;
@@ -319,7 +321,7 @@ public class MultiPriorityBlockingQueue<
         final ReentrantLock lock = this.lock;
         lock.lock();
         try {
-            InternalQueue<E> internalQueue = nextQueue.getNextQueue();
+            InternalQueue<E> internalQueue = nextQueueAlgorithm.getNextQueue();
             if (internalQueue != null) {
                 count--;
                 E e = internalQueue.poll();
@@ -341,7 +343,7 @@ public class MultiPriorityBlockingQueue<
         final ReentrantLock lock = this.lock;
         lock.lock();
         try {
-            InternalQueue<E> internalQueue = nextQueue.getNextQueue();
+            InternalQueue<E> internalQueue = nextQueueAlgorithm.getNextQueue();
             if (internalQueue != null) {
                 return internalQueue.peek();
             } else {
@@ -455,7 +457,7 @@ public class MultiPriorityBlockingQueue<
                 return q;
             }
         }
-        return null;
+        throw new IllegalArgumentException();
     }
 
     private class QueueIterator implements Iterator<E> {
@@ -504,8 +506,8 @@ public class MultiPriorityBlockingQueue<
         return queues;
     }
 
-    public NextQueueAlgorithm<E> getNextQueue() {
-        return nextQueue;
+    public NextQueueAlgorithm<E> getNextQueueAlgorithm() {
+        return nextQueueAlgorithm;
     }
 
     public boolean isFixedSizeQueues() {

Modified: synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/PriorityExecutor.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/PriorityExecutor.java?rev=923045&r1=923044&r2=923045&view=diff
==============================================================================
--- synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/PriorityExecutor.java (original)
+++ synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/PriorityExecutor.java Mon Mar 15 03:32:25 2010
@@ -37,22 +37,18 @@ public class PriorityExecutor {
 
     /** Actual thread pool executor */
     private ThreadPoolExecutor executor;
-
     /** Name of the executor */
     private String name = null;
-
     /** Core threads count */
     private int core = ExecutorConstants.DEFAULT_CORE;
     /** Max thread count */
     private int max = ExecutorConstants.DEFAULT_MAX;
     /** Keep alive time for spare threads */
     private int keepAlive = ExecutorConstants.DEFAULT_KEEP_ALIVE;
-
     /** This will be executed before the Task is submitted  */
     private BeforeExecuteHandler beforeExecuteHandler;
     /** Queue used by the executor */
     private MultiPriorityBlockingQueue<Runnable> queue;
-
     /** this is used by the file based synapse xml configuration */
     private String fileName;
 
@@ -95,11 +91,11 @@ public class PriorityExecutor {
     }
 
     /**
-     * Destroy the executor. 
+     * Destroy the executor. Stop all the threads running. 
      */
     public void destroy() {
         if (log.isDebugEnabled()) {
-            log.debug("Shutting down thread pool executor");
+            log.debug("Shutting down priority executor" + (name != null ? ": " + name : ""));
         }
 
         executor.shutdown();
@@ -152,7 +148,7 @@ public class PriorityExecutor {
     }
 
     /**
-     * Set the queue
+     * Set the queue.
      *
      * @param queue queue used for handling the priorities
      */
@@ -161,7 +157,7 @@ public class PriorityExecutor {
     }
 
     /**
-     * Get the queue
+     * Get the queue.
      *
      * @return queue used for handling multiple priorities
      */

Modified: synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/config/PriorityExecutorFactory.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/config/PriorityExecutorFactory.java?rev=923045&r1=923044&r2=923045&view=diff
==============================================================================
--- synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/config/PriorityExecutorFactory.java (original)
+++ synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/config/PriorityExecutorFactory.java Mon Mar 15 03:32:25 2010
@@ -59,7 +59,6 @@ public class PriorityExecutorFactory {
                                                   boolean requireName) throws AxisFault {        
         QName queuesQName = createQname(namespace, ExecutorConstants.QUEUES);
         QName queueQName = createQname(namespace, ExecutorConstants.QUEUE);
-
         QName threadsQName = createQname(namespace, ExecutorConstants.THREADS);
 
         PriorityExecutor executor = new PriorityExecutor();

Modified: synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/config/PriorityExecutorSerializer.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/config/PriorityExecutorSerializer.java?rev=923045&r1=923044&r2=923045&view=diff
==============================================================================
--- synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/config/PriorityExecutorSerializer.java (original)
+++ synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/config/PriorityExecutorSerializer.java Mon Mar 15 03:32:25 2010
@@ -32,6 +32,7 @@ public class PriorityExecutorSerializer 
 
     public static OMElement serialize(OMElement parent,
                                       PriorityExecutor executor, String namespace) {        
+
         QName executorQName = createQname(namespace, ExecutorConstants.PRIORITY_EXECUTOR);
         QName queuesQName = createQname(namespace, ExecutorConstants.QUEUES);
         QName queueQName = createQname(namespace, ExecutorConstants.QUEUE);
@@ -55,7 +56,7 @@ public class PriorityExecutorSerializer 
 
         // create the queues configuration
         MultiPriorityBlockingQueue queue = executor.getQueue();
-        NextQueueAlgorithm algo = queue.getNextQueue();
+        NextQueueAlgorithm algo = queue.getNextQueueAlgorithm();
         OMElement queuesEle = fac.createOMElement(queuesQName);
 
         if (!(algo instanceof PRRNextQueueAlgorithm)) {

Modified: synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/queues/FixedSizeQueue.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/queues/FixedSizeQueue.java?rev=923045&r1=923044&r2=923045&view=diff
==============================================================================
--- synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/queues/FixedSizeQueue.java (original)
+++ synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/queues/FixedSizeQueue.java Mon Mar 15 03:32:25 2010
@@ -22,9 +22,7 @@ package org.apache.synapse.commons.execu
 import org.apache.synapse.commons.executors.InternalQueue;
 
 import java.util.concurrent.locks.Condition;
-import java.util.AbstractQueue;
-import java.util.Iterator;
-import java.util.Collection;
+import java.util.*;
 
 /**
  * A bounded queue implementation for internal queues. This queue is backed by an
@@ -62,14 +60,18 @@ public class FixedSizeQueue<E> extends A
     /**
      * Head of the queue
      */
-    private int head;
+    private int head = 0;
 
     /**
      * Tail of the queue
      */
-    private int tail;
-    
+    private int tail = 0;
 
+    /**
+     * Create a queue with the given priority and capacity.
+     * @param priority priority of the elements in the queue
+     * @param capacity capacity of the queue
+     */
     public FixedSizeQueue(int priority, int capacity) {
         this.priority = priority;        
         this.capacity = capacity;
@@ -93,7 +95,9 @@ public class FixedSizeQueue<E> extends A
         this.notFullCond = notFullCond;
     }
 
-    public Iterator<E> iterator() {return null;}
+    public Iterator<E> iterator() {
+        return new Itr<E>();
+    }
 
     public int size() {
         return count;
@@ -176,8 +180,64 @@ public class FixedSizeQueue<E> extends A
         return false;
     }
 
-    private int increment(int i) {
-        return (++i == array.length)? 0 : i;
+    @Override
+    public boolean remove(Object o) {
+        boolean found = false;
+        int i = head;
+        int iterations = 0;
+        while (iterations++ < count) {
+            if (!found && array[i].equals(o)) {
+                found = true;
+            }
+
+            if (found) {
+                int j = increment(i);
+                array[i] = array[j];
+            }
+
+            i = increment(i);
+        }
+
+        if (found) {
+            count--;
+            tail = decrement(tail);
+        }
+
+        return found;
+    }
+
+    private class Itr<E> implements Iterator<E> {
+	    int index = head;
+
+        public boolean hasNext() {
+            return index != tail;
+        }
+
+        public E next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+
+            return (E) array[index++];
+
+        }
+
+        public void remove() {            
+            while (index != tail) {
+                int j = increment(index);
+
+                array[index] = array[j];
+                index = j;
+            }
+        }
+    }
+
+    private int decrement(int n) {
+        return (n == 0) ? array.length - 1 : n;
+    }
+
+    private int increment(int n) {
+        return (++n == array.length) ? 0 : n;
     }
 
     private void insert(E e) {

Modified: synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/queues/UnboundedQueue.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/queues/UnboundedQueue.java?rev=923045&r1=923044&r2=923045&view=diff
==============================================================================
--- synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/queues/UnboundedQueue.java (original)
+++ synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/queues/UnboundedQueue.java Mon Mar 15 03:32:25 2010
@@ -116,4 +116,9 @@ public class UnboundedQueue<E> extends A
     public boolean contains(Object o) {
         return elements.contains(o);
     }
+
+    @Override
+    public boolean remove(Object o) {
+        return elements.remove(o);
+    }
 }