You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2009/10/20 18:23:11 UTC
svn commit: r827724 [4/8] - in /qpid/branches/java-broker-0-10/qpid/java: ./
broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/
broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/
broker/src/main/java/org/apach...
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Tue Oct 20 16:23:01 2009
@@ -19,13 +19,11 @@
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.store.TransactionLog;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionList;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.PrincipalHolder;
+import org.apache.qpid.server.security.PrincipalHolder;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.QueueActor;
import org.apache.qpid.server.logging.subjects.QueueLogSubject;
@@ -33,7 +31,7 @@
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.messages.QueueMessages;
import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.txn.Transaction;
+import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
@@ -61,62 +59,40 @@
{
private static final Logger _logger = Logger.getLogger(SimpleAMQQueue.class);
- private int unused;
-
- private PrincipalHolder _prinicpalHolder;
-
-
- private Object _exclusiveOwner;
-
- private Exchange _alternateExchange;
-
-
- static final class QueueContext implements Context
- {
- volatile QueueEntry _lastSeenEntry;
- volatile QueueEntry _releasedEntry;
-
- public QueueContext(QueueEntry head)
- {
- _lastSeenEntry = head;
- }
-
- public QueueEntry getLastSeenEntry()
- {
- return _lastSeenEntry;
- }
- }
-
-
- static final AtomicReferenceFieldUpdater<QueueContext, QueueEntry>
- _lastSeenUpdater =
- AtomicReferenceFieldUpdater.newUpdater
- (QueueContext.class, QueueEntry.class, "_lastSeenEntry");
-
- static final AtomicReferenceFieldUpdater<QueueContext, QueueEntry>
- _releasedUpdater =
- AtomicReferenceFieldUpdater.newUpdater
- (QueueContext.class, QueueEntry.class, "_releasedEntry");
+ private final VirtualHost _virtualHost;
private final AMQShortString _name;
+ private final String _resourceName;
/** null means shared */
private final AMQShortString _owner;
+ private PrincipalHolder _prinicpalHolder;
+
+ private Object _exclusiveOwner;
+
+
private final boolean _durable;
/** If true, this queue is deleted when the last subscriber is removed */
private final boolean _autoDelete;
- private final VirtualHost _virtualHost;
+ private Exchange _alternateExchange;
/** Used to track bindings to exchanges so that on deletion they can easily be cancelled. */
private final ExchangeBindings _bindings = new ExchangeBindings(this);
- private final AtomicBoolean _deleted = new AtomicBoolean(false);
- private final List<Task> _deleteTaskList = new CopyOnWriteArrayList<Task>();
+ protected final QueueEntryList _entries;
+
+ protected final SubscriptionList _subscriptionList = new SubscriptionList(this);
+
+ private final AtomicReference<SubscriptionList.SubscriptionNode> _lastSubscriptionNode = new AtomicReference<SubscriptionList.SubscriptionNode>(_subscriptionList.getHead());
+
+ private volatile Subscription _exclusiveSubscriber;
+
+
private final AtomicInteger _atomicQueueCount = new AtomicInteger(0);
@@ -124,18 +100,10 @@
private final AtomicInteger _activeSubscriberCount = new AtomicInteger();
- protected final SubscriptionList _subscriptionList = new SubscriptionList(this);
- private final AtomicReference<SubscriptionList.SubscriptionNode> _lastSubscriptionNode = new AtomicReference<SubscriptionList.SubscriptionNode>(_subscriptionList.getHead());
+ private final AtomicLong _totalMessagesReceived = new AtomicLong();
- private volatile Subscription _exclusiveSubscriber;
- protected final QueueEntryList _entries;
- private final AMQQueueMBean _managedObject;
- private final Executor _asyncDelivery;
- private final AtomicLong _totalMessagesReceived = new AtomicLong();
-
- private final ConcurrentMap<AMQChannel, Boolean> _blockedChannels = new ConcurrentHashMap<AMQChannel, Boolean>();
/** max allowed size(KB) of a single message */
public long _maximumMessageSize = ApplicationRegistry.getInstance().getConfiguration().getMaximumMessageSize();
@@ -152,23 +120,37 @@
/** the minimum interval between sending out consecutive alerts of the same type */
public long _minimumAlertRepeatGap = ApplicationRegistry.getInstance().getConfiguration().getMinimumAlertRepeatGap();
- private static final int MAX_ASYNC_DELIVERIES = 10;
+ private long _capacity = ApplicationRegistry.getInstance().getConfiguration().getCapacity();
+
+ private long _flowResumeCapacity = ApplicationRegistry.getInstance().getConfiguration().getFlowResumeCapacity();
private final Set<NotificationCheck> _notificationChecks = EnumSet.noneOf(NotificationCheck.class);
+
+ static final int MAX_ASYNC_DELIVERIES = 10;
+
+
private final AtomicLong _stateChangeCount = new AtomicLong(Long.MIN_VALUE);
private AtomicReference _asynchronousRunner = new AtomicReference(null);
+ private final Executor _asyncDelivery;
private AtomicInteger _deliveredMessages = new AtomicInteger();
private AtomicBoolean _stopped = new AtomicBoolean(false);
+
+ private final ConcurrentMap<AMQChannel, Boolean> _blockedChannels = new ConcurrentHashMap<AMQChannel, Boolean>();
+
+ private final AtomicBoolean _deleted = new AtomicBoolean(false);
+ private final List<Task> _deleteTaskList = new CopyOnWriteArrayList<Task>();
+
+
private LogSubject _logSubject;
private LogActor _logActor;
+ private AMQQueueMBean _managedObject;
+ private static final String SUB_FLUSH_RUNNER = "SUB_FLUSH_RUNNER";
+ private boolean _nolocal;
- private long _capacity = ApplicationRegistry.getInstance().getConfiguration().getCapacity();
- private long _flowResumeCapacity = ApplicationRegistry.getInstance().getConfiguration().getFlowResumeCapacity();
protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost)
- throws AMQException
{
this(name, durable, owner, autoDelete, virtualHost, new SimpleQueueEntryList.Factory());
}
@@ -179,7 +161,6 @@
boolean autoDelete,
VirtualHost virtualHost,
QueueEntryListFactory entryListFactory)
- throws AMQException
{
if (name == null)
@@ -193,6 +174,7 @@
}
_name = name;
+ _resourceName = String.valueOf(name);
_durable = durable;
_owner = owner;
_autoDelete = autoDelete;
@@ -231,7 +213,7 @@
}
catch (JMException e)
{
- throw new AMQException("AMQQueue MBean creation has failed ", e);
+ _logger.error("AMQQueue MBean creation has failed ", e);
}
resetNotifications();
@@ -255,11 +237,21 @@
// ------ Getters and Setters
+ public void execute(ReadWriteRunnable runnable)
+ {
+ _asyncDelivery.execute(runnable);
+ }
+
public AMQShortString getName()
{
return _name;
}
+ public void setNoLocal(boolean nolocal)
+ {
+ _nolocal = nolocal;
+ }
+
public boolean isDurable()
{
return _durable;
@@ -401,6 +393,7 @@
if (!isDeleted())
{
subscription.setQueue(this, exclusive);
+ subscription.setNoLocal(_nolocal);
_subscriptionList.add(subscription);
if (isDeleted())
{
@@ -540,7 +533,10 @@
deliverAsync();
}
- _managedObject.checkForNotification(entry.getMessage());
+ if(_managedObject != null)
+ {
+ _managedObject.checkForNotification(entry.getMessage());
+ }
return entry;
}
@@ -612,10 +608,10 @@
QueueContext subContext = (QueueContext) sub.getQueueContext();
QueueEntry releasedEntry = subContext._releasedEntry;
- _lastSeenUpdater.set(subContext, entry);
+ QueueContext._lastSeenUpdater.set(subContext, entry);
if(releasedEntry == entry)
{
- _releasedUpdater.compareAndSet(subContext, releasedEntry, null);
+ QueueContext._releasedUpdater.compareAndSet(subContext, releasedEntry, null);
}
}
@@ -629,7 +625,7 @@
while((oldEntry = subContext._releasedEntry) == null || oldEntry.compareTo(entry) > 0)
{
- if(_releasedUpdater.compareAndSet(subContext, oldEntry, entry))
+ if(QueueContext._releasedUpdater.compareAndSet(subContext, oldEntry, entry))
{
break;
}
@@ -939,11 +935,11 @@
public void moveMessagesToAnotherQueue(final long fromMessageId,
final long toMessageId,
String queueName,
- StoreContext storeContext)
+ ServerTransaction txn)
{
- AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
- TransactionLog txnLog = getVirtualHost().getTransactionLog();
+ final AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
+
List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
{
@@ -962,62 +958,48 @@
}
});
- try
+
+
+ // Move the messages in on the message store.
+ for (final QueueEntry entry : entries)
{
- txnLog.beginTran(storeContext);
+ final ServerMessage message = entry.getMessage();
+ txn.enqueue(toQueue, message,
+ new ServerTransaction.Action()
+ {
- // Move the messages in on the message store.
- for (QueueEntry entry : entries)
- {
- ServerMessage message = entry.getMessage();
+ public void postCommit()
+ {
+ try
+ {
+ toQueue.enqueue(message);
+ }
+ catch (AMQException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
- if (message.isPersistent() && toQueue.isDurable())
- {
- txnLog.enqueueMessage(storeContext, toQueue, message.getMessageNumber());
- }
- // dequeue does not decrement the refence count
- entry.dequeue();
- }
+ public void onRollback()
+ {
+ entry.release();
+ }
+ });
+ txn.dequeue(this, message,
+ new ServerTransaction.Action()
+ {
- // Commit and flush the move transcations.
- try
- {
- txnLog.commitTran(storeContext);
- }
- catch (AMQException e)
- {
- throw new RuntimeException("Failed to commit transaction whilst moving messages on message store.", e);
- }
- }
- catch (AMQException e)
- {
- try
- {
- txnLog.abortTran(storeContext);
- }
- catch (AMQException rollbackEx)
- {
- _logger.error("Failed to rollback transaction when error occured moving messages", rollbackEx);
- }
- throw new RuntimeException(e);
- }
+ public void postCommit()
+ {
+ entry.discard();
+ }
- try
- {
+ public void onRollback()
+ {
+
+ }
+ });
- for (QueueEntry entry : entries)
- {
- toQueue.enqueue(entry.getMessage());
- entry.delete();
- }
- }
- catch (MessageCleanupException e)
- {
- throw new RuntimeException(e);
- }
- catch (AMQException e)
- {
- throw new RuntimeException(e);
}
}
@@ -1025,10 +1007,9 @@
public void copyMessagesToAnotherQueue(final long fromMessageId,
final long toMessageId,
String queueName,
- final StoreContext storeContext)
+ final ServerTransaction txn)
{
- AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
- TransactionLog txnLog = getVirtualHost().getTransactionLog();
+ final AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
{
@@ -1046,66 +1027,37 @@
}
});
- try
+
+ // Move the messages in on the message store.
+ for (QueueEntry entry : entries)
{
- txnLog.beginTran(storeContext);
+ final ServerMessage message = entry.getMessage();
- // Move the messages in on the message store.
- for (QueueEntry entry : entries)
+ if (message.isPersistent() && toQueue.isDurable())
{
- ServerMessage message = entry.getMessage();
- if (message.isPersistent() && toQueue.isDurable())
- {
-
- txnLog.enqueueMessage(storeContext, toQueue, message.getMessageNumber());
-
- }
- }
+ txn.enqueue(toQueue, message, new ServerTransaction.Action()
+ {
+ public void postCommit()
+ {
+ try
+ {
+ toQueue.enqueue(message);
+ }
+ catch (AMQException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
- // Commit and flush the move transcations.
- try
- {
- txnLog.commitTran(storeContext);
- }
- catch (AMQException e)
- {
- throw new RuntimeException("Failed to commit transaction whilst moving messages on message store.", e);
- }
- }
- catch (AMQException e)
- {
- try
- {
- txnLog.abortTran(storeContext);
- }
- catch (AMQException rollbackEx)
- {
- _logger.error("Failed to rollback transaction when error occured moving messages", rollbackEx);
- }
- throw new RuntimeException(e);
- }
+ public void onRollback()
+ {
- try
- {
- for (QueueEntry entry : entries)
- {
+ }
+ });
- ServerMessage message = entry.getMessage();
- if (message != null)
- {
- toQueue.enqueue(entry.getMessage());
- }
}
}
- catch (MessageCleanupException e)
- {
- throw new RuntimeException(e);
- }
- catch (AMQException e)
- {
- throw new RuntimeException(e);
- }
}
@@ -1160,7 +1112,7 @@
QueueEntryIterator queueListIterator = _entries.iterator();
long count = 0;
- Transaction txn = new LocalTransaction(getVirtualHost().getTransactionLog());
+ ServerTransaction txn = new LocalTransaction(getVirtualHost().getTransactionLog());
while (queueListIterator.advance())
{
@@ -1181,14 +1133,14 @@
private void dequeueEntry(final QueueEntry node)
{
- Transaction txn = new AutoCommitTransaction(getVirtualHost().getTransactionLog());
+ ServerTransaction txn = new AutoCommitTransaction(getVirtualHost().getTransactionLog());
dequeueEntry(node, txn);
}
- private void dequeueEntry(final QueueEntry node, Transaction txn)
+ private void dequeueEntry(final QueueEntry node, ServerTransaction txn)
{
txn.dequeue(this, node.getMessage(),
- new Transaction.Action()
+ new ServerTransaction.Action()
{
public void postCommit()
@@ -1241,7 +1193,7 @@
}
});
- Transaction txn = new LocalTransaction(getVirtualHost().getTransactionLog());
+ ServerTransaction txn = new LocalTransaction(getVirtualHost().getTransactionLog());
if(_alternateExchange != null)
{
@@ -1255,7 +1207,7 @@
if(rerouteQueues != null & rerouteQueues.size() != 0)
{
txn.enqueue(rerouteQueues, entry.getMessage(),
- new Transaction.Action()
+ new ServerTransaction.Action()
{
public void postCommit()
@@ -1280,7 +1232,7 @@
}
});
txn.dequeue(this, entry.getMessage(),
- new Transaction.Action()
+ new ServerTransaction.Action()
{
public void postCommit()
@@ -1308,7 +1260,7 @@
if(message != null)
{
txn.dequeue(this, message,
- new Transaction.Action()
+ new ServerTransaction.Action()
{
public void postCommit()
@@ -1327,7 +1279,10 @@
txn.commit();
- _managedObject.unregister();
+ if(_managedObject!=null)
+ {
+ _managedObject.unregister();
+ }
for (Task task : _deleteTaskList)
{
@@ -1417,7 +1372,13 @@
public void deliverAsync(Subscription sub)
{
- _asyncDelivery.execute(new SubFlushRunner(sub));
+ SubFlushRunner flusher = (SubFlushRunner) sub.get(SUB_FLUSH_RUNNER);
+ if(flusher == null)
+ {
+ flusher = new SubFlushRunner(sub);
+ sub.set(SUB_FLUSH_RUNNER, flusher);
+ }
+ _asyncDelivery.execute(flusher);
}
@@ -1466,66 +1427,12 @@
}
}
- private class SubFlushRunner implements ReadWriteRunnable
- {
- private final Subscription _sub;
-
- public SubFlushRunner(Subscription sub)
- {
- _sub = sub;
- }
-
- public void run()
- {
-
- String originalName = Thread.currentThread().getName();
- try{
- Thread.currentThread().setName("SubFlushRunner-"+_sub);
-
- boolean complete = false;
- try
- {
- CurrentActor.set(_sub.getLogActor());
- complete = flushSubscription(_sub, new Long(MAX_ASYNC_DELIVERIES));
-
- }
- catch (AMQException e)
- {
- _logger.error(e);
- }
- finally
- {
- CurrentActor.remove();
- }
- if (!complete && !_sub.isSuspended())
- {
- _asyncDelivery.execute(this);
- }
- }
- finally
- {
- Thread.currentThread().setName(originalName);
- }
-
- }
-
- public boolean isRead()
- {
- return false;
- }
-
- public boolean isWrite()
- {
- return true;
- }
- }
-
public void flushSubscription(Subscription sub) throws AMQException
{
flushSubscription(sub, Long.MAX_VALUE);
}
- public boolean flushSubscription(Subscription sub, Long iterations) throws AMQException
+ public boolean flushSubscription(Subscription sub, long iterations) throws AMQException
{
boolean atTail = false;
@@ -1655,9 +1562,9 @@
}
}
- if(_lastSeenUpdater.compareAndSet(context, lastSeen, node))
+ if(QueueContext._lastSeenUpdater.compareAndSet(context, lastSeen, node))
{
- _releasedUpdater.compareAndSet(context, releasedNode, null);
+ QueueContext._releasedUpdater.compareAndSet(context, releasedNode, null);
}
lastSeen = context._lastSeenEntry;
@@ -1774,7 +1681,10 @@
}
else
{
- _managedObject.checkForNotification(node.getMessage());
+ if(_managedObject!=null)
+ {
+ _managedObject.checkForNotification(node.getMessage());
+ }
}
}
@@ -1969,4 +1879,9 @@
_flowResumeCapacity = config.getFlowResumeCapacity();
}
}
+
+ public String getResourceName()
+ {
+ return _resourceName;
+ }
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java Tue Oct 20 16:23:01 2009
@@ -4,6 +4,7 @@
import org.apache.qpid.server.message.ServerMessage;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.concurrent.atomic.AtomicLong;
/*
*
@@ -44,9 +45,7 @@
_nextUpdater =
AtomicReferenceFieldUpdater.newUpdater
(QueueEntryImpl.class, QueueEntryImpl.class, "_next");
-
-
-
+ private AtomicLong _deletes = new AtomicLong(0L);
public SimpleQueueEntryList(AMQQueue queue)
@@ -56,21 +55,77 @@
_tail = _head;
}
+
+
void advanceHead()
{
+ _deletes.incrementAndGet();
QueueEntryImpl head = _head.nextNode();
+ boolean deleted = head.isDeleted();
while(head._next != null && head.isDeleted())
{
+ deleted = true;
final QueueEntryImpl newhead = head.nextNode();
if(newhead != null)
{
- _nextUpdater.compareAndSet(_head,head, newhead);
+ if(_nextUpdater.compareAndSet(_head,head, newhead))
+ {
+ _deletes.decrementAndGet();
+ }
}
head = _head.nextNode();
}
+
+ if(!deleted)
+ {
+ deleted = true;
+ }
+
+ if(_deletes.get() > 1000L)
+ {
+ _deletes.set(0L);
+ scavenge();
+ }
}
+ void scavenge()
+ {
+ QueueEntryImpl root = _head;
+ QueueEntryImpl next = root.nextNode();
+
+ do
+ {
+
+
+ while(next._next != null && next.isDeleted())
+ {
+
+ final QueueEntryImpl newhead = next.nextNode();
+ if(newhead != null)
+ {
+ _nextUpdater.compareAndSet(root,next, newhead);
+ }
+ next = root.nextNode();
+ }
+ if(next._next != null)
+ {
+ if(!next.isDeleted())
+ {
+ root = next;
+ next = root.nextNode();
+ }
+ }
+ else
+ {
+ break;
+ }
+
+ } while (next != null && next._next != null);
+
+ }
+
+
public AMQQueue getQueue()
{
Added: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java?rev=827724&view=auto
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java (added)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java Tue Oct 20 16:23:01 2009
@@ -0,0 +1,68 @@
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.pool.ReadWriteRunnable;
+import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.AMQException;
+import org.apache.log4j.Logger;
+
+
+class SubFlushRunner implements ReadWriteRunnable
+{
+ private static final Logger _logger = Logger.getLogger(SimpleAMQQueue.class);
+
+
+ private final Subscription _sub;
+ private final String _name;
+ private static final long ITERATIONS = SimpleAMQQueue.MAX_ASYNC_DELIVERIES;
+
+ public SubFlushRunner(Subscription sub)
+ {
+ _sub = sub;
+ _name = "SubFlushRunner-"+_sub;
+ }
+
+ public void run()
+ {
+
+
+ Thread.currentThread().setName(_name);
+
+ boolean complete = false;
+ try
+ {
+ CurrentActor.set(_sub.getLogActor());
+ complete = getQueue().flushSubscription(_sub, ITERATIONS);
+
+ }
+ catch (AMQException e)
+ {
+ _logger.error(e);
+ }
+ finally
+ {
+ CurrentActor.remove();
+ }
+ if (!complete && !_sub.isSuspended())
+ {
+ getQueue().execute(this);
+ }
+
+
+ }
+
+ private SimpleAMQQueue getQueue()
+ {
+ return (SimpleAMQQueue) _sub.getQueue();
+ }
+
+ public boolean isRead()
+ {
+ return false;
+ }
+
+ public boolean isWrite()
+ {
+ return true;
+ }
+}
Propchange: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
------------------------------------------------------------------------------
svn:executable = *
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java Tue Oct 20 16:23:01 2009
@@ -35,8 +35,8 @@
import org.apache.qpid.server.security.access.ACLManager;
import org.apache.qpid.server.security.auth.database.ConfigurationFilePrincipalDatabaseManager;
import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
-import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import java.io.File;
@@ -51,15 +51,15 @@
public void initialise(int instanceID) throws Exception
{
- _rootMessageLogger = new RootMessageLoggerImpl(_configuration,
+ _rootMessageLogger = new RootMessageLoggerImpl(_configuration,
new Log4jMessageLogger());
-
+
_registryName = String.valueOf(instanceID);
// Set the Actor for current log messages
CurrentActor.set(new BrokerActor(_registryName, _rootMessageLogger));
- CurrentActor.get().message(BrokerMessages.BRK_1001(QpidProperties.getReleaseVersion(),QpidProperties.getBuildVersion()));
+ CurrentActor.get().message(BrokerMessages.BRK_1001(QpidProperties.getReleaseVersion(),QpidProperties.getBuildVersion()));
initialiseManagedObjectRegistry();
@@ -68,7 +68,7 @@
_pluginManager = new PluginManager(_configuration.getPluginDirectory());
_accessManager = new ACLManager(_configuration.getSecurityConfiguration(), _pluginManager);
-
+
_databaseManager = new ConfigurationFilePrincipalDatabaseManager(_configuration);
_authenticationManager = new PrincipalDatabaseAuthenticationManager(null, null);
@@ -99,10 +99,10 @@
}
private void initialiseVirtualHosts() throws Exception
- {
+ {
for (String name : _configuration.getVirtualHosts())
{
- _virtualHostRegistry.registerVirtualHost(new VirtualHost(_configuration.getVirtualHostConfig(name)));
+ _virtualHostRegistry.registerVirtualHost(new VirtualHostImpl(_configuration.getVirtualHostConfig(name)));
}
getVirtualHostRegistry().setDefaultVirtualHostName(_configuration.getDefaultVirtualHost());
}
Copied: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/PrincipalHolder.java (from r821930, qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/PrincipalHolder.java)
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/PrincipalHolder.java?p2=qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/PrincipalHolder.java&p1=qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/PrincipalHolder.java&r1=821930&r2=827724&rev=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/PrincipalHolder.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/PrincipalHolder.java Tue Oct 20 16:23:01 2009
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpid.server;
+package org.apache.qpid.server.security;
import java.security.Principal;
Propchange: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/PrincipalHolder.java
------------------------------------------------------------------------------
svn:executable = *
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ACLManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ACLManager.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ACLManager.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ACLManager.java Tue Oct 20 16:23:01 2009
@@ -14,9 +14,9 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
+ * under the License.
+ *
*
- *
*/
package org.apache.qpid.server.security.access;
@@ -32,16 +32,12 @@
import org.apache.log4j.Logger;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.configuration.SecurityConfiguration;
-import org.apache.qpid.server.configuration.ServerConfiguration;
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.plugins.PluginManager;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.security.access.ACLPlugin.AuthzResult;
-import org.apache.qpid.server.security.access.plugins.SimpleXML;
+import org.apache.qpid.server.security.PrincipalHolder;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.PrincipalHolder;
public class ACLManager
{
@@ -79,7 +75,7 @@
{
_hostPlugins = configurePlugins(hostConfig);
}
-
+
public Map<String, ACLPlugin> configurePlugins(SecurityConfiguration hostConfig) throws ConfigurationException
{
Configuration securityConfig = hostConfig.getConfiguration();
@@ -109,7 +105,7 @@
}
}
return plugins;
- }
+ }
public static Logger getLogger()
{
@@ -132,18 +128,18 @@
if (result == AuthzResult.DENIED)
{
// Something vetoed the access, we're done
- return false;
+ return false;
}
else if (result == AuthzResult.ALLOWED)
{
- // Remove plugin from global check list since
+ // Remove plugin from global check list since
// host allow overrides global allow
remainingPlugins.remove(plugin.getKey());
}
}
-
+
for (ACLPlugin plugin : remainingPlugins.values())
- {
+ {
result = checker.allowed(plugin);
if (result == AuthzResult.DENIED)
{
@@ -271,7 +267,7 @@
});
}
-
+
public boolean authorisePublish(final PrincipalHolder session, final boolean immediate, final boolean mandatory,
final AMQShortString routingKey, final Exchange e)
{
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ACLPlugin.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ACLPlugin.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ACLPlugin.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ACLPlugin.java Tue Oct 20 16:23:01 2009
@@ -24,10 +24,9 @@
import org.apache.commons.configuration.ConfigurationException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.PrincipalHolder;
+import org.apache.qpid.server.security.PrincipalHolder;
public interface ACLPlugin
{
@@ -35,13 +34,13 @@
{
ALLOWED,
DENIED,
- ABSTAIN
+ ABSTAIN
}
void setConfiguration(Configuration config) throws ConfigurationException;
- // These return true if the plugin thinks the action should be allowed, and false if not.
-
+ // These return true if the plugin thinks the action should be allowed, and false if not.
+
AuthzResult authoriseBind(PrincipalHolder session, Exchange exch, AMQQueue queue, AMQShortString routingKey);
AuthzResult authoriseCreateExchange(PrincipalHolder session, boolean autoDelete, boolean durable,
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AbstractACLPlugin.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AbstractACLPlugin.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AbstractACLPlugin.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AbstractACLPlugin.java Tue Oct 20 16:23:01 2009
@@ -22,14 +22,13 @@
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.security.access.ACLPlugin;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.PrincipalHolder;
+import org.apache.qpid.server.security.PrincipalHolder;
/**
- * This ACLPlugin abstains from all votes. Useful if your plugin only cares about a few operations.
+ * This ACLPlugin abstains from all votes. Useful if your plugin only cares about a few operations.
*/
public abstract class AbstractACLPlugin implements ACLPlugin
{
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/BasicACLPlugin.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/BasicACLPlugin.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/BasicACLPlugin.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/BasicACLPlugin.java Tue Oct 20 16:23:01 2009
@@ -21,21 +21,19 @@
package org.apache.qpid.server.security.access.plugins;
import org.apache.commons.configuration.Configuration;
-import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.security.access.ACLPlugin;
+import org.apache.qpid.server.security.PrincipalHolder;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.PrincipalHolder;
public abstract class BasicACLPlugin implements ACLPlugin
{
- // Returns true or false if the plugin should authorise or deny the request
+ // Returns true or false if the plugin should authorise or deny the request
protected abstract AuthzResult getResult();
-
+
public AuthzResult authoriseBind(PrincipalHolder session, Exchange exch,
AMQQueue queue, AMQShortString routingKey)
{
@@ -51,7 +49,7 @@
public AuthzResult authoriseConsume(PrincipalHolder session, boolean noAck,
AMQQueue queue)
{
- return getResult();
+ return getResult();
}
public AuthzResult authoriseConsume(PrincipalHolder session,
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/SimpleXML.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/SimpleXML.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/SimpleXML.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/SimpleXML.java Tue Oct 20 16:23:01 2009
@@ -25,15 +25,14 @@
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.security.access.ACLPlugin;
import org.apache.qpid.server.security.access.ACLPluginFactory;
import org.apache.qpid.server.security.access.AccessResult;
import org.apache.qpid.server.security.access.Permission;
import org.apache.qpid.server.security.access.PrincipalPermissions;
+import org.apache.qpid.server.security.PrincipalHolder;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.PrincipalHolder;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -57,7 +56,7 @@
return plugin;
}
};
-
+
private Map<String, PrincipalPermissions> _users;
private final AccessResult GRANTED = new AccessResult(this, AccessResult.AccessStatus.GRANTED);
@@ -82,7 +81,7 @@
/**
* Publish format takes Exchange + Routing Key Pairs
- *
+ *
* @param config
* XML Configuration
*/
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java Tue Oct 20 16:23:01 2009
@@ -32,12 +32,11 @@
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.XMLConfiguration;
import org.apache.qpid.protocol.ProtocolEngine;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.security.access.ACLPlugin;
import org.apache.qpid.server.security.access.ACLPluginFactory;
import org.apache.qpid.server.security.access.plugins.AbstractACLPlugin;
+import org.apache.qpid.server.security.PrincipalHolder;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.PrincipalHolder;
import org.apache.qpid.util.NetMatcher;
public class FirewallPlugin extends AbstractACLPlugin
@@ -59,7 +58,7 @@
return plugin;
}
};
-
+
public class FirewallRule
{
@@ -71,13 +70,13 @@
public FirewallRule(String access, List networks, List hostnames)
{
_access = (access.equals("allow")) ? AuthzResult.ALLOWED : AuthzResult.DENIED;
-
+
if (networks != null && networks.size() > 0)
{
String[] networkStrings = objListToStringArray(networks);
_network = new NetMatcher(networkStrings);
}
-
+
if (hostnames != null && hostnames.size() > 0)
{
int i = 0;
@@ -87,7 +86,7 @@
_hostnamePatterns[i++] = Pattern.compile(hostname);
}
}
-
+
}
private String[] objListToStringArray(List objList)
@@ -149,7 +148,7 @@
thread.run();
long endTime = System.currentTimeMillis() + DNS_TIMEOUT;
-
+
while (System.currentTimeMillis() < endTime && !done.get())
{
try
@@ -183,7 +182,7 @@
if(!(principalHolder instanceof ProtocolEngine))
{
return AuthzResult.ABSTAIN; // We only deal with tcp sessions
- }
+ }
ProtocolEngine session = (ProtocolEngine) principalHolder;
@@ -237,7 +236,7 @@
_default = AuthzResult.DENIED;
}
CompositeConfiguration finalConfig = new CompositeConfiguration(config);
-
+
List subFiles = config.getList("xml[@fileName]");
for (Object subFile : subFiles)
{
@@ -245,7 +244,7 @@
}
// all rules must have an access attribute
- int numRules = finalConfig.getList("rule[@access]").size();
+ int numRules = finalConfig.getList("rule[@access]").size();
_rules = new FirewallRule[numRules];
for (int i = 0; i < numRules; i++)
{
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java Tue Oct 20 16:23:01 2009
@@ -215,14 +215,7 @@
_logger.warn("Unable to load access file:" + jmxaccesssFile);
}
- try
- {
- _mbean.register();
- }
- catch (AMQException e)
- {
- _logger.warn("Unable to register user management MBean");
- }
+ _mbean.register();
}
catch (JMException e)
{
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractMessageStore.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractMessageStore.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractMessageStore.java Tue Oct 20 16:23:01 2009
@@ -31,12 +31,12 @@
{
protected LogSubject _logSubject;
- public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration hostConfig) throws Exception
+ public void configure(VirtualHost virtualHost) throws Exception
{
_logSubject = new MessageStoreLogSubject(virtualHost, this);
CurrentActor.get().message(_logSubject, MessageStoreMessages.MST_1001(this.getClass().getName()));
}
-
+
public void close() throws Exception
{
CurrentActor.get().message(_logSubject,MessageStoreMessages.MST_1003());
Added: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java?rev=827724&view=auto
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java (added)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java Tue Oct 20 16:23:01 2009
@@ -0,0 +1,57 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.server.store;
+
+import java.nio.ByteBuffer;
+import org.apache.qpid.framing.FieldTable;
+
+public interface ConfigurationRecoveryHandler
+{
+ QueueRecoveryHandler begin(MessageStore store);
+
+ public static interface QueueRecoveryHandler
+ {
+ void queue(String queueName, String owner, FieldTable arguments);
+ ExchangeRecoveryHandler completeQueueRecovery();
+ }
+
+ public static interface ExchangeRecoveryHandler
+ {
+ void exchange(String exchangeName, String type, boolean autoDelete);
+ BindingRecoveryHandler completeExchangeRecovery();
+ }
+
+ public static interface BindingRecoveryHandler
+ {
+ void binding(String exchangeName, String queueName, String bindingKey, ByteBuffer buf);
+ void completeBindingRecovery();
+ }
+
+ public static interface QueueEntryRecoveryHandler
+ {
+ void complete();
+
+ void queueEntry(String queueName, long messageId);
+ }
+
+
+
+}
Propchange: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java
------------------------------------------------------------------------------
svn:executable = *
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org