You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/05/30 16:28:29 UTC
svn commit: r1487874 -
/activemq/trunk/activemq-client/src/main/java/org/apache/activemq/SimplePriorityMessageDispatchChannel.java
Author: tabish
Date: Thu May 30 14:28:29 2013
New Revision: 1487874
URL: http://svn.apache.org/r1487874
Log:
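fix for: https://issues.apache.org/jira/browse/AMQ-4562
[Editor's note: per the diff below, the only functional change is that clear() now resets the size counter to 0 after emptying the per-priority lists; the remaining changes add @Override/@SuppressWarnings annotations, reflow Javadoc comments, and tidy whitespace.]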
Modified:
activemq/trunk/activemq-client/src/main/java/org/apache/activemq/SimplePriorityMessageDispatchChannel.java
Modified: activemq/trunk/activemq-client/src/main/java/org/apache/activemq/SimplePriorityMessageDispatchChannel.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-client/src/main/java/org/apache/activemq/SimplePriorityMessageDispatchChannel.java?rev=1487874&r1=1487873&r2=1487874&view=diff
==============================================================================
--- activemq/trunk/activemq-client/src/main/java/org/apache/activemq/SimplePriorityMessageDispatchChannel.java (original)
+++ activemq/trunk/activemq-client/src/main/java/org/apache/activemq/SimplePriorityMessageDispatchChannel.java Thu May 30 14:28:29 2013
@@ -19,6 +19,7 @@ package org.apache.activemq;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
+
import org.apache.activemq.command.MessageDispatch;
public class SimplePriorityMessageDispatchChannel implements MessageDispatchChannel {
@@ -29,6 +30,7 @@ public class SimplePriorityMessageDispat
private boolean running;
private int size = 0;
+ @SuppressWarnings("unchecked")
public SimplePriorityMessageDispatchChannel() {
this.lists = new LinkedList[MAX_PRIORITY];
for (int i = 0; i < MAX_PRIORITY; i++) {
@@ -38,14 +40,13 @@ public class SimplePriorityMessageDispat
/*
* (non-Javadoc)
- * @see
- * org.apache.activemq.MessageDispatchChannelI#enqueue(org.apache.activemq
- * .command.MessageDispatch)
+ *
+ * @see org.apache.activemq.MessageDispatchChannelI#enqueue(org.apache.activemq.command.MessageDispatch)
*/
+ @Override
public void enqueue(MessageDispatch message) {
synchronized (mutex) {
getList(message).addLast(message);
-
this.size++;
mutex.notify();
}
@@ -53,10 +54,10 @@ public class SimplePriorityMessageDispat
/*
* (non-Javadoc)
- * @see
- * org.apache.activemq.MessageDispatchChannelI#enqueueFirst(org.apache.activemq
- * .command.MessageDispatch)
+ *
+ * @see org.apache.activemq.MessageDispatchChannelI#enqueueFirst(org.apache.activemq.command.MessageDispatch)
*/
+ @Override
public void enqueueFirst(MessageDispatch message) {
synchronized (mutex) {
getList(message).addFirst(message);
@@ -67,18 +68,20 @@ public class SimplePriorityMessageDispat
/*
* (non-Javadoc)
+ *
* @see org.apache.activemq.MessageDispatchChannelI#isEmpty()
*/
+ @Override
public boolean isEmpty() {
- // synchronized (mutex) {
return this.size == 0;
- // }
}
/*
* (non-Javadoc)
+ *
* @see org.apache.activemq.MessageDispatchChannelI#dequeue(long)
*/
+ @Override
public MessageDispatch dequeue(long timeout) throws InterruptedException {
synchronized (mutex) {
// Wait until the consumer is ready to deliver messages.
@@ -99,8 +102,10 @@ public class SimplePriorityMessageDispat
/*
* (non-Javadoc)
+ *
* @see org.apache.activemq.MessageDispatchChannelI#dequeueNoWait()
*/
+ @Override
public MessageDispatch dequeueNoWait() {
synchronized (mutex) {
if (closed || !running || isEmpty()) {
@@ -112,8 +117,10 @@ public class SimplePriorityMessageDispat
/*
* (non-Javadoc)
+ *
* @see org.apache.activemq.MessageDispatchChannelI#peek()
*/
+ @Override
public MessageDispatch peek() {
synchronized (mutex) {
if (closed || !running || isEmpty()) {
@@ -125,8 +132,10 @@ public class SimplePriorityMessageDispat
/*
* (non-Javadoc)
+ *
* @see org.apache.activemq.MessageDispatchChannelI#start()
*/
+ @Override
public void start() {
synchronized (mutex) {
running = true;
@@ -136,8 +145,10 @@ public class SimplePriorityMessageDispat
/*
* (non-Javadoc)
+ *
* @see org.apache.activemq.MessageDispatchChannelI#stop()
*/
+ @Override
public void stop() {
synchronized (mutex) {
running = false;
@@ -147,8 +158,10 @@ public class SimplePriorityMessageDispat
/*
* (non-Javadoc)
+ *
* @see org.apache.activemq.MessageDispatchChannelI#close()
*/
+ @Override
public void close() {
synchronized (mutex) {
if (!closed) {
@@ -161,28 +174,35 @@ public class SimplePriorityMessageDispat
/*
* (non-Javadoc)
+ *
* @see org.apache.activemq.MessageDispatchChannelI#clear()
*/
+ @Override
public void clear() {
synchronized (mutex) {
for (int i = 0; i < MAX_PRIORITY; i++) {
lists[i].clear();
}
+ this.size = 0;
}
}
/*
* (non-Javadoc)
+ *
* @see org.apache.activemq.MessageDispatchChannelI#isClosed()
*/
+ @Override
public boolean isClosed() {
return closed;
}
/*
* (non-Javadoc)
+ *
* @see org.apache.activemq.MessageDispatchChannelI#size()
*/
+ @Override
public int size() {
synchronized (mutex) {
return this.size;
@@ -191,26 +211,31 @@ public class SimplePriorityMessageDispat
/*
* (non-Javadoc)
+ *
* @see org.apache.activemq.MessageDispatchChannelI#getMutex()
*/
+ @Override
public Object getMutex() {
return mutex;
}
/*
* (non-Javadoc)
+ *
* @see org.apache.activemq.MessageDispatchChannelI#isRunning()
*/
+ @Override
public boolean isRunning() {
return running;
}
/*
* (non-Javadoc)
+ *
* @see org.apache.activemq.MessageDispatchChannelI#removeAll()
*/
+ @Override
public List<MessageDispatch> removeAll() {
-
synchronized (mutex) {
ArrayList<MessageDispatch> result = new ArrayList<MessageDispatch>(size());
for (int i = MAX_PRIORITY - 1; i >= 0; i--) {
@@ -225,20 +250,18 @@ public class SimplePriorityMessageDispat
@Override
public String toString() {
-
String result = "";
for (int i = MAX_PRIORITY - 1; i >= 0; i--) {
result += i + ":{" + lists[i].toString() + "}";
}
return result;
-
}
protected int getPriority(MessageDispatch message) {
int priority = javax.jms.Message.DEFAULT_PRIORITY;
if (message.getMessage() != null) {
- priority = Math.max(message.getMessage().getPriority(), 0);
- priority = Math.min(priority, 9);
+ priority = Math.max(message.getMessage().getPriority(), 0);
+ priority = Math.min(priority, 9);
}
return priority;
}