You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2009/08/06 11:23:56 UTC

svn commit: r801557 - /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java

Author: ritchiem
Date: Thu Aug  6 09:23:55 2009
New Revision: 801557

URL: http://svn.apache.org/viewvc?rev=801557&view=rev
Log:
QPID-2002 : Add Queue Creation logging

Modified:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=801557&r1=801556&r2=801557&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Thu Aug  6 09:23:55 2009
@@ -29,6 +29,10 @@
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.subscription.SubscriptionList;
 import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.subjects.QueueLogSubject;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.messages.QueueMessages;
 
 /*
 *
@@ -113,6 +117,7 @@
     private AtomicReference _asynchronousRunner = new AtomicReference(null);
     private AtomicInteger _deliveredMessages = new AtomicInteger();
     private AtomicBoolean _stopped = new AtomicBoolean(false);
+    private LogSubject _logSubject;
 
     protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost)
             throws AMQException
@@ -148,6 +153,28 @@
 
         _asyncDelivery = ReferenceCountingExecutorService.getInstance().acquireExecutorService();
 
+        _logSubject = new QueueLogSubject(this);
+
+        // Log the correct creation message
+
+        // Extract the number of priorities for this Queue.
+        // Leave it as 0 if we are a SimpleQueueEntryList
+        int priorities = 0;
+        if (entryListFactory instanceof PriorityQueueList)
+        {
+            PriorityQueueList priorityFactory = (PriorityQueueList) entryListFactory;
+            priorities = priorityFactory.getPriorities();
+        }
+
+        // Log the creation of this Queue.
+        // The priorities display is toggled on if we set priorities > 0
+        CurrentActor.get().message(_logSubject,
+                                   QueueMessages.QUE_1001(String.valueOf(_owner),
+                                                          priorities,
+                                                          autoDelete,
+                                                          durable, !durable,
+                                                          priorities > 0));
+
         try
         {
             _managedObject = new AMQQueueMBean(this);
@@ -429,7 +456,7 @@
         }
 
         _managedObject.checkForNotification(entry.getMessage());
-        
+
         return entry;
     }
 
@@ -823,11 +850,11 @@
         return entryList;
 
     }
-    
+
     /**
      * Returns a list of QueEntries from a given range of queue positions, eg messages 5 to 10 on the queue.
-     * 
-     * The 'queue position' index starts from 1. Using 0 in 'from' will be ignored and continue from 1. 
+     *
+     * The 'queue position' index starts from 1. Using 0 in 'from' will be ignored and continue from 1.
      * Using 0 in the 'to' field will return an empty list regardless of the 'from' value.
      * @param fromPosition
      * @param toPosition
@@ -836,7 +863,7 @@
     public List<QueueEntry> getMessagesRangeOnTheQueue(final long fromPosition, final long toPosition)
     {
         List<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
-        
+
         QueueEntryIterator it = _entries.iterator();
 
         long index = 1;
@@ -844,20 +871,20 @@
         {
             it.advance();
         }
-        
+
         if(index < fromPosition)
         {
             //The queue does not contain enough entries to reach our range.
             //return the empty list.
             return queueEntries;
         }
-        
+
         for ( ; index <= toPosition && !it.atTail(); index++)
         {
             it.advance();
             queueEntries.add(it.getNode());
         }
-        
+
         return queueEntries;
     }
 
@@ -1244,7 +1271,7 @@
 
         while (!sub.isSuspended() && !atTail && iterations != 0)
         {
-            try 
+            try
             {
                 sub.getSendLock();
                 atTail =  attemptDelivery(sub);
@@ -1479,8 +1506,8 @@
             if (!node.isDeleted() && node.expired() && node.acquire())
             {
                 node.discard(storeContext);
-            } 
-            else 
+            }
+            else
             {
                 _managedObject.checkForNotification(node.getMessage());
             }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org