You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/05/30 16:28:29 UTC

svn commit: r1487874 - /activemq/trunk/activemq-client/src/main/java/org/apache/activemq/SimplePriorityMessageDispatchChannel.java

Author: tabish
Date: Thu May 30 14:28:29 2013
New Revision: 1487874

URL: http://svn.apache.org/r1487874
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-4562

Modified:
    activemq/trunk/activemq-client/src/main/java/org/apache/activemq/SimplePriorityMessageDispatchChannel.java

Modified: activemq/trunk/activemq-client/src/main/java/org/apache/activemq/SimplePriorityMessageDispatchChannel.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-client/src/main/java/org/apache/activemq/SimplePriorityMessageDispatchChannel.java?rev=1487874&r1=1487873&r2=1487874&view=diff
==============================================================================
--- activemq/trunk/activemq-client/src/main/java/org/apache/activemq/SimplePriorityMessageDispatchChannel.java (original)
+++ activemq/trunk/activemq-client/src/main/java/org/apache/activemq/SimplePriorityMessageDispatchChannel.java Thu May 30 14:28:29 2013
@@ -19,6 +19,7 @@ package org.apache.activemq;
 import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
+
 import org.apache.activemq.command.MessageDispatch;
 
 public class SimplePriorityMessageDispatchChannel implements MessageDispatchChannel {
@@ -29,6 +30,7 @@ public class SimplePriorityMessageDispat
     private boolean running;
     private int size = 0;
 
+    @SuppressWarnings("unchecked")
     public SimplePriorityMessageDispatchChannel() {
         this.lists = new LinkedList[MAX_PRIORITY];
         for (int i = 0; i < MAX_PRIORITY; i++) {
@@ -38,14 +40,13 @@ public class SimplePriorityMessageDispat
 
     /*
      * (non-Javadoc)
-     * @see
-     * org.apache.activemq.MessageDispatchChannelI#enqueue(org.apache.activemq
-     * .command.MessageDispatch)
+     *
+     * @see org.apache.activemq.MessageDispatchChannelI#enqueue(org.apache.activemq.command.MessageDispatch)
      */
+    @Override
     public void enqueue(MessageDispatch message) {
         synchronized (mutex) {
             getList(message).addLast(message);
-
             this.size++;
             mutex.notify();
         }
@@ -53,10 +54,10 @@ public class SimplePriorityMessageDispat
 
     /*
      * (non-Javadoc)
-     * @see
-     * org.apache.activemq.MessageDispatchChannelI#enqueueFirst(org.apache.activemq
-     * .command.MessageDispatch)
+     *
+     * @see org.apache.activemq.MessageDispatchChannelI#enqueueFirst(org.apache.activemq.command.MessageDispatch)
      */
+    @Override
     public void enqueueFirst(MessageDispatch message) {
         synchronized (mutex) {
             getList(message).addFirst(message);
@@ -67,18 +68,20 @@ public class SimplePriorityMessageDispat
 
     /*
      * (non-Javadoc)
+     *
      * @see org.apache.activemq.MessageDispatchChannelI#isEmpty()
      */
+    @Override
     public boolean isEmpty() {
-        // synchronized (mutex) {
         return this.size == 0;
-        // }
     }
 
     /*
      * (non-Javadoc)
+     *
      * @see org.apache.activemq.MessageDispatchChannelI#dequeue(long)
      */
+    @Override
     public MessageDispatch dequeue(long timeout) throws InterruptedException {
         synchronized (mutex) {
             // Wait until the consumer is ready to deliver messages.
@@ -99,8 +102,10 @@ public class SimplePriorityMessageDispat
 
     /*
      * (non-Javadoc)
+     *
      * @see org.apache.activemq.MessageDispatchChannelI#dequeueNoWait()
      */
+    @Override
     public MessageDispatch dequeueNoWait() {
         synchronized (mutex) {
             if (closed || !running || isEmpty()) {
@@ -112,8 +117,10 @@ public class SimplePriorityMessageDispat
 
     /*
      * (non-Javadoc)
+     *
      * @see org.apache.activemq.MessageDispatchChannelI#peek()
      */
+    @Override
     public MessageDispatch peek() {
         synchronized (mutex) {
             if (closed || !running || isEmpty()) {
@@ -125,8 +132,10 @@ public class SimplePriorityMessageDispat
 
     /*
      * (non-Javadoc)
+     *
      * @see org.apache.activemq.MessageDispatchChannelI#start()
      */
+    @Override
     public void start() {
         synchronized (mutex) {
             running = true;
@@ -136,8 +145,10 @@ public class SimplePriorityMessageDispat
 
     /*
      * (non-Javadoc)
+     *
      * @see org.apache.activemq.MessageDispatchChannelI#stop()
      */
+    @Override
     public void stop() {
         synchronized (mutex) {
             running = false;
@@ -147,8 +158,10 @@ public class SimplePriorityMessageDispat
 
     /*
      * (non-Javadoc)
+     *
      * @see org.apache.activemq.MessageDispatchChannelI#close()
      */
+    @Override
     public void close() {
         synchronized (mutex) {
             if (!closed) {
@@ -161,28 +174,35 @@ public class SimplePriorityMessageDispat
 
     /*
      * (non-Javadoc)
+     *
      * @see org.apache.activemq.MessageDispatchChannelI#clear()
      */
+    @Override
     public void clear() {
         synchronized (mutex) {
             for (int i = 0; i < MAX_PRIORITY; i++) {
                 lists[i].clear();
             }
+            this.size = 0;
         }
     }
 
     /*
      * (non-Javadoc)
+     *
      * @see org.apache.activemq.MessageDispatchChannelI#isClosed()
      */
+    @Override
     public boolean isClosed() {
         return closed;
     }
 
     /*
      * (non-Javadoc)
+     *
      * @see org.apache.activemq.MessageDispatchChannelI#size()
      */
+    @Override
     public int size() {
         synchronized (mutex) {
             return this.size;
@@ -191,26 +211,31 @@ public class SimplePriorityMessageDispat
 
     /*
      * (non-Javadoc)
+     *
      * @see org.apache.activemq.MessageDispatchChannelI#getMutex()
      */
+    @Override
     public Object getMutex() {
         return mutex;
     }
 
     /*
      * (non-Javadoc)
+     *
      * @see org.apache.activemq.MessageDispatchChannelI#isRunning()
      */
+    @Override
     public boolean isRunning() {
         return running;
     }
 
     /*
      * (non-Javadoc)
+     *
      * @see org.apache.activemq.MessageDispatchChannelI#removeAll()
      */
+    @Override
     public List<MessageDispatch> removeAll() {
-
         synchronized (mutex) {
             ArrayList<MessageDispatch> result = new ArrayList<MessageDispatch>(size());
             for (int i = MAX_PRIORITY - 1; i >= 0; i--) {
@@ -225,20 +250,18 @@ public class SimplePriorityMessageDispat
 
     @Override
     public String toString() {
-
         String result = "";
         for (int i = MAX_PRIORITY - 1; i >= 0; i--) {
             result += i + ":{" + lists[i].toString() + "}";
         }
         return result;
-
     }
 
     protected int getPriority(MessageDispatch message) {
         int priority = javax.jms.Message.DEFAULT_PRIORITY;
         if (message.getMessage() != null) {
-	        priority = Math.max(message.getMessage().getPriority(), 0);
-	        priority = Math.min(priority, 9);
+            priority = Math.max(message.getMessage().getPriority(), 0);
+            priority = Math.min(priority, 9);
         }
         return priority;
     }