You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@synapse.apache.org by su...@apache.org on 2010/03/15 04:32:25 UTC
svn commit: r923045 - in
/synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors:
./ config/ queues/
Author: supun
Date: Mon Mar 15 03:32:25 2010
New Revision: 923045
URL: http://svn.apache.org/viewvc?rev=923045&view=rev
Log:
Fixing a few minor bugs and improving the documentation
Modified:
synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/MultiPriorityBlockingQueue.java
synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/PriorityExecutor.java
synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/config/PriorityExecutorFactory.java
synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/config/PriorityExecutorSerializer.java
synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/queues/FixedSizeQueue.java
synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/queues/UnboundedQueue.java
Modified: synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/MultiPriorityBlockingQueue.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/MultiPriorityBlockingQueue.java?rev=923045&r1=923044&r2=923045&view=diff
==============================================================================
--- synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/MultiPriorityBlockingQueue.java (original)
+++ synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/MultiPriorityBlockingQueue.java Mon Mar 15 03:32:25 2010
@@ -53,19 +53,21 @@ public class MultiPriorityBlockingQueue<
private int capacity = Integer.MAX_VALUE;
/** Algorithm for determining next queue */
- private NextQueueAlgorithm<E> nextQueue;
+ private NextQueueAlgorithm<E> nextQueueAlgorithm;
/** whether fixed size queues are used */
private boolean isFixedSizeQueues;
/**
- * Create a queue with the given queues
+ * Create a queue with the given queues.
*
- * This method will create a Queue that accepts objects with only the priorities specified.
- * If a object is submitted with a different priority it will result in a null point
- * exception.
- * @param queues internal queues to be used
- * @param isFixedQueues weather fixed queues are used
+ * <p> This method will create a Queue that accepts objects with only the priorities specified.
+ * If an object is submitted with a different priority it will result in an
+ * IllegalArgumentException. If the algorithm is null, this queue will use the
+ * PRRNextQueueAlgorithm.</p>
+ *
+ * @param queues list of InternalQueue to be used
+ * @param isFixedQueues whether fixed size queues are used
* @param algorithm algorithm for calculating next queue
*/
public MultiPriorityBlockingQueue(List<InternalQueue<E>> queues,
@@ -93,12 +95,12 @@ public class MultiPriorityBlockingQueue<
}
if (algorithm == null) {
- nextQueue = new PRRNextQueueAlgorithm<E>();
+ nextQueueAlgorithm = new PRRNextQueueAlgorithm<E>();
} else {
- nextQueue = algorithm;
+ nextQueueAlgorithm = algorithm;
}
// initialize the algorithm
- nextQueue.init(queues);
+ nextQueueAlgorithm.init(queues);
}
/**
@@ -207,11 +209,11 @@ public class MultiPriorityBlockingQueue<
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
- InternalQueue<E> internalQueue = nextQueue.getNextQueue();
+ InternalQueue<E> internalQueue = nextQueueAlgorithm.getNextQueue();
try {
while (internalQueue == null) {
notEmpty.await();
- internalQueue = nextQueue.getNextQueue();
+ internalQueue = nextQueueAlgorithm.getNextQueue();
}
} catch (InterruptedException ie) {
notEmpty.signal();
@@ -241,7 +243,7 @@ public class MultiPriorityBlockingQueue<
lock.lockInterruptibly();
try {
for (;;) {
- InternalQueue<E> internalQueue = nextQueue.getNextQueue();
+ InternalQueue<E> internalQueue = nextQueueAlgorithm.getNextQueue();
if (internalQueue != null) {
E e = internalQueue.poll();
count--;
@@ -319,7 +321,7 @@ public class MultiPriorityBlockingQueue<
final ReentrantLock lock = this.lock;
lock.lock();
try {
- InternalQueue<E> internalQueue = nextQueue.getNextQueue();
+ InternalQueue<E> internalQueue = nextQueueAlgorithm.getNextQueue();
if (internalQueue != null) {
count--;
E e = internalQueue.poll();
@@ -341,7 +343,7 @@ public class MultiPriorityBlockingQueue<
final ReentrantLock lock = this.lock;
lock.lock();
try {
- InternalQueue<E> internalQueue = nextQueue.getNextQueue();
+ InternalQueue<E> internalQueue = nextQueueAlgorithm.getNextQueue();
if (internalQueue != null) {
return internalQueue.peek();
} else {
@@ -455,7 +457,7 @@ public class MultiPriorityBlockingQueue<
return q;
}
}
- return null;
+ throw new IllegalArgumentException();
}
private class QueueIterator implements Iterator<E> {
@@ -504,8 +506,8 @@ public class MultiPriorityBlockingQueue<
return queues;
}
- public NextQueueAlgorithm<E> getNextQueue() {
- return nextQueue;
+ public NextQueueAlgorithm<E> getNextQueueAlgorithm() {
+ return nextQueueAlgorithm;
}
public boolean isFixedSizeQueues() {
Modified: synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/PriorityExecutor.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/PriorityExecutor.java?rev=923045&r1=923044&r2=923045&view=diff
==============================================================================
--- synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/PriorityExecutor.java (original)
+++ synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/PriorityExecutor.java Mon Mar 15 03:32:25 2010
@@ -37,22 +37,18 @@ public class PriorityExecutor {
/** Actual thread pool executor */
private ThreadPoolExecutor executor;
-
/** Name of the executor */
private String name = null;
-
/** Core threads count */
private int core = ExecutorConstants.DEFAULT_CORE;
/** Max thread count */
private int max = ExecutorConstants.DEFAULT_MAX;
/** Keep alive time for spare threads */
private int keepAlive = ExecutorConstants.DEFAULT_KEEP_ALIVE;
-
/** This will be executed before the Task is submitted */
private BeforeExecuteHandler beforeExecuteHandler;
/** Queue used by the executor */
private MultiPriorityBlockingQueue<Runnable> queue;
-
/** this is used by the file based synapse xml configuration */
private String fileName;
@@ -95,11 +91,11 @@ public class PriorityExecutor {
}
/**
- * Destroy the executor.
+ * Destroy the executor. Stop all the threads running.
*/
public void destroy() {
if (log.isDebugEnabled()) {
- log.debug("Shutting down thread pool executor");
+ log.debug("Shutting down priority executor" + (name != null ? ": " + name : ""));
}
executor.shutdown();
@@ -152,7 +148,7 @@ public class PriorityExecutor {
}
/**
- * Set the queue
+ * Set the queue.
*
* @param queue queue used for handling the priorities
*/
@@ -161,7 +157,7 @@ public class PriorityExecutor {
}
/**
- * Get the queue
+ * Get the queue.
*
* @return queue used for handling multiple priorities
*/
Modified: synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/config/PriorityExecutorFactory.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/config/PriorityExecutorFactory.java?rev=923045&r1=923044&r2=923045&view=diff
==============================================================================
--- synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/config/PriorityExecutorFactory.java (original)
+++ synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/config/PriorityExecutorFactory.java Mon Mar 15 03:32:25 2010
@@ -59,7 +59,6 @@ public class PriorityExecutorFactory {
boolean requireName) throws AxisFault {
QName queuesQName = createQname(namespace, ExecutorConstants.QUEUES);
QName queueQName = createQname(namespace, ExecutorConstants.QUEUE);
-
QName threadsQName = createQname(namespace, ExecutorConstants.THREADS);
PriorityExecutor executor = new PriorityExecutor();
Modified: synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/config/PriorityExecutorSerializer.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/config/PriorityExecutorSerializer.java?rev=923045&r1=923044&r2=923045&view=diff
==============================================================================
--- synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/config/PriorityExecutorSerializer.java (original)
+++ synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/config/PriorityExecutorSerializer.java Mon Mar 15 03:32:25 2010
@@ -32,6 +32,7 @@ public class PriorityExecutorSerializer
public static OMElement serialize(OMElement parent,
PriorityExecutor executor, String namespace) {
+
QName executorQName = createQname(namespace, ExecutorConstants.PRIORITY_EXECUTOR);
QName queuesQName = createQname(namespace, ExecutorConstants.QUEUES);
QName queueQName = createQname(namespace, ExecutorConstants.QUEUE);
@@ -55,7 +56,7 @@ public class PriorityExecutorSerializer
// create the queues configuration
MultiPriorityBlockingQueue queue = executor.getQueue();
- NextQueueAlgorithm algo = queue.getNextQueue();
+ NextQueueAlgorithm algo = queue.getNextQueueAlgorithm();
OMElement queuesEle = fac.createOMElement(queuesQName);
if (!(algo instanceof PRRNextQueueAlgorithm)) {
Modified: synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/queues/FixedSizeQueue.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/queues/FixedSizeQueue.java?rev=923045&r1=923044&r2=923045&view=diff
==============================================================================
--- synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/queues/FixedSizeQueue.java (original)
+++ synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/queues/FixedSizeQueue.java Mon Mar 15 03:32:25 2010
@@ -22,9 +22,7 @@ package org.apache.synapse.commons.execu
import org.apache.synapse.commons.executors.InternalQueue;
import java.util.concurrent.locks.Condition;
-import java.util.AbstractQueue;
-import java.util.Iterator;
-import java.util.Collection;
+import java.util.*;
/**
* A bounded queue implementation for internal queues. This queue is backed by an
@@ -62,14 +60,18 @@ public class FixedSizeQueue<E> extends A
/**
* Head of the queue
*/
- private int head;
+ private int head = 0;
/**
* Tail of the queue
*/
- private int tail;
-
+ private int tail = 0;
+ /**
+ * Create a queue with the given priority and capacity.
+ * @param priority priority of the elements in the queue
+ * @param capacity capacity of the queue
+ */
public FixedSizeQueue(int priority, int capacity) {
this.priority = priority;
this.capacity = capacity;
@@ -93,7 +95,9 @@ public class FixedSizeQueue<E> extends A
this.notFullCond = notFullCond;
}
- public Iterator<E> iterator() {return null;}
+ /**
+  * Get an iterator over the elements held by this queue.
+  *
+  * @return a newly created Itr instance
+  */
+ public Iterator<E> iterator() {
+     Iterator<E> itr = new Itr<E>();
+     return itr;
+ }
public int size() {
return count;
@@ -176,8 +180,64 @@ public class FixedSizeQueue<E> extends A
return false;
}
- private int increment(int i) {
- return (++i == array.length)? 0 : i;
+ /**
+  * Remove the first element, counting from the head, that is equal to the given object.
+  * The elements behind it are moved one slot towards the head so the queue stays
+  * contiguous.
+  *
+  * @param o object to be removed
+  * @return true if a matching element was found and removed, false otherwise
+  */
+ @Override
+ public boolean remove(Object o) {
+     int i = head;
+     for (int visited = 0; visited < count; visited++) {
+         if (array[i].equals(o)) {
+             // close the gap: pull every later element one slot towards the head
+             for (int later = visited + 1; later < count; later++) {
+                 int j = increment(i);
+                 array[i] = array[j];
+                 i = j;
+             }
+
+             // i is now the slot the last element used to occupy; clear it so no
+             // stale reference is retained and make it the new tail
+             array[i] = null;
+             tail = i;
+             count--;
+             return true;
+         }
+
+         i = increment(i);
+     }
+
+     return false;
+ }
+
+ /**
+  * Iterator over the queued elements, walking from the head towards the tail.
+  * It does not detect changes made to the queue by other means while iterating.
+  */
+ private class Itr<E> implements Iterator<E> {
+     /** Index of the element the next call to next() will return */
+     int index = head;
+     /** Number of elements not yet returned by next() */
+     int remaining = count;
+     /** Index of the element last returned by next(), -1 when there is nothing to remove */
+     int lastReturned = -1;
+
+     /**
+      * @return true while there are elements that were not returned yet
+      */
+     public boolean hasNext() {
+         // count the elements instead of comparing index with tail: the two are
+         // equal for an empty queue and (presumably) also for a completely full one
+         return remaining > 0;
+     }
+
+     /**
+      * @return the next element of the queue
+      * @throws NoSuchElementException if all the elements were already returned
+      */
+     public E next() {
+         if (!hasNext()) {
+             throw new NoSuchElementException();
+         }
+
+         lastReturned = index;
+         // wrap around at the end of the backing array instead of running past it
+         index = increment(index);
+         remaining--;
+         return (E) array[lastReturned];
+     }
+
+     /**
+      * Remove the element last returned by next() from the queue.
+      *
+      * @throws IllegalStateException if next() was not called yet, or remove() was
+      * already called after the last call to next()
+      */
+     public void remove() {
+         if (lastReturned < 0) {
+             throw new IllegalStateException();
+         }
+
+         // the elements not yet returned are exactly the ones behind the removed
+         // element; move each of them one slot towards the head
+         int i = lastReturned;
+         for (int moved = 0; moved < remaining; moved++) {
+             int j = increment(i);
+             array[i] = array[j];
+             i = j;
+         }
+
+         // i is the slot the last element used to occupy
+         array[i] = null;
+         tail = i;
+         count--;
+
+         // the next element now sits where the removed one was
+         index = lastReturned;
+         lastReturned = -1;
+     }
+ }
+
+ /**
+  * Move an index into the backing array one slot backwards.
+  *
+  * @param n current index
+  * @return n - 1, or the last index of the array when n is 0
+  */
+ private int decrement(int n) {
+     return (n == 0) ? array.length - 1 : n - 1;
+ }
+
+ /**
+  * Move an index into the backing array one slot forwards.
+  *
+  * @param n current index
+  * @return n + 1, or 0 when n + 1 reaches the length of the array
+  */
+ private int increment(int n) {
+     int next = n + 1;
+     return (next == array.length) ? 0 : next;
}
private void insert(E e) {
Modified: synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/queues/UnboundedQueue.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/queues/UnboundedQueue.java?rev=923045&r1=923044&r2=923045&view=diff
==============================================================================
--- synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/queues/UnboundedQueue.java (original)
+++ synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/queues/UnboundedQueue.java Mon Mar 15 03:32:25 2010
@@ -116,4 +116,9 @@ public class UnboundedQueue<E> extends A
public boolean contains(Object o) {
return elements.contains(o);
}
+
+ /**
+  * Remove the given object by delegating to the elements field.
+  *
+  * @param o object to be removed
+  * @return the value reported by elements.remove(o)
+  */
+ @Override
+ public boolean remove(Object o) {
+     boolean removed = elements.remove(o);
+     return removed;
+ }
}