You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2009/08/06 11:23:56 UTC
svn commit: r801557 -
/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
Author: ritchiem
Date: Thu Aug 6 09:23:55 2009
New Revision: 801557
URL: http://svn.apache.org/viewvc?rev=801557&view=rev
Log:
QPID-2002 : Add Queue Creation logging
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=801557&r1=801556&r2=801557&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Thu Aug 6 09:23:55 2009
@@ -29,6 +29,10 @@
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionList;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.subjects.QueueLogSubject;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.messages.QueueMessages;
/*
*
@@ -113,6 +117,7 @@
private AtomicReference _asynchronousRunner = new AtomicReference(null);
private AtomicInteger _deliveredMessages = new AtomicInteger();
private AtomicBoolean _stopped = new AtomicBoolean(false);
+ private LogSubject _logSubject;
protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost)
throws AMQException
@@ -148,6 +153,28 @@
_asyncDelivery = ReferenceCountingExecutorService.getInstance().acquireExecutorService();
+ _logSubject = new QueueLogSubject(this);
+
+ // Log the correct creation message
+
+ // Extract the number of priorities for this Queue.
+ // Leave it as 0 if we are a SimpleQueueEntryList
+ int priorities = 0;
+ if (entryListFactory instanceof PriorityQueueList)
+ {
+ PriorityQueueList priorityFactory = (PriorityQueueList) entryListFactory;
+ priorities = priorityFactory.getPriorities();
+ }
+
+ // Log the creation of this Queue.
+ // The priorities display is toggled on if we set priorities > 0
+ CurrentActor.get().message(_logSubject,
+ QueueMessages.QUE_1001(String.valueOf(_owner),
+ priorities,
+ autoDelete,
+ durable, !durable,
+ priorities > 0));
+
try
{
_managedObject = new AMQQueueMBean(this);
@@ -429,7 +456,7 @@
}
_managedObject.checkForNotification(entry.getMessage());
-
+
return entry;
}
@@ -823,11 +850,11 @@
return entryList;
}
-
+
/**
* Returns a list of QueEntries from a given range of queue positions, eg messages 5 to 10 on the queue.
- *
- * The 'queue position' index starts from 1. Using 0 in 'from' will be ignored and continue from 1.
+ *
+ * The 'queue position' index starts from 1. Using 0 in 'from' will be ignored and continue from 1.
* Using 0 in the 'to' field will return an empty list regardless of the 'from' value.
* @param fromPosition
* @param toPosition
@@ -836,7 +863,7 @@
public List<QueueEntry> getMessagesRangeOnTheQueue(final long fromPosition, final long toPosition)
{
List<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
-
+
QueueEntryIterator it = _entries.iterator();
long index = 1;
@@ -844,20 +871,20 @@
{
it.advance();
}
-
+
if(index < fromPosition)
{
//The queue does not contain enough entries to reach our range.
//return the empty list.
return queueEntries;
}
-
+
for ( ; index <= toPosition && !it.atTail(); index++)
{
it.advance();
queueEntries.add(it.getNode());
}
-
+
return queueEntries;
}
@@ -1244,7 +1271,7 @@
while (!sub.isSuspended() && !atTail && iterations != 0)
{
- try
+ try
{
sub.getSendLock();
atTail = attemptDelivery(sub);
@@ -1479,8 +1506,8 @@
if (!node.isDeleted() && node.expired() && node.acquire())
{
node.discard(storeContext);
- }
- else
+ }
+ else
{
_managedObject.checkForNotification(node.getMessage());
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org