You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2011/10/20 20:43:26 UTC
svn commit: r1186990 [23/43] - in /qpid/branches/QPID-2519: ./ bin/ cpp/
cpp/bindings/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/
cpp/bindings/qmf/tests/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/
cpp/bindings/qmf2/python/ cpp/bindings/qmf...
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java Thu Oct 20 18:42:46 2011
@@ -20,25 +20,26 @@
*/
package org.apache.qpid.server.protocol;
-import org.apache.qpid.protocol.ProtocolEngine;
-import org.apache.qpid.transport.NetworkDriver;
+import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.network.InputHandler;
import org.apache.qpid.transport.network.Assembler;
import org.apache.qpid.transport.network.Disassembler;
+import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.server.configuration.*;
import org.apache.qpid.server.transport.ServerConnection;
-import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.registry.IApplicationRegistry;
import java.net.SocketAddress;
+import java.nio.ByteBuffer;
import java.util.UUID;
-public class ProtocolEngine_0_10 extends InputHandler implements ProtocolEngine, ConnectionConfig
+public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocolEngine, ConnectionConfig
{
public static final int MAX_FRAME_SIZE = 64 * 1024 - 1;
- private NetworkDriver _networkDriver;
+ private NetworkConnection _network;
private long _readBytes;
private long _writtenBytes;
private ServerConnection _connection;
@@ -47,26 +48,22 @@ public class ProtocolEngine_0_10 extend
private long _createTime = System.currentTimeMillis();
public ProtocolEngine_0_10(ServerConnection conn,
- NetworkDriver networkDriver,
+ NetworkConnection network,
final IApplicationRegistry appRegistry)
{
super(new Assembler(conn));
_connection = conn;
_connection.setConnectionConfig(this);
- _networkDriver = networkDriver;
+
_id = appRegistry.getConfigStore().createId();
_appRegistry = appRegistry;
- // FIXME Two log messages to maintain compatinbility with earlier protocol versions
- _connection.getLogActor().message(ConnectionMessages.OPEN(null, null, false, false));
- _connection.getLogActor().message(ConnectionMessages.OPEN(null, "0-10", false, true));
- }
+ if(network != null)
+ {
+ setNetworkConnection(network);
+ }
+
- public void setNetworkDriver(NetworkDriver driver)
- {
- _networkDriver = driver;
- Disassembler dis = new Disassembler(driver, MAX_FRAME_SIZE);
- _connection.setSender(dis);
_connection.onOpen(new Runnable()
{
public void run()
@@ -77,14 +74,30 @@ public class ProtocolEngine_0_10 extend
}
+ public void setNetworkConnection(NetworkConnection network)
+ {
+ setNetworkConnection(network, network.getSender());
+ }
+
+ public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
+ {
+ _network = network;
+
+ _connection.setSender(new Disassembler(sender, MAX_FRAME_SIZE));
+
+ // FIXME Two log messages to maintain compatibility with earlier protocol versions
+ _connection.getLogActor().message(ConnectionMessages.OPEN(null, null, false, false));
+ _connection.getLogActor().message(ConnectionMessages.OPEN(null, "0-10", false, true));
+ }
+
public SocketAddress getRemoteAddress()
{
- return _networkDriver.getRemoteAddress();
+ return _network.getRemoteAddress();
}
public SocketAddress getLocalAddress()
{
- return _networkDriver.getLocalAddress();
+ return _network.getLocalAddress();
}
public long getReadBytes()
@@ -134,7 +147,7 @@ public class ProtocolEngine_0_10 extend
public String getAuthId()
{
- return _connection.getAuthorizationID();
+ return _connection.getAuthorizedPrincipal() == null ? null : _connection.getAuthorizedPrincipal().getName();
}
public String getRemoteProcessName()
@@ -193,9 +206,14 @@ public class ProtocolEngine_0_10 extend
{
return false;
}
-
+
public void mgmtClose()
{
_connection.mgmtClose();
}
+
+ public long getConnectionId()
+ {
+ return _connection.getConnectionId();
+ }
}
Propchange: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Oct 20 18:42:46 2011
@@ -3,4 +3,5 @@
/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:795950-829653
/qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:805429-821809
/qpid/branches/jmx_mc_gsoc09/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:787599
-/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:753219-753220,753253,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
+/qpid/branches/qpid-2935/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:1061302-1072333
+/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:753219-753220,753253,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790,1072051-1185907
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java Thu Oct 20 18:42:46 2011
@@ -60,7 +60,7 @@ public class AMQPriorityQueue extends Si
{
// check that all subscriptions are not in advance of the entry
SubscriptionList.SubscriptionNodeIterator subIter = _subscriptionList.iterator();
- while(subIter.advance() && !entry.isAcquired())
+ while(subIter.advance() && entry.isAvailable())
{
final Subscription subscription = subIter.getNode().getSubscription();
if(!subscription.isClosed())
@@ -70,7 +70,7 @@ public class AMQPriorityQueue extends Si
{
QueueEntry subnode = context._lastSeenEntry;
QueueEntry released = context._releasedEntry;
- while(subnode != null && entry.compareTo(subnode) < 0 && !entry.isAcquired() && (released == null || released.compareTo(entry) < 0))
+ while(subnode != null && entry.compareTo(subnode) < 0 && entry.isAvailable() && (released == null || released.compareTo(entry) < 0))
{
if(QueueContext._releasedUpdater.compareAndSet(context,released,entry))
{
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Thu Oct 20 18:42:46 2011
@@ -21,21 +21,18 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.configuration.QueueConfig;
-import org.apache.qpid.server.configuration.QueueConfiguration;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeReferrer;
import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.management.ManagedObject;
-import org.apache.qpid.server.security.PrincipalHolder;
+import org.apache.qpid.server.security.AuthorizationHolder;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.txn.ServerTransaction;
@@ -72,8 +69,8 @@ public interface AMQQueue extends Managa
boolean isAutoDelete();
AMQShortString getOwner();
- PrincipalHolder getPrincipalHolder();
- void setPrincipalHolder(PrincipalHolder principalHolder);
+ AuthorizationHolder getAuthorizationHolder();
+ void setAuthorizationHolder(AuthorizationHolder principalHolder);
void setExclusiveOwningSession(AMQSessionModel owner);
AMQSessionModel getExclusiveOwningSession();
@@ -108,23 +105,16 @@ public interface AMQQueue extends Managa
boolean isDeleted();
-
int delete() throws AMQException;
-
void requeue(QueueEntry entry);
- void requeue(QueueEntryImpl storeContext, Subscription subscription);
-
void dequeue(QueueEntry entry, Subscription sub);
void decrementUnackedMsgCount();
-
boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException;
-
-
void addQueueDeleteTask(final Task task);
void removeQueueDeleteTask(final Task task);
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java Thu Oct 20 18:42:46 2011
@@ -43,6 +43,7 @@ import javax.management.JMException;
import javax.management.MBeanException;
import javax.management.MBeanNotificationInfo;
import javax.management.Notification;
+import javax.management.ObjectName;
import javax.management.OperationsException;
import javax.management.monitor.MonitorNotification;
import javax.management.openmbean.ArrayType;
@@ -97,7 +98,7 @@ public class AMQQueueMBean extends AMQMa
{
super(ManagedQueue.class, ManagedQueue.TYPE);
_queue = queue;
- _queueName = jmxEncode(new StringBuffer(queue.getNameShortString()), 0).toString();
+ _queueName = queue.getName();
}
public ManagedObject getParentObject()
@@ -147,7 +148,7 @@ public class AMQQueueMBean extends AMQMa
public String getObjectInstanceName()
{
- return _queueName;
+ return ObjectName.quote(_queueName);
}
public String getName()
@@ -506,7 +507,7 @@ public class AMQQueueMBean extends AMQMa
private String[] getMessageHeaderProperties(ContentHeaderBody headerBody)
{
List<String> list = new ArrayList<String>();
- BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) headerBody.properties;
+ BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) headerBody.getProperties();
list.add("reply-to = " + headerProperties.getReplyToAsString());
list.add("propertyFlags = " + headerProperties.getPropertyFlags());
list.add("ApplicationID = " + headerProperties.getAppIdAsString());
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java Thu Oct 20 18:42:46 2011
@@ -96,9 +96,9 @@ public class IncomingMessage implements
public void setExpiration()
{
long expiration =
- ((BasicContentHeaderProperties) _contentHeaderBody.properties).getExpiration();
+ ((BasicContentHeaderProperties) _contentHeaderBody.getProperties()).getExpiration();
long timestamp =
- ((BasicContentHeaderProperties) _contentHeaderBody.properties).getTimestamp();
+ ((BasicContentHeaderProperties) _contentHeaderBody.getProperties()).getTimestamp();
if (SYNCHED_CLOCKS)
{
@@ -139,7 +139,7 @@ public class IncomingMessage implements
public int addContentBodyFrame(final ContentChunk contentChunk)
throws AMQException
{
- _storedMessageHandle.addContent((int)_bodyLengthReceived, contentChunk.getData().buf());
+ _storedMessageHandle.addContent((int)_bodyLengthReceived, ByteBuffer.wrap(contentChunk.getData()));
_bodyLengthReceived += contentChunk.getSize();
_contentChunks.add(contentChunk);
@@ -193,8 +193,8 @@ public class IncomingMessage implements
public boolean isPersistent()
{
- return getContentHeader().properties instanceof BasicContentHeaderProperties &&
- ((BasicContentHeaderProperties) getContentHeader().properties).getDeliveryMode() ==
+ return getContentHeader().getProperties() instanceof BasicContentHeaderProperties &&
+ ((BasicContentHeaderProperties) getContentHeader().getProperties()).getDeliveryMode() ==
BasicContentHeaderProperties.PERSISTENT;
}
@@ -263,7 +263,7 @@ public class IncomingMessage implements
int written = 0;
for(ContentChunk cb : _contentChunks)
{
- ByteBuffer data = cb.getData().buf();
+ ByteBuffer data = ByteBuffer.wrap(cb.getData());
if(offset+written >= pos && offset < pos + data.limit())
{
ByteBuffer src = data.duplicate();
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java Thu Oct 20 18:42:46 2011
@@ -52,6 +52,17 @@ public interface QueueEntry extends Comp
}
public abstract State getState();
+
+ /**
+ * Returns true if state is either DEQUEUED or DELETED.
+ *
+ * @return true if state is either DEQUEUED or DELETED.
+ */
+ public boolean isDispensed()
+ {
+ State currentState = getState();
+ return currentState == State.DEQUEUED || currentState == State.DELETED;
+ }
}
@@ -191,11 +202,7 @@ public interface QueueEntry extends Comp
void reject();
- void reject(Subscription subscription);
-
- boolean isRejectedBy(Subscription subscription);
-
- void requeue(Subscription subscription);
+ boolean isRejectedBy(long subscriptionId);
void dequeue();
@@ -209,4 +216,18 @@ public interface QueueEntry extends Comp
void addStateChangeListener(StateChangeListener listener);
boolean removeStateChangeListener(StateChangeListener listener);
+
+ /**
+ * Returns true if entry is in DEQUEUED state, otherwise returns false.
+ *
+ * @return true if entry is in DEQUEUED state, otherwise returns false
+ */
+ boolean isDequeued();
+
+ /**
+ * Returns true if entry is in either the DEQUEUED or DELETED state.
+ *
+ * @return true if entry is in either the DEQUEUED or DELETED state
+ */
+ boolean isDispensed();
}
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Thu Oct 20 18:42:46 2011
@@ -51,7 +51,7 @@ public class QueueEntryImpl implements Q
private MessageReference _message;
- private Set<Subscription> _rejectedBy = null;
+ private Set<Long> _rejectedBy = null;
private volatile EntryState _state = AVAILABLE_STATE;
@@ -325,19 +325,16 @@ public class QueueEntryImpl implements Q
public void reject()
{
- reject(getDeliveredSubscription());
- }
+ Subscription subscription = getDeliveredSubscription();
- public void reject(Subscription subscription)
- {
if (subscription != null)
{
if (_rejectedBy == null)
{
- _rejectedBy = new HashSet<Subscription>();
+ _rejectedBy = new HashSet<Long>();
}
- _rejectedBy.add(subscription);
+ _rejectedBy.add(subscription.getSubscriptionID());
}
else
{
@@ -345,12 +342,12 @@ public class QueueEntryImpl implements Q
}
}
- public boolean isRejectedBy(Subscription subscription)
+ public boolean isRejectedBy(long subscriptionId)
{
if (_rejectedBy != null) // We have subscriptions that rejected this message
{
- return _rejectedBy.contains(subscription);
+ return _rejectedBy.contains(subscriptionId);
}
else // This messasge hasn't been rejected yet.
{
@@ -358,15 +355,6 @@ public class QueueEntryImpl implements Q
}
}
- public void requeue(Subscription subscription)
- {
- getQueue().requeue(this, subscription);
- if(_stateChangeListeners != null)
- {
- notifyStateChange(QueueEntry.State.ACQUIRED, QueueEntry.State.AVAILABLE);
- }
- }
-
public void dequeue()
{
EntryState state = _state;
@@ -508,7 +496,7 @@ public class QueueEntryImpl implements Q
{
QueueEntryImpl next = nextNode();
- while(next != null && next.isDeleted())
+ while(next != null && next.isDispensed() )
{
final QueueEntryImpl newNext = next.nextNode();
@@ -556,4 +544,14 @@ public class QueueEntryImpl implements Q
return _queueEntryList;
}
+ public boolean isDequeued()
+ {
+ return _state == DEQUEUED_STATE;
+ }
+
+ public boolean isDispensed()
+ {
+ return _state.isDispensed();
+ }
+
}
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Thu Oct 20 18:42:46 2011
@@ -44,7 +44,7 @@ import org.apache.qpid.server.logging.su
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.security.PrincipalHolder;
+import org.apache.qpid.server.security.AuthorizationHolder;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionList;
import org.apache.qpid.server.txn.AutoCommitTransaction;
@@ -83,7 +83,7 @@ public class SimpleAMQQueue implements A
/** null means shared */
private final AMQShortString _owner;
- private PrincipalHolder _prinicpalHolder;
+ private AuthorizationHolder _authorizationHolder;
private boolean _exclusive = false;
private AMQSessionModel _exclusiveOwner;
@@ -102,9 +102,7 @@ public class SimpleAMQQueue implements A
protected final QueueEntryList _entries;
- protected final SubscriptionList _subscriptionList = new SubscriptionList(this);
-
- private final AtomicReference<SubscriptionList.SubscriptionNode> _lastSubscriptionNode = new AtomicReference<SubscriptionList.SubscriptionNode>(_subscriptionList.getHead());
+ protected final SubscriptionList _subscriptionList = new SubscriptionList();
private volatile Subscription _exclusiveSubscriber;
@@ -373,14 +371,14 @@ public class SimpleAMQQueue implements A
return _owner;
}
- public PrincipalHolder getPrincipalHolder()
+ public AuthorizationHolder getAuthorizationHolder()
{
- return _prinicpalHolder;
+ return _authorizationHolder;
}
- public void setPrincipalHolder(PrincipalHolder prinicpalHolder)
+ public void setAuthorizationHolder(final AuthorizationHolder authorizationHolder)
{
- _prinicpalHolder = prinicpalHolder;
+ _authorizationHolder = authorizationHolder;
}
@@ -602,25 +600,25 @@ public class SimpleAMQQueue implements A
iterate over subscriptions and if any is at the end of the queue and can deliver this message, then deliver the message
*/
- SubscriptionList.SubscriptionNode node = _lastSubscriptionNode.get();
- SubscriptionList.SubscriptionNode nextNode = node.getNext();
+ SubscriptionList.SubscriptionNode node = _subscriptionList.getMarkedNode();
+ SubscriptionList.SubscriptionNode nextNode = node.findNext();
if (nextNode == null)
{
- nextNode = _subscriptionList.getHead().getNext();
+ nextNode = _subscriptionList.getHead().findNext();
}
while (nextNode != null)
{
- if (_lastSubscriptionNode.compareAndSet(node, nextNode))
+ if (_subscriptionList.updateMarkedNode(node, nextNode))
{
break;
}
else
{
- node = _lastSubscriptionNode.get();
- nextNode = node.getNext();
+ node = _subscriptionList.getMarkedNode();
+ nextNode = node.findNext();
if (nextNode == null)
{
- nextNode = _subscriptionList.getHead().getNext();
+ nextNode = _subscriptionList.getHead().findNext();
}
}
}
@@ -629,7 +627,7 @@ public class SimpleAMQQueue implements A
// this catches the case where we *just* miss an update
int loops = 2;
- while (!(entry.isAcquired() || entry.isDeleted()) && loops != 0)
+ while (entry.isAvailable() && loops != 0)
{
if (nextNode == null)
{
@@ -642,13 +640,13 @@ public class SimpleAMQQueue implements A
Subscription sub = nextNode.getSubscription();
deliverToSubscription(sub, entry);
}
- nextNode = nextNode.getNext();
+ nextNode = nextNode.findNext();
}
}
- if (!(entry.isAcquired() || entry.isDeleted()))
+ if (entry.isAvailable())
{
checkSubscriptionsNotAheadOfDelivery(entry);
@@ -805,24 +803,6 @@ public class SimpleAMQQueue implements A
}
- public void requeue(QueueEntryImpl entry, Subscription subscription)
- {
- SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator();
- // iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards
- while (subscriberIter.advance())
- {
- Subscription sub = subscriberIter.getNode().getSubscription();
-
- // we don't make browsers send the same stuff twice
- if (sub.seesRequeues() && (!sub.acquires() && sub == subscription))
- {
- updateSubRequeueEntry(sub, entry);
- }
- }
-
- deliverAsync();
- }
-
public void dequeue(QueueEntry entry, Subscription sub)
{
decrementQueueCount();
@@ -960,7 +940,7 @@ public class SimpleAMQQueue implements A
while (queueListIterator.advance())
{
QueueEntry node = queueListIterator.getNode();
- if (node != null && !node.isDeleted())
+ if (node != null && !node.isDispensed())
{
entryList.add(node);
}
@@ -1064,7 +1044,7 @@ public class SimpleAMQQueue implements A
while (queueListIterator.advance() && !filter.filterComplete())
{
QueueEntry node = queueListIterator.getNode();
- if (!node.isDeleted() && filter.accept(node))
+ if (!node.isDispensed() && filter.accept(node))
{
entryList.add(node);
}
@@ -1258,7 +1238,6 @@ public class SimpleAMQQueue implements A
if ((messageId >= fromMessageId)
&& (messageId <= toMessageId)
- && !node.isDeleted()
&& node.acquire())
{
dequeueEntry(node);
@@ -1288,7 +1267,7 @@ public class SimpleAMQQueue implements A
while (noDeletes && queueListIterator.advance())
{
QueueEntry node = queueListIterator.getNode();
- if (!node.isDeleted() && node.acquire())
+ if (node.acquire())
{
dequeueEntry(node);
noDeletes = false;
@@ -1318,7 +1297,7 @@ public class SimpleAMQQueue implements A
while (queueListIterator.advance())
{
QueueEntry node = queueListIterator.getNode();
- if (!node.isDeleted() && node.acquire())
+ if (node.acquire())
{
dequeueEntry(node, txn);
if(++count == request)
@@ -1585,7 +1564,7 @@ public class SimpleAMQQueue implements A
public void deliverAsync()
{
- Runner runner = new Runner(_stateChangeCount.incrementAndGet());
+ QueueRunner runner = new QueueRunner(this, _stateChangeCount.incrementAndGet());
if (_asynchronousRunner.compareAndSet(null, runner))
{
@@ -1604,52 +1583,6 @@ public class SimpleAMQQueue implements A
_asyncDelivery.execute(flusher);
}
-
- private class Runner implements ReadWriteRunnable
- {
- String _name;
- public Runner(long count)
- {
- _name = "QueueRunner-" + count + "-" + _logActor;
- }
-
- public void run()
- {
- String originalName = Thread.currentThread().getName();
- try
- {
- Thread.currentThread().setName(_name);
- CurrentActor.set(_logActor);
-
- processQueue(this);
- }
- catch (AMQException e)
- {
- _logger.error(e);
- }
- finally
- {
- CurrentActor.remove();
- Thread.currentThread().setName(originalName);
- }
- }
-
- public boolean isRead()
- {
- return false;
- }
-
- public boolean isWrite()
- {
- return true;
- }
-
- public String toString()
- {
- return _name;
- }
- }
-
public void flushSubscription(Subscription sub) throws AMQException
{
// Access control
@@ -1718,7 +1651,7 @@ public class SimpleAMQQueue implements A
QueueEntry node = getNextAvailableEntry(sub);
- if (node != null && !(node.isAcquired() || node.isDeleted()))
+ if (node != null && node.isAvailable())
{
if (sub.hasInterest(node))
{
@@ -1779,7 +1712,7 @@ public class SimpleAMQQueue implements A
QueueEntry node = (releasedNode != null && lastSeen.compareTo(releasedNode)>=0) ? releasedNode : _entries.next(lastSeen);
boolean expired = false;
- while (node != null && (node.isAcquired() || node.isDeleted() || (expired = node.expired()) || !sub.hasInterest(node)))
+ while (node != null && (!node.isAvailable() || (expired = node.expired()) || !sub.hasInterest(node)))
{
if (expired)
{
@@ -1808,14 +1741,40 @@ public class SimpleAMQQueue implements A
}
- private void processQueue(Runnable runner) throws AMQException
+ /**
+ * Used by queue Runners to asynchronously deliver messages to consumers.
+ *
+ * A queue Runner is started whenever a state change occurs, e.g. when a new
+ * message arrives on the queue and cannot be immediately delivered to a
+ * subscription (i.e. asynchronous delivery is required). Unless there are
+ * SubFlushRunners operating (due to subscriptions unsuspending) which are
+ * capable of accepting/delivering all messages then these messages would
+ * otherwise remain on the queue.
+ *
+ * processQueue should be running while there are messages on the queue AND
+ * there are subscriptions that can deliver them. If there are no
+ * subscriptions capable of delivering the remaining messages on the queue
+ * then processQueue should stop to prevent spinning.
+ *
+ * Since processQueue runs in a fixed size Executor, it should not run
+ * indefinitely to prevent starving other tasks of CPU (e.g. jobs to process
+ * incoming messages may not be able to be scheduled in the thread pool
+ * because all threads are working on clearing down large queues). To solve
+ * this problem, after an arbitrary number of message deliveries the
+ * processQueue job stops iterating, resubmits itself to the executor, and
+ * ends the current instance
+ *
+ * @param runner the Runner to schedule
+ * @throws AMQException
+ */
+ public void processQueue(QueueRunner runner) throws AMQException
{
long stateChangeCount;
long previousStateChangeCount = Long.MIN_VALUE;
boolean deliveryIncomplete = true;
- int extraLoops = 1;
- long iterations = MAX_ASYNC_DELIVERIES;
+ boolean lastLoop = false;
+ int iterations = MAX_ASYNC_DELIVERIES;
_asynchronousRunner.compareAndSet(runner, null);
@@ -1832,12 +1791,14 @@ public class SimpleAMQQueue implements A
if (previousStateChangeCount != stateChangeCount)
{
- extraLoops = 1;
+ //further asynchronous delivery is required since the
+ //previous loop. keep going if iteration slicing allows.
+ lastLoop = false;
}
previousStateChangeCount = stateChangeCount;
- deliveryIncomplete = _subscriptionList.size() != 0;
- boolean done;
+ boolean allSubscriptionsDone = true;
+ boolean subscriptionDone;
SubscriptionList.SubscriptionNodeIterator subscriptionIter = _subscriptionList.iterator();
//iterate over the subscribers and try to advance their pointer
@@ -1847,30 +1808,25 @@ public class SimpleAMQQueue implements A
sub.getSendLock();
try
{
-
- done = attemptDelivery(sub);
-
- if (done)
+ //attempt delivery. returns true if no further delivery currently possible to this sub
+ subscriptionDone = attemptDelivery(sub);
+ if (subscriptionDone)
{
- if (extraLoops == 0)
+ //close autoClose subscriptions if we are not currently intent on continuing
+ if (lastLoop && sub.isAutoClose())
{
- deliveryIncomplete = false;
- if (sub.isAutoClose())
- {
- unregisterSubscription(sub);
+ unregisterSubscription(sub);
- sub.confirmAutoClose();
- }
- }
- else
- {
- extraLoops--;
+ sub.confirmAutoClose();
}
}
else
{
+ //this subscription can accept additional deliveries, so we must
+ //keep going after this (if iteration slicing allows it)
+ allSubscriptionsDone = false;
+ lastLoop = false;
iterations--;
- extraLoops = 1;
}
}
finally
@@ -1878,10 +1834,34 @@ public class SimpleAMQQueue implements A
sub.releaseSendLock();
}
}
+
+ if(allSubscriptionsDone && lastLoop)
+ {
+ //We have done an extra loop already and there are again
+ //no further delivery attempts possible, only
+ //keep going if state change demands it.
+ deliveryIncomplete = false;
+ }
+ else if(allSubscriptionsDone)
+ {
+ //All subscriptions reported being done, but we have to do
+ //an extra loop if the iterations are not exhausted and
+ //there is still any work to be done
+ deliveryIncomplete = _subscriptionList.size() != 0;
+ lastLoop = true;
+ }
+ else
+ {
+ //some subscriptions can still accept more messages,
+ //keep going if iteration count allows.
+ lastLoop = false;
+ deliveryIncomplete = true;
+ }
+
_asynchronousRunner.set(null);
}
- // If deliveries == 0 then the limitting factor was the time-slicing rather than available messages or credit
+ // If iterations == 0 then the limiting factor was the time-slicing rather than available messages or credit
// therefore we should schedule this runner again (unless someone beats us to it :-) ).
if (iterations == 0 && _asynchronousRunner.compareAndSet(null, runner))
{
@@ -1901,8 +1881,8 @@ public class SimpleAMQQueue implements A
while (queueListIterator.advance())
{
QueueEntry node = queueListIterator.getNode();
- // Only process nodes that are not currently deleted
- if (!node.isDeleted())
+ // Only process nodes that are not currently deleted and not dequeued
+ if (!node.isDispensed())
{
// If the node has exired then aquire it
if (node.expired() && node.acquire())
@@ -2242,4 +2222,9 @@ public class SimpleAMQQueue implements A
}
}
}
+
+ public LogActor getLogActor()
+ {
+ return _logActor;
+ }
}
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java Thu Oct 20 18:42:46 2011
@@ -1,6 +1,5 @@
package org.apache.qpid.server.queue;
-import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.message.ServerMessage;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
@@ -156,7 +155,7 @@ public class SimpleQueueEntryList implem
if(!atTail())
{
QueueEntryImpl nextNode = _lastNode.nextNode();
- while(nextNode.isDeleted() && nextNode.nextNode() != null)
+ while(nextNode.isDispensed() && nextNode.nextNode() != null)
{
nextNode = nextNode.nextNode();
}
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java Thu Oct 20 18:42:46 2011
@@ -21,9 +21,14 @@
package org.apache.qpid.server.registry;
import java.net.InetSocketAddress;
+import java.util.Collection;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.log4j.Logger;
@@ -41,23 +46,27 @@ import org.apache.qpid.server.configurat
import org.apache.qpid.server.logging.CompositeStartupMessageLogger;
import org.apache.qpid.server.logging.Log4jMessageLogger;
import org.apache.qpid.server.logging.RootMessageLogger;
-import org.apache.qpid.server.logging.AbstractRootMessageLogger;
import org.apache.qpid.server.logging.SystemOutMessageLogger;
+import org.apache.qpid.server.logging.actors.AbstractActor;
import org.apache.qpid.server.logging.actors.BrokerActor;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.BrokerMessages;
+import org.apache.qpid.server.logging.messages.VirtualHostMessages;
import org.apache.qpid.server.management.ManagedObjectRegistry;
import org.apache.qpid.server.management.NoopManagedObjectRegistry;
+import org.apache.qpid.server.plugins.Plugin;
import org.apache.qpid.server.plugins.PluginManager;
import org.apache.qpid.server.security.SecurityManager;
-import org.apache.qpid.server.security.auth.database.ConfigurationFilePrincipalDatabaseManager;
-import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager;
+import org.apache.qpid.server.security.SecurityManager.SecurityConfiguration;
import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
-import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
+import org.apache.qpid.server.security.auth.manager.AuthenticationManagerPluginFactory;
+import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.transport.QpidAcceptor;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.osgi.framework.BundleContext;
+
/**
* An abstract application registry that provides access to configuration information and handles the
@@ -69,12 +78,10 @@ public abstract class ApplicationRegistr
{
protected static final Logger _logger = Logger.getLogger(ApplicationRegistry.class);
- private static Map<Integer, IApplicationRegistry> _instanceMap = new HashMap<Integer, IApplicationRegistry>();
+ private static AtomicReference<IApplicationRegistry> _instance = new AtomicReference<IApplicationRegistry>(null);
protected final ServerConfiguration _configuration;
- public static final int DEFAULT_INSTANCE = 1;
-
protected final Map<InetSocketAddress, QpidAcceptor> _acceptors = new HashMap<InetSocketAddress, QpidAcceptor>();
protected ManagedObjectRegistry _managedObjectRegistry;
@@ -85,8 +92,6 @@ public abstract class ApplicationRegistr
protected SecurityManager _securityManager;
- protected PrincipalDatabaseManager _databaseManager;
-
protected PluginManager _pluginManager;
protected ConfigurationManager _configurationManager;
@@ -102,8 +107,12 @@ public abstract class ApplicationRegistr
private BrokerConfig _broker;
private ConfigStore _configStore;
+
+ private Timer _reportingTimer;
+ private boolean _statisticsEnabled = false;
+ private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
- protected String _registryName;
+ private BundleContext _bundleContext;
static
{
@@ -114,53 +123,54 @@ public abstract class ApplicationRegistr
{
public void run()
{
- removeAll();
+ remove();
}
}
public static void initialise(IApplicationRegistry instance) throws Exception
{
- initialise(instance, DEFAULT_INSTANCE);
- }
+ if(instance == null)
+ {
+ throw new IllegalArgumentException("ApplicationRegistry instance must not be null");
+ }
- @SuppressWarnings("finally")
- public static void initialise(IApplicationRegistry instance, int instanceID) throws Exception
- {
- if (instance != null)
+ if(!_instance.compareAndSet(null, instance))
{
- _logger.info("Initialising Application Registry(" + instance + "):" + instanceID);
- _instanceMap.put(instanceID, instance);
+ throw new IllegalStateException("An ApplicationRegistry is already initialised");
+ }
+
+ _logger.info("Initialising Application Registry(" + instance + ")");
+
+
+ final ConfigStore store = ConfigStore.newInstance();
+ store.setRoot(new SystemConfigImpl(store));
+ instance.setConfigStore(store);
- final ConfigStore store = ConfigStore.newInstance();
- store.setRoot(new SystemConfigImpl(store));
- instance.setConfigStore(store);
+ BrokerConfig broker = new BrokerConfigAdapter(instance);
- BrokerConfig broker = new BrokerConfigAdapter(instance);
+ SystemConfig system = (SystemConfig) store.getRoot();
+ system.addBroker(broker);
+ instance.setBroker(broker);
- SystemConfig system = (SystemConfig) store.getRoot();
- system.addBroker(broker);
- instance.setBroker(broker);
+ try
+ {
+ instance.initialise();
+ }
+ catch (Exception e)
+ {
+ _instance.set(null);
+ //remove the Broker instance, then re-throw
try
{
- instance.initialise(instanceID);
+ system.removeBroker(broker);
}
- catch (Exception e)
+ catch(Throwable t)
{
- _instanceMap.remove(instanceID);
- try
- {
- system.removeBroker(broker);
- }
- finally
- {
- throw e;
- }
+ //ignore
}
- }
- else
- {
- remove(instanceID);
+
+ throw e;
}
}
@@ -176,35 +186,19 @@ public abstract class ApplicationRegistr
public static boolean isConfigured()
{
- return isConfigured(DEFAULT_INSTANCE);
- }
-
- public static boolean isConfigured(int instanceID)
- {
- return _instanceMap.containsKey(instanceID);
+ return _instance.get() != null;
}
- /** Method to cleanly shutdown the default registry running in this JVM */
public static void remove()
{
- remove(DEFAULT_INSTANCE);
- }
-
- /**
- * Method to cleanly shutdown specified registry running in this JVM
- *
- * @param instanceID the instance to shutdown
- */
- public static void remove(int instanceID)
- {
+ IApplicationRegistry instance = _instance.getAndSet(null);
try
{
- IApplicationRegistry instance = _instanceMap.get(instanceID);
if (instance != null)
{
if (_logger.isInfoEnabled())
{
- _logger.info("Shutting down ApplicationRegistry(" + instanceID + "):" + instance);
+ _logger.info("Shutting down ApplicationRegistry(" + instance + ")");
}
instance.close();
instance.getBroker().getSystem().removeBroker(instance.getBroker());
@@ -212,27 +206,19 @@ public abstract class ApplicationRegistr
}
catch (Exception e)
{
- _logger.error("Error shutting down Application Registry(" + instanceID + "): " + e, e);
- }
- finally
- {
- _instanceMap.remove(instanceID);
+ _logger.error("Error shutting down Application Registry(" + instance + "): " + e, e);
}
}
- /** Method to cleanly shutdown all registries currently running in this JVM */
- public static void removeAll()
+ protected ApplicationRegistry(ServerConfiguration configuration)
{
- Object[] keys = _instanceMap.keySet().toArray();
- for (Object k : keys)
- {
- remove((Integer) k);
- }
+ this(configuration, null);
}
- protected ApplicationRegistry(ServerConfiguration configuration)
+ protected ApplicationRegistry(ServerConfiguration configuration, BundleContext bundleContext)
{
_configuration = configuration;
+ _bundleContext = bundleContext;
}
public void configure() throws ConfigurationException
@@ -241,7 +227,7 @@ public abstract class ApplicationRegistr
try
{
- _pluginManager = new PluginManager(_configuration.getPluginDirectory(), _configuration.getCacheDirectory());
+ _pluginManager = new PluginManager(_configuration.getPluginDirectory(), _configuration.getCacheDirectory(), _bundleContext);
}
catch (Exception e)
{
@@ -251,11 +237,10 @@ public abstract class ApplicationRegistr
_configuration.initialise();
}
- public void initialise(int instanceID) throws Exception
+ public void initialise() throws Exception
{
//Create the RootLogger to be used during broker operation
_rootMessageLogger = new Log4jMessageLogger(_configuration);
- _registryName = String.valueOf(instanceID);
//Create the composite (log4j+SystemOut) MessageLogger to be used during startup
RootMessageLogger[] messageLoggers = {new SystemOutMessageLogger(), _rootMessageLogger};
@@ -277,11 +262,7 @@ public abstract class ApplicationRegistr
_securityManager = new SecurityManager(_configuration, _pluginManager);
- createDatabaseManager(_configuration);
-
- _authenticationManager = new PrincipalDatabaseAuthenticationManager(null, null);
-
- _databaseManager.initialiseManagement(_configuration);
+ _authenticationManager = createAuthenticationManager();
_managedObjectRegistry.start();
}
@@ -294,6 +275,8 @@ public abstract class ApplicationRegistr
try
{
initialiseVirtualHosts();
+ initialiseStatistics();
+ initialiseStatisticsReporting();
}
finally
{
@@ -302,9 +285,51 @@ public abstract class ApplicationRegistr
}
}
- protected void createDatabaseManager(ServerConfiguration configuration) throws Exception
+ /**
+ * Iterates across all discovered authentication manager factories, offering the security configuration to each.
+ * Expects <b>exactly</b> one authentication manager to configure and initialise itself.
+ *
+ * It is an error to configure more than one authentication manager, or to configure none.
+ *
+ * @return authentication manager
+ * @throws ConfigurationException
+ */
+ protected AuthenticationManager createAuthenticationManager() throws ConfigurationException
{
- _databaseManager = new ConfigurationFilePrincipalDatabaseManager(_configuration);
+ final SecurityConfiguration securityConfiguration = _configuration.getConfiguration(SecurityConfiguration.class.getName());
+ final Collection<AuthenticationManagerPluginFactory<? extends Plugin>> factories = _pluginManager.getAuthenticationManagerPlugins().values();
+
+ if (factories.size() == 0)
+ {
+ throw new ConfigurationException("No authentication manager factory plugins found. Check the desired authentication" +
+ "manager plugin has been placed in the plugins directory.");
+ }
+
+ AuthenticationManager authMgr = null;
+
+ for (final Iterator<AuthenticationManagerPluginFactory<? extends Plugin>> iterator = factories.iterator(); iterator.hasNext();)
+ {
+ final AuthenticationManagerPluginFactory<? extends Plugin> factory = (AuthenticationManagerPluginFactory<? extends Plugin>) iterator.next();
+ final AuthenticationManager tmp = factory.newInstance(securityConfiguration);
+ if (tmp != null)
+ {
+ if (authMgr != null)
+ {
+ throw new ConfigurationException("Cannot configure more than one authentication manager."
+ + " Both " + tmp.getClass() + " and " + authMgr.getClass() + " are configured."
+ + " Remove configuration for one of the authentication manager, or remove the plugin JAR"
+ + " from the classpath.");
+ }
+ authMgr = tmp;
+ }
+ }
+
+ if (authMgr == null)
+ {
+ throw new ConfigurationException("No authentication managers configured within the configure file.");
+ }
+
+ return authMgr;
}
protected void initialiseVirtualHosts() throws Exception
@@ -320,26 +345,88 @@ public abstract class ApplicationRegistr
{
_managedObjectRegistry = new NoopManagedObjectRegistry();
}
-
- public static IApplicationRegistry getInstance()
+
+ public void initialiseStatisticsReporting()
{
- return getInstance(DEFAULT_INSTANCE);
+ long report = _configuration.getStatisticsReportingPeriod() * 1000; // convert to ms
+ final boolean broker = _configuration.isStatisticsGenerationBrokerEnabled();
+ final boolean virtualhost = _configuration.isStatisticsGenerationVirtualhostsEnabled();
+ final boolean reset = _configuration.isStatisticsReportResetEnabled();
+
+ /* add a timer task to report statistics if generation is enabled for broker or virtualhosts */
+ if (report > 0L && (broker || virtualhost))
+ {
+ _reportingTimer = new Timer("Statistics-Reporting", true);
+
+ class StatisticsReportingTask extends TimerTask
+ {
+ private final int DELIVERED = 0;
+ private final int RECEIVED = 1;
+
+ public void run()
+ {
+ CurrentActor.set(new AbstractActor(ApplicationRegistry.getInstance().getRootMessageLogger()) {
+ public String getLogMessage()
+ {
+ return "[" + Thread.currentThread().getName() + "] ";
+ }
+ });
+
+ if (broker)
+ {
+ CurrentActor.get().message(BrokerMessages.STATS_DATA(DELIVERED, _dataDelivered.getPeak() / 1024.0, _dataDelivered.getTotal()));
+ CurrentActor.get().message(BrokerMessages.STATS_MSGS(DELIVERED, _messagesDelivered.getPeak(), _messagesDelivered.getTotal()));
+ CurrentActor.get().message(BrokerMessages.STATS_DATA(RECEIVED, _dataReceived.getPeak() / 1024.0, _dataReceived.getTotal()));
+ CurrentActor.get().message(BrokerMessages.STATS_MSGS(RECEIVED, _messagesReceived.getPeak(), _messagesReceived.getTotal()));
+ }
+
+ if (virtualhost)
+ {
+ for (VirtualHost vhost : getVirtualHostRegistry().getVirtualHosts())
+ {
+ String name = vhost.getName();
+ StatisticsCounter dataDelivered = vhost.getDataDeliveryStatistics();
+ StatisticsCounter messagesDelivered = vhost.getMessageDeliveryStatistics();
+ StatisticsCounter dataReceived = vhost.getDataReceiptStatistics();
+ StatisticsCounter messagesReceived = vhost.getMessageReceiptStatistics();
+
+ CurrentActor.get().message(VirtualHostMessages.STATS_DATA(name, DELIVERED, dataDelivered.getPeak() / 1024.0, dataDelivered.getTotal()));
+ CurrentActor.get().message(VirtualHostMessages.STATS_MSGS(name, DELIVERED, messagesDelivered.getPeak(), messagesDelivered.getTotal()));
+ CurrentActor.get().message(VirtualHostMessages.STATS_DATA(name, RECEIVED, dataReceived.getPeak() / 1024.0, dataReceived.getTotal()));
+ CurrentActor.get().message(VirtualHostMessages.STATS_MSGS(name, RECEIVED, messagesReceived.getPeak(), messagesReceived.getTotal()));
+ }
+ }
+
+ if (reset)
+ {
+ resetStatistics();
+ }
+
+ CurrentActor.remove();
+ }
+ }
+
+ _reportingTimer.scheduleAtFixedRate(new StatisticsReportingTask(),
+ report / 2,
+ report);
+ }
}
- public static IApplicationRegistry getInstance(int instanceID)
+ /**
+ * Get the ApplicationRegistry
+ * @return the IApplicationRegistry instance
+ * @throws IllegalStateException if no registry instance has been initialised.
+ */
+ public static IApplicationRegistry getInstance() throws IllegalStateException
{
- synchronized (IApplicationRegistry.class)
+ IApplicationRegistry iApplicationRegistry = _instance.get();
+ if (iApplicationRegistry == null)
{
- IApplicationRegistry instance = _instanceMap.get(instanceID);
-
- if (instance == null)
- {
- throw new IllegalStateException("Application Registry (" + instanceID + ") not created");
- }
- else
- {
- return instance;
- }
+ throw new IllegalStateException("No ApplicationRegistry has been initialised");
+ }
+ else
+ {
+ return iApplicationRegistry;
}
}
@@ -369,6 +456,12 @@ public abstract class ApplicationRegistr
{
_logger.info("Shutting down ApplicationRegistry:" + this);
}
+
+ //Stop Statistics Reporting
+ if (_reportingTimer != null)
+ {
+ _reportingTimer.cancel();
+ }
//Stop incoming connections
unbind();
@@ -376,10 +469,6 @@ public abstract class ApplicationRegistr
//Shutdown virtualhosts
close(_virtualHostRegistry);
-// close(_accessManager);
-//
-// close(_databaseManager);
-
close(_authenticationManager);
close(_managedObjectRegistry);
@@ -401,7 +490,7 @@ public abstract class ApplicationRegistr
try
{
- acceptor.getNetworkDriver().close();
+ acceptor.getNetworkTransport().close();
}
catch (Throwable e)
{
@@ -441,11 +530,6 @@ public abstract class ApplicationRegistr
return _managedObjectRegistry;
}
- public PrincipalDatabaseManager getDatabaseManager()
- {
- return _databaseManager;
- }
-
public AuthenticationManager getAuthenticationManager()
{
return _authenticationManager;
@@ -493,9 +577,81 @@ public abstract class ApplicationRegistr
public VirtualHost createVirtualHost(final VirtualHostConfiguration vhostConfig) throws Exception
{
- VirtualHostImpl virtualHost = new VirtualHostImpl(this, vhostConfig);
+ VirtualHostImpl virtualHost = new VirtualHostImpl(this, vhostConfig, null);
_virtualHostRegistry.registerVirtualHost(virtualHost);
getBroker().addVirtualHost(virtualHost);
return virtualHost;
}
+
+ public void registerMessageDelivered(long messageSize)
+ {
+ if (isStatisticsEnabled())
+ {
+ _messagesDelivered.registerEvent(1L);
+ _dataDelivered.registerEvent(messageSize);
+ }
+ }
+
+ public void registerMessageReceived(long messageSize, long timestamp)
+ {
+ if (isStatisticsEnabled())
+ {
+ _messagesReceived.registerEvent(1L, timestamp);
+ _dataReceived.registerEvent(messageSize, timestamp);
+ }
+ }
+
+ public StatisticsCounter getMessageReceiptStatistics()
+ {
+ return _messagesReceived;
+ }
+
+ public StatisticsCounter getDataReceiptStatistics()
+ {
+ return _dataReceived;
+ }
+
+ public StatisticsCounter getMessageDeliveryStatistics()
+ {
+ return _messagesDelivered;
+ }
+
+ public StatisticsCounter getDataDeliveryStatistics()
+ {
+ return _dataDelivered;
+ }
+
+ public void resetStatistics()
+ {
+ _messagesDelivered.reset();
+ _dataDelivered.reset();
+ _messagesReceived.reset();
+ _dataReceived.reset();
+
+ for (VirtualHost vhost : _virtualHostRegistry.getVirtualHosts())
+ {
+ vhost.resetStatistics();
+ }
+ }
+
+ public void initialiseStatistics()
+ {
+ setStatisticsEnabled(!StatisticsCounter.DISABLE_STATISTICS &&
+ getConfiguration().isStatisticsGenerationBrokerEnabled());
+
+ _messagesDelivered = new StatisticsCounter("messages-delivered");
+ _dataDelivered = new StatisticsCounter("bytes-delivered");
+ _messagesReceived = new StatisticsCounter("messages-received");
+ _dataReceived = new StatisticsCounter("bytes-received");
+ }
+
+ public boolean isStatisticsEnabled()
+ {
+ return _statisticsEnabled;
+ }
+
+ public void setStatisticsEnabled(boolean enabled)
+ {
+ _statisticsEnabled = enabled;
+ }
}
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java Thu Oct 20 18:42:46 2011
@@ -71,7 +71,7 @@ public class BrokerConfigAdapter impleme
public Integer getWorkerThreads()
{
- return _instance.getConfiguration().getProcessors();
+ return _instance.getConfiguration().getConnectorProcessors();
}
public Integer getMaxConnections()
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java Thu Oct 20 18:42:46 2011
@@ -29,12 +29,18 @@ import org.apache.qpid.server.logging.ac
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.management.JMXManagedObjectRegistry;
import org.apache.qpid.server.management.NoopManagedObjectRegistry;
+import org.osgi.framework.BundleContext;
public class ConfigurationFileApplicationRegistry extends ApplicationRegistry
{
public ConfigurationFileApplicationRegistry(File configurationURL) throws ConfigurationException
{
- super(new ServerConfiguration(configurationURL));
+ this(configurationURL, null);
+ }
+
+ public ConfigurationFileApplicationRegistry(File configurationURL, BundleContext bundleContext) throws ConfigurationException
+ {
+ super(new ServerConfiguration(configurationURL), bundleContext);
}
@Override
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java Thu Oct 20 18:42:46 2011
@@ -33,21 +33,20 @@ import org.apache.qpid.server.logging.Ro
import org.apache.qpid.server.management.ManagedObjectRegistry;
import org.apache.qpid.server.plugins.PluginManager;
import org.apache.qpid.server.security.SecurityManager;
-import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager;
import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
+import org.apache.qpid.server.stats.StatisticsGatherer;
import org.apache.qpid.server.transport.QpidAcceptor;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
-public interface IApplicationRegistry
+public interface IApplicationRegistry extends StatisticsGatherer
{
/**
* Initialise the application registry. All initialisation must be done in this method so that any components
* that need access to the application registry itself for initialisation are able to use it. Attempting to
* initialise in the constructor will lead to failures since the registry reference will not have been set.
- * @param instanceID the instanceID that we can use to identify this AR.
*/
- void initialise(int instanceID) throws Exception;
+ void initialise() throws Exception;
/**
* Shutdown this Registry
@@ -63,8 +62,6 @@ public interface IApplicationRegistry
ManagedObjectRegistry getManagedObjectRegistry();
- PrincipalDatabaseManager getDatabaseManager();
-
AuthenticationManager getAuthenticationManager();
VirtualHostRegistry getVirtualHostRegistry();
@@ -97,4 +94,6 @@ public interface IApplicationRegistry
ConfigStore getConfigStore();
void setConfigStore(ConfigStore store);
+
+ void initialiseStatisticsReporting();
}
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java Thu Oct 20 18:42:46 2011
@@ -18,8 +18,19 @@
*/
package org.apache.qpid.server.security;
-import static org.apache.qpid.server.security.access.ObjectType.*;
-import static org.apache.qpid.server.security.access.Operation.*;
+import static org.apache.qpid.server.security.access.ObjectType.EXCHANGE;
+import static org.apache.qpid.server.security.access.ObjectType.METHOD;
+import static org.apache.qpid.server.security.access.ObjectType.OBJECT;
+import static org.apache.qpid.server.security.access.ObjectType.QUEUE;
+import static org.apache.qpid.server.security.access.ObjectType.VIRTUALHOST;
+import static org.apache.qpid.server.security.access.Operation.ACCESS;
+import static org.apache.qpid.server.security.access.Operation.BIND;
+import static org.apache.qpid.server.security.access.Operation.CONSUME;
+import static org.apache.qpid.server.security.access.Operation.CREATE;
+import static org.apache.qpid.server.security.access.Operation.DELETE;
+import static org.apache.qpid.server.security.access.Operation.PUBLISH;
+import static org.apache.qpid.server.security.access.Operation.PURGE;
+import static org.apache.qpid.server.security.access.Operation.UNBIND;
import java.net.SocketAddress;
import java.security.Principal;
@@ -29,6 +40,8 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import javax.security.auth.Subject;
+
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.log4j.Logger;
@@ -37,11 +50,9 @@ import org.apache.qpid.server.configurat
import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.plugins.PluginManager;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.security.access.ObjectProperties;
import org.apache.qpid.server.security.access.Operation;
-import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
/**
* The security manager contains references to all loaded {@link SecurityPlugin}s and delegates security decisions to them based
@@ -55,7 +66,7 @@ public class SecurityManager
private static final Logger _logger = Logger.getLogger(SecurityManager.class);
/** Container for the {@link Subject} associated with the current thread. */
- private static final ThreadLocal<Principal> _principal = new ThreadLocal<Principal>();
+ private static final ThreadLocal<Subject> _subject = new ThreadLocal<Subject>();
private PluginManager _pluginManager;
private Map<String, SecurityPluginFactory> _pluginFactories = new HashMap<String, SecurityPluginFactory>();
@@ -126,19 +137,14 @@ public class SecurityManager
configureHostPlugins(configuration);
}
- public static Principal getThreadPrincipal()
- {
- return _principal.get();
- }
-
- public static void setThreadPrincipal(Principal principal)
+ public static Subject getThreadSubject()
{
- _principal.set(principal);
+ return _subject.get();
}
- public static void setThreadPrincipal(String authId)
+ public static void setThreadSubject(final Subject subject)
{
- setThreadPrincipal(new UsernamePrincipal(authId));
+ _subject.set(subject);
}
public void configureHostPlugins(ConfigurationPlugin hostConfig) throws ConfigurationException
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java Thu Oct 20 18:42:46 2011
@@ -149,9 +149,9 @@ public class ObjectProperties extends Ha
{
put(Property.OWNER, queue.getOwner());
}
- else if (queue.getPrincipalHolder() != null)
+ else if (queue.getAuthorizationHolder() != null)
{
- put(Property.OWNER, queue.getPrincipalHolder().getPrincipal().getName());
+ put(Property.OWNER, queue.getAuthorizationHolder().getAuthorizedPrincipal().getName());
}
}
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/auth/AuthenticationResult.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/auth/AuthenticationResult.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/auth/AuthenticationResult.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/security/auth/AuthenticationResult.java Thu Oct 20 18:42:46 2011
@@ -20,42 +20,93 @@
*/
package org.apache.qpid.server.security.auth;
+import javax.security.auth.Subject;
+
+/**
+ * Encapsulates the result of an attempt to authenticate.
+ * <p>
+ * The authentication status describes the overall outcome.
+ * <p>
+ * <ol>
+ * <li>If authentication status is SUCCESS, the subject will be populated.
+ * </li>
+ * <li>If authentication status is CONTINUE, the authentication has failed because the user
+ * supplied incorrect credentials (etc). If the authentication requires it, the next challenge
+ * is made available.
+ * </li>
+ * <li>If authentication status is ERROR, the authentication decision could not be made due
+ * to a failure (such as that of an external system); {@link AuthenticationResult#getCause()}
+ * will provide the underlying exception.
+ * </li>
+ * </ol>
+ *
+ */
public class AuthenticationResult
{
public enum AuthenticationStatus
{
- SUCCESS, CONTINUE, ERROR
+ /** Authentication successful */
+ SUCCESS,
+ /** Authentication not successful due to credentials problem etc */
+ CONTINUE,
+ /** Problem prevented the authentication from being made e.g. failure of an external system */
+ ERROR
}
- public AuthenticationStatus status;
- public byte[] challenge;
-
- private Exception cause;
+ public final AuthenticationStatus _status;
+ public final byte[] _challenge;
+ private final Exception _cause;
+ private final Subject _subject;
- public AuthenticationResult(AuthenticationStatus status)
+ public AuthenticationResult(final AuthenticationStatus status)
{
this(null, status, null);
}
- public AuthenticationResult(byte[] challenge, AuthenticationStatus status)
+ public AuthenticationResult(final byte[] challenge, final AuthenticationStatus status)
{
this(challenge, status, null);
}
- public AuthenticationResult(AuthenticationStatus error, Exception cause)
+ public AuthenticationResult(final AuthenticationStatus error, final Exception cause)
{
this(null, error, cause);
}
- public AuthenticationResult(byte[] challenge, AuthenticationStatus status, Exception cause)
+ public AuthenticationResult(final byte[] challenge, final AuthenticationStatus status, final Exception cause)
+ {
+ this._status = status;
+ this._challenge = challenge;
+ this._cause = cause;
+ this._subject = null;
+ }
+
+ public AuthenticationResult(final Subject subject)
{
- this.status = status;
- this.challenge = challenge;
- this.cause = cause;
+ this._status = AuthenticationStatus.SUCCESS;
+ this._challenge = null;
+ this._cause = null;
+ this._subject = subject;
}
public Exception getCause()
{
- return cause;
+ return _cause;
+ }
+
+ public AuthenticationStatus getStatus()
+ {
+ return _status;
}
+
+ public byte[] getChallenge()
+ {
+ return _challenge;
+ }
+
+ public Subject getSubject()
+ {
+ return _subject;
+ }
+
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org