You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@synapse.apache.org by su...@apache.org on 2010/03/13 07:22:15 UTC
svn commit: r922486 - in
/synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors:
./ queues/
Author: supun
Date: Sat Mar 13 06:22:14 2010
New Revision: 922486
URL: http://svn.apache.org/viewvc?rev=922486&view=rev
Log:
improving documentation and fixing a minor bug
Modified:
synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/MultiPriorityBlockingQueue.java
synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/NextQueueAlgorithm.java
synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/PRRNextQueueAlgorithm.java
synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/PriorityExecutor.java
synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/queues/FixedSizeQueue.java
synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/queues/UnboundedQueue.java
Modified: synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/MultiPriorityBlockingQueue.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/MultiPriorityBlockingQueue.java?rev=922486&r1=922485&r2=922486&view=diff
==============================================================================
--- synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/MultiPriorityBlockingQueue.java (original)
+++ synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/MultiPriorityBlockingQueue.java Sat Mar 13 06:22:14 2010
@@ -408,11 +408,19 @@ public class MultiPriorityBlockingQueue<
}
public void clear() {
- while (true) {
- if (poll() == null) break;
- }
+ final ReentrantLock lock = this.lock;
+ lock.lock();
+ try {
+ for (InternalQueue<E> intQueue : queues) {
+ intQueue.clear();
+ }
+ count = 0;
+ } finally {
+ lock.unlock();
+ }
}
+ @SuppressWarnings({"SuspiciousToArrayCall"})
public <T> T[] toArray(T[] a) {
final ReentrantLock lock = this.lock;
lock.lock();
Modified: synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/NextQueueAlgorithm.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/NextQueueAlgorithm.java?rev=922486&r1=922485&r2=922486&view=diff
==============================================================================
--- synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/NextQueueAlgorithm.java (original)
+++ synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/NextQueueAlgorithm.java Sat Mar 13 06:22:14 2010
@@ -22,8 +22,8 @@ package org.apache.synapse.commons.execu
import java.util.List;
/**
- * This interface implements the algorith for determining the next internal
- * queue for picking up the message. This class is created onece and initialized.
+ * This interface abstracts the algorithm for determining the next internal
+ * queue for picking up the message. This class is created once and initialized.
* This class should capture any runtime information about the queues since the
* MultiPriorityBlockingQueue doesn't hold any runtime state information about
* the queues.
Modified: synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/PRRNextQueueAlgorithm.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/PRRNextQueueAlgorithm.java?rev=922486&r1=922485&r2=922486&view=diff
==============================================================================
--- synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/PRRNextQueueAlgorithm.java (original)
+++ synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/PRRNextQueueAlgorithm.java Sat Mar 13 06:22:14 2010
@@ -26,10 +26,10 @@ import java.util.List;
*
* <p>This algorithm works in cycles. Lets say we have queues with following priorities.
* 7, 5, 2 and assume we name the queues as 1, 2, 3 in the order. </p>
- * <p>Here is how messages are picked in a single cycle
- * 1, 1, 1, 1, 1, 1, 1 // all the messages for the queue with priority 1 are sent for this cycle
- * 2, 2, 2, 2, 2, // all the messages for the queue with priority 2 are sent for this cycle
- * 3, 3 // all the messages with priority 2 are sent for this cycle</p>
+ * <p>Here is how messages are picked in a single cycle </p>
+ * <p> 1, 1, 1, 1, 1, 1, 1 all the messages for queue 1 (priority 7) are sent for this cycle
+ * 2, 2, 2, 2, 2, all the messages for queue 2 (priority 5) are sent for this cycle
+ * 3, 3 all the messages for queue 3 (priority 2) are sent for this cycle</p>
*
* <p>This algorithm choose the queues in the above order if all the queues have messages at the
* point of selection. If a queue doesn't have messages it will skip the queue and move to the
@@ -37,25 +37,16 @@ import java.util.List;
*/
public class PRRNextQueueAlgorithm<E> implements NextQueueAlgorithm<E> {
- /**
- * We hold a reference to the actual queue
- */
+ /** Reference to the actual queue */
private List<InternalQueue<E>> queues;
- /**
- * Number of queues, this is just to avoid the overhead of calculating
- * this again and again
- */
+ /** Number of queues, we keep this to avoid the overhead of calculating this again and again */
private int size = 0;
- /**
- * Current queue we are operating on
- */
+ /** Current queue we are operating on */
private int currentQueue = 0;
- /**
- * Number of messages sent from the current queue
- */
+ /** Number of messages sent from the current queue */
private int currentCount = 0;
public InternalQueue<E> getNextQueue() {
@@ -80,6 +71,7 @@ public class PRRNextQueueAlgorithm<E> im
// we move forward until we find a non empty queue or everything is empty
} while (internalQueue.size() == 0 && c < size);
+ // if we come to the initial queue, that means all the queues are empty.
if (internalQueue.size() == 0) {
currentQueue = 0;
return null;
@@ -88,9 +80,6 @@ public class PRRNextQueueAlgorithm<E> im
currentCount++;
- /*log.info("Get the queue with the priority: " +
- internalQueue.getPriority());*/
-
return internalQueue;
}
Modified: synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/PriorityExecutor.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/PriorityExecutor.java?rev=922486&r1=922485&r2=922486&view=diff
==============================================================================
--- synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/PriorityExecutor.java (original)
+++ synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/PriorityExecutor.java Sat Mar 13 06:22:14 2010
@@ -47,6 +47,7 @@ public class PriorityExecutor {
private int max = ExecutorConstants.DEFAULT_MAX;
/** Keep alive time for spare threads */
private int keepAlive = ExecutorConstants.DEFAULT_KEEP_ALIVE;
+
/** This will be executed before the Task is submitted */
private BeforeExecuteHandler beforeExecuteHandler;
/** Queue used by the executor */
@@ -56,12 +57,14 @@ public class PriorityExecutor {
private String fileName;
/**
- * Execute a given task with the priority specified.
+ * Execute a given task with the priority specified. If the task throws an exception,
+ * it will be captured and logged to prevent the threads from dying.
*
* @param task task to be executed
* @param priority priority of the tast
*/
public void execute(final Runnable task, int priority) {
+ // create a dummy worker to execute the task
Worker w = new Worker(task, priority);
if (beforeExecuteHandler != null) {
Modified: synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/queues/FixedSizeQueue.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/queues/FixedSizeQueue.java?rev=922486&r1=922485&r2=922486&view=diff
==============================================================================
--- synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/queues/FixedSizeQueue.java (original)
+++ synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/queues/FixedSizeQueue.java Sat Mar 13 06:22:14 2010
@@ -168,6 +168,14 @@ public class FixedSizeQueue<E> extends A
return capacity;
}
+ @Override
+ public boolean contains(Object o) {
+ for (E e : array) {
+ if (e.equals(o)) return true;
+ }
+ return false;
+ }
+
private int increment(int i) {
return (++i == array.length)? 0 : i;
}
Modified: synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/queues/UnboundedQueue.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/queues/UnboundedQueue.java?rev=922486&r1=922485&r2=922486&view=diff
==============================================================================
--- synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/queues/UnboundedQueue.java (original)
+++ synapse/trunk/java/modules/commons/src/main/java/org/apache/synapse/commons/executors/queues/UnboundedQueue.java Sat Mar 13 06:22:14 2010
@@ -59,11 +59,17 @@ public class UnboundedQueue<E> extends A
}
public E poll() {
- return elements.remove(elements.size() - 1);
+ if (elements.size() > 0) {
+ return elements.remove(elements.size() - 1);
+ }
+ return null;
}
public E peek() {
- return elements.get(elements.size() - 1);
+ if (elements.size() > 0) {
+ return elements.get(elements.size() - 1);
+ }
+ return null;
}
public int getPriority() {
@@ -105,4 +111,9 @@ public class UnboundedQueue<E> extends A
public int getCapacity() {
return Integer.MAX_VALUE;
}
+
+ @Override
+ public boolean contains(Object o) {
+ return elements.contains(o);
+ }
}