You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2012/08/03 14:14:12 UTC
svn commit: r1368910 [16/27] - in /qpid/branches/asyncstore: ./ bin/
cpp/bindings/qmf/ruby/ cpp/bindings/qmf2/ruby/ cpp/bindings/qpid/python/
cpp/bindings/qpid/ruby/ cpp/bindings/qpid/ruby/features/
cpp/bindings/qpid/ruby/features/step_definitions/ cpp...
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java Fri Aug 3 12:13:32 2012
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.queue;
+import org.apache.log4j.Logger;
import org.apache.qpid.server.message.ServerMessage;
public enum NotificationCheck
@@ -27,13 +28,16 @@ public enum NotificationCheck
MESSAGE_COUNT_ALERT
{
- boolean notifyIfNecessary(ServerMessage msg, AMQQueue queue, QueueNotificationListener listener)
+ public boolean notifyIfNecessary(ServerMessage<?> msg, AMQQueue queue, AMQQueue.NotificationListener listener)
{
int msgCount;
final long maximumMessageCount = queue.getMaximumMessageCount();
if (maximumMessageCount!= 0 && (msgCount = queue.getMessageCount()) >= maximumMessageCount)
{
- listener.notifyClients(this, queue, msgCount + ": Maximum count on queue threshold ("+ maximumMessageCount +") breached.");
+ String notificationMsg = msgCount + ": Maximum count on queue threshold ("+ maximumMessageCount +") breached.";
+
+ logNotification(this, queue, notificationMsg);
+ listener.notifyClients(this, queue, notificationMsg);
return true;
}
return false;
@@ -41,7 +45,7 @@ public enum NotificationCheck
},
MESSAGE_SIZE_ALERT(true)
{
- boolean notifyIfNecessary(ServerMessage msg, AMQQueue queue, QueueNotificationListener listener)
+ public boolean notifyIfNecessary(ServerMessage<?> msg, AMQQueue queue, AMQQueue.NotificationListener listener)
{
final long maximumMessageSize = queue.getMaximumMessageSize();
if(maximumMessageSize != 0)
@@ -50,10 +54,12 @@ public enum NotificationCheck
long messageSize;
messageSize = (msg == null) ? 0 : msg.getSize();
-
if (messageSize >= maximumMessageSize)
{
- listener.notifyClients(this, queue, messageSize + "b : Maximum message size threshold ("+ maximumMessageSize +") breached. [Message ID=" + msg.getMessageNumber() + "]");
+ String notificationMsg = messageSize + "b : Maximum message size threshold ("+ maximumMessageSize +") breached. [Message ID=" + msg.getMessageNumber() + "]";
+
+ logNotification(this, queue, notificationMsg);
+ listener.notifyClients(this, queue, notificationMsg);
return true;
}
}
@@ -63,7 +69,7 @@ public enum NotificationCheck
},
QUEUE_DEPTH_ALERT
{
- boolean notifyIfNecessary(ServerMessage msg, AMQQueue queue, QueueNotificationListener listener)
+ public boolean notifyIfNecessary(ServerMessage<?> msg, AMQQueue queue, AMQQueue.NotificationListener listener)
{
// Check for threshold queue depth in bytes
final long maximumQueueDepth = queue.getMaximumQueueDepth();
@@ -74,7 +80,10 @@ public enum NotificationCheck
if (queueDepth >= maximumQueueDepth)
{
- listener.notifyClients(this, queue, (queueDepth>>10) + "Kb : Maximum queue depth threshold ("+(maximumQueueDepth>>10)+"Kb) breached.");
+ String notificationMsg = (queueDepth>>10) + "Kb : Maximum queue depth threshold ("+(maximumQueueDepth>>10)+"Kb) breached.";
+
+ logNotification(this, queue, notificationMsg);
+ listener.notifyClients(this, queue, notificationMsg);
return true;
}
}
@@ -84,7 +93,7 @@ public enum NotificationCheck
},
MESSAGE_AGE_ALERT
{
- boolean notifyIfNecessary(ServerMessage msg, AMQQueue queue, QueueNotificationListener listener)
+ public boolean notifyIfNecessary(ServerMessage<?> msg, AMQQueue queue, AMQQueue.NotificationListener listener)
{
final long maxMessageAge = queue.getMaximumMessageAge();
@@ -97,7 +106,10 @@ public enum NotificationCheck
if(firstArrivalTime < thresholdTime)
{
long oldestAge = currentTime - firstArrivalTime;
- listener.notifyClients(this, queue, (oldestAge/1000) + "s : Maximum age on queue threshold ("+(maxMessageAge /1000)+"s) breached.");
+ String notificationMsg = (oldestAge/1000) + "s : Maximum age on queue threshold ("+(maxMessageAge /1000)+"s) breached.";
+
+ logNotification(this, queue, notificationMsg);
+ listener.notifyClients(this, queue, notificationMsg);
return true;
}
@@ -109,6 +121,8 @@ public enum NotificationCheck
}
;
+ private static final Logger LOGGER = Logger.getLogger(NotificationCheck.class);
+
private final boolean _messageSpecific;
NotificationCheck()
@@ -126,6 +140,11 @@ public enum NotificationCheck
return _messageSpecific;
}
- abstract boolean notifyIfNecessary(ServerMessage msg, AMQQueue queue, QueueNotificationListener listener);
+ public abstract boolean notifyIfNecessary(ServerMessage<?> msg, AMQQueue queue, AMQQueue.NotificationListener listener);
+ //A bit of a hack, only for use until we do the logging listener
+ private static void logNotification(NotificationCheck notification, AMQQueue queue, String notificationMsg)
+ {
+ LOGGER.info(notification.name() + " On Queue " + queue.getNameShortString() + " - " + notificationMsg);
+ }
}
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java Fri Aug 3 12:13:32 2012
@@ -37,7 +37,7 @@ public class PriorityQueueList implement
_priorityOffset = 5-((priorities + 1)/2);
for(int i = 0; i < priorities; i++)
{
- _priorityLists[i] = new PriorityQueueEntrySubList(queue);
+ _priorityLists[i] = new PriorityQueueEntrySubList(queue, i);
}
}
@@ -164,9 +164,12 @@ public class PriorityQueueList implement
private static class PriorityQueueEntrySubList extends SimpleQueueEntryList
{
- public PriorityQueueEntrySubList(AMQQueue queue)
+ private int _listPriority;
+
+ public PriorityQueueEntrySubList(AMQQueue queue, int listPriority)
{
super(queue);
+ _listPriority = listPriority;
}
@Override
@@ -174,6 +177,11 @@ public class PriorityQueueList implement
{
return new PriorityQueueEntryImpl(this, message);
}
+
+ public int getListPriority()
+ {
+ return _listPriority;
+ }
}
private static class PriorityQueueEntryImpl extends SimpleQueueEntryImpl
@@ -186,8 +194,9 @@ public class PriorityQueueList implement
@Override
public int compareTo(final QueueEntry o)
{
- byte thisPriority = getMessageHeader().getPriority();
- byte otherPriority = o.getMessageHeader().getPriority();
+ PriorityQueueEntrySubList pqel = (PriorityQueueEntrySubList)((PriorityQueueEntryImpl)o).getQueueEntryList();
+ int otherPriority = pqel.getListPriority();
+ int thisPriority = ((PriorityQueueEntrySubList) getQueueEntryList()).getListPriority();
if(thisPriority != otherPriority)
{
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java Fri Aug 3 12:13:32 2012
@@ -1,9 +1,3 @@
-package org.apache.qpid.server.queue;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.subscription.Subscription;
-
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -24,6 +18,12 @@ import org.apache.qpid.server.subscripti
* under the License.
*
*/
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.subscription.Subscription;
+
public interface QueueEntry extends Comparable<QueueEntry>, Filterable
{
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Fri Aug 3 12:13:32 2012
@@ -233,7 +233,7 @@ public abstract class QueueEntryImpl imp
if(state instanceof SubscriptionAcquiredState)
{
- getQueue().decrementUnackedMsgCount();
+ getQueue().decrementUnackedMsgCount(this);
Subscription subscription = ((SubscriptionAcquiredState)state).getSubscription();
if (subscription != null)
{
@@ -369,7 +369,7 @@ public abstract class QueueEntryImpl imp
Subscription s = null;
if (state instanceof SubscriptionAcquiredState)
{
- getQueue().decrementUnackedMsgCount();
+ getQueue().decrementUnackedMsgCount(this);
s = ((SubscriptionAcquiredState) state).getSubscription();
s.onDequeue(this);
}
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryVisitor.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryVisitor.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryVisitor.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryVisitor.java Fri Aug 3 12:13:32 2012
@@ -1,21 +1,21 @@
-package org.apache.qpid.server.queue;
-
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
-* <p/>
+*
* http://www.apache.org/licenses/LICENSE-2.0
-* <p/>
+*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.qpid.server.queue;
+
public interface QueueEntryVisitor
{
boolean visit(QueueEntry entry);
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java Fri Aug 3 12:13:32 2012
@@ -42,7 +42,15 @@ public interface QueueRegistry
AMQQueue getQueue(String queue);
+ void addRegistryChangeListener(RegistryChangeListener listener);
+
void stopAllAndUnregisterMBeans();
AMQQueue getQueue(UUID queueId);
+
+ interface RegistryChangeListener
+ {
+ void queueRegistered(AMQQueue queue);
+ void queueUnregistered(AMQQueue queue);
+ }
}
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Fri Aug 3 12:13:32 2012
@@ -19,8 +19,10 @@
package org.apache.qpid.server.queue;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -28,12 +30,11 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-import javax.management.JMException;
-
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQSecurityException;
@@ -52,7 +53,6 @@ import org.apache.qpid.server.logging.ac
import org.apache.qpid.server.logging.actors.QueueActor;
import org.apache.qpid.server.logging.messages.QueueMessages;
import org.apache.qpid.server.logging.subjects.QueueLogSubject;
-import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.registry.ApplicationRegistry;
@@ -70,6 +70,7 @@ import org.apache.qpid.server.virtualhos
public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, MessageGroupManager.SubscriptionResetHelper
{
private static final Logger _logger = Logger.getLogger(SimpleAMQQueue.class);
+
private static final String QPID_GROUP_HEADER_KEY = "qpid.group_header_key";
private static final String QPID_SHARED_MSG_GROUP = "qpid.shared_msg_group";
private static final String QPID_DEFAULT_MESSAGE_GROUP = "qpid.default-message-group";
@@ -77,11 +78,9 @@ public class SimpleAMQQueue implements A
// TODO - should make this configurable at the vhost / broker level
private static final int DEFAULT_MAX_GROUPS = 255;
-
private final VirtualHost _virtualHost;
private final AMQShortString _name;
- private final String _resourceName;
/** null means shared */
private final AMQShortString _owner;
@@ -118,6 +117,7 @@ public class SimpleAMQQueue implements A
private final AtomicLong _dequeueCount = new AtomicLong();
private final AtomicLong _dequeueSize = new AtomicLong();
+ private final AtomicLong _enqueueCount = new AtomicLong();
private final AtomicLong _enqueueSize = new AtomicLong();
private final AtomicLong _persistentMessageEnqueueSize = new AtomicLong();
private final AtomicLong _persistentMessageDequeueSize = new AtomicLong();
@@ -130,6 +130,7 @@ public class SimpleAMQQueue implements A
private final AtomicLong _byteTxnDequeues = new AtomicLong(0);
private final AtomicLong _unackedMsgCount = new AtomicLong(0);
private final AtomicLong _unackedMsgCountHigh = new AtomicLong(0);
+ private final AtomicLong _unackedMsgBytes = new AtomicLong();
private final AtomicInteger _bindingCountHigh = new AtomicInteger();
@@ -173,7 +174,6 @@ public class SimpleAMQQueue implements A
private LogSubject _logSubject;
private LogActor _logActor;
- private AMQQueueMBean _managedObject;
private static final String SUB_FLUSH_RUNNER = "SUB_FLUSH_RUNNER";
private boolean _nolocal;
@@ -185,12 +185,19 @@ public class SimpleAMQQueue implements A
//TODO : persist creation time
private long _createTime = System.currentTimeMillis();
+ private UUID _qmfId;
private ConfigurationPlugin _queueConfiguration;
/** the maximum delivery count for each message on this queue or 0 if maximum delivery count is not to be enforced. */
private int _maximumDeliveryCount = ApplicationRegistry.getInstance().getConfiguration().getMaxDeliveryCount();
private final MessageGroupManager _messageGroupManager;
+ private final Collection<SubscriptionRegistrationListener> _subscriptionListeners =
+ new ArrayList<SubscriptionRegistrationListener>();
+
+ private AMQQueue.NotificationListener _notificationListener;
+ private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length];
+
protected SimpleAMQQueue(UUID id, AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, Map<String,Object> arguments)
{
this(id, name, durable, owner, autoDelete, exclusive,virtualHost, new SimpleQueueEntryList.Factory(), arguments);
@@ -227,17 +234,16 @@ public class SimpleAMQQueue implements A
}
_name = name;
- _resourceName = String.valueOf(name);
_durable = durable;
_owner = owner;
_autoDelete = autoDelete;
_exclusive = exclusive;
_virtualHost = virtualHost;
_entries = entryListFactory.createQueueEntryList(this);
- _arguments = arguments;
+ _arguments = arguments == null ? new HashMap<String, Object>() : new HashMap<String, Object>(arguments);
_id = id;
-
+ _qmfId = getConfigStore().createId();
_asyncDelivery = ReferenceCountingExecutorService.getInstance().acquireExecutorService();
_logSubject = new QueueLogSubject(this);
@@ -255,16 +261,6 @@ public class SimpleAMQQueue implements A
getConfigStore().addConfiguredObject(this);
- try
- {
- _managedObject = new AMQQueueMBean(this);
- _managedObject.register();
- }
- catch (JMException e)
- {
- _logger.error("AMQQueue MBean creation has failed ", e);
- }
-
if(arguments != null && arguments.containsKey(QPID_GROUP_HEADER_KEY))
{
if(arguments.containsKey(QPID_SHARED_MSG_GROUP) && String.valueOf(arguments.get(QPID_SHARED_MSG_GROUP)).equals("1"))
@@ -302,7 +298,22 @@ public class SimpleAMQQueue implements A
public void execute(Runnable runnable)
{
- _asyncDelivery.execute(runnable);
+ try
+ {
+ _asyncDelivery.execute(runnable);
+ }
+ catch (RejectedExecutionException ree)
+ {
+ if (_stopped.get())
+ {
+ // Ignore - SubFlusherRunner or QueueRunner submitted execution as queue was being stopped.
+ }
+ else
+ {
+ _logger.error("Unexpected rejected execution", ree);
+ throw ree;
+ }
+ }
}
public AMQShortString getNameShortString()
@@ -320,6 +331,12 @@ public class SimpleAMQQueue implements A
return _id;
}
+ @Override
+ public UUID getQMFId()
+ {
+ return _qmfId;
+ }
+
public QueueConfigType getConfigType()
{
return QueueConfigType.getInstance();
@@ -339,15 +356,10 @@ public class SimpleAMQQueue implements A
{
return _exclusive;
}
-
- public void setExclusive(boolean exclusive) throws AMQException
+
+ public void setExclusive(boolean exclusive)
{
_exclusive = exclusive;
-
- if(isDurable())
- {
- getVirtualHost().getMessageStore().updateQueue(this);
- }
}
public Exchange getAlternateExchange()
@@ -368,22 +380,10 @@ public class SimpleAMQQueue implements A
_alternateExchange = exchange;
}
- public void setAlternateExchange(String exchangeName)
- {
- if(exchangeName == null || exchangeName.equals(""))
- {
- _alternateExchange = null;
- return;
- }
-
- Exchange exchange = getVirtualHost().getExchangeRegistry().getExchange(new AMQShortString(exchangeName));
- if (exchange == null)
- {
- throw new RuntimeException("Exchange '" + exchangeName + "' is not registered with the VirtualHost.");
- }
- setAlternateExchange(exchange);
- }
-
+ /**
+ * Arguments used to create this queue. The caller is assured
+ * that null will never be returned.
+ */
public Map<String, Object> getArguments()
{
return _arguments;
@@ -430,8 +430,8 @@ public class SimpleAMQQueue implements A
{
throw new AMQSecurityException("Permission denied");
}
-
-
+
+
if (hasExclusiveSubscriber())
{
throw new ExistingExclusiveSubscription();
@@ -463,15 +463,24 @@ public class SimpleAMQQueue implements A
{
subscription.setNoLocal(_nolocal);
}
+
+ synchronized (_subscriptionListeners)
+ {
+ for(SubscriptionRegistrationListener listener : _subscriptionListeners)
+ {
+ listener.subscriptionRegistered(this, subscription);
+ }
+ }
+
_subscriptionList.add(subscription);
-
+
//Increment consumerCountHigh if necessary. (un)registerSubscription are both
//synchronized methods so we don't need additional synchronization here
if(_counsumerCountHigh.get() < getConsumerCount())
{
_counsumerCountHigh.incrementAndGet();
}
-
+
if (isDeleted())
{
subscription.queueDeleted(this);
@@ -507,6 +516,14 @@ public class SimpleAMQQueue implements A
resetSubPointersForGroups(subscription, true);
}
+ synchronized (_subscriptionListeners)
+ {
+ for(SubscriptionRegistrationListener listener : _subscriptionListeners)
+ {
+ listener.subscriptionUnregistered(this, subscription);
+ }
+ }
+
// auto-delete queues must be deleted if there are no remaining subscribers
if (_autoDelete && getDeleteOnNoConsumers() && !subscription.isTransient() && getConsumerCount() == 0 )
@@ -526,6 +543,34 @@ public class SimpleAMQQueue implements A
}
+ public Collection<Subscription> getConsumers()
+ {
+ List<Subscription> consumers = new ArrayList<Subscription>();
+ SubscriptionList.SubscriptionNodeIterator iter = _subscriptionList.iterator();
+ while(iter.advance())
+ {
+ consumers.add(iter.getNode().getSubscription());
+ }
+ return consumers;
+
+ }
+
+ public void addSubscriptionRegistrationListener(final SubscriptionRegistrationListener listener)
+ {
+ synchronized (_subscriptionListeners)
+ {
+ _subscriptionListeners.add(listener);
+ }
+ }
+
+ public void removeSubscriptionRegistrationListener(final SubscriptionRegistrationListener listener)
+ {
+ synchronized (_subscriptionListeners)
+ {
+ _subscriptionListeners.remove(listener);
+ }
+ }
+
public void resetSubPointersForGroups(Subscription subscription, boolean clearAssignments)
{
QueueEntry entry = _messageGroupManager.findEarliestAssignedAvailableEntry(subscription);
@@ -576,10 +621,10 @@ public class SimpleAMQQueue implements A
break;
}
}
-
+
reconfigure();
}
-
+
private void reconfigure()
{
//Reconfigure the queue for to reflect this new binding.
@@ -604,7 +649,7 @@ public class SimpleAMQQueue implements A
public void removeBinding(final Binding binding)
{
_bindings.remove(binding);
-
+
reconfigure();
}
@@ -718,10 +763,7 @@ public class SimpleAMQQueue implements A
}
}
- if(_managedObject != null)
- {
- _managedObject.checkForNotification(entry.getMessage());
- }
+ checkForNotification(entry.getMessage());
if(action != null)
{
@@ -738,8 +780,8 @@ public class SimpleAMQQueue implements A
{
try
{
- if (!sub.isSuspended()
- && subscriptionReadyAndHasInterest(sub, entry)
+ if (!sub.isSuspended()
+ && subscriptionReadyAndHasInterest(sub, entry)
&& mightAssign(sub, entry)
&& !sub.wouldSuspend(entry))
{
@@ -788,6 +830,7 @@ public class SimpleAMQQueue implements A
{
long size = message.getSize();
getAtomicQueueSize().addAndGet(size);
+ _enqueueCount.incrementAndGet();
_enqueueSize.addAndGet(size);
if(message.isPersistent() && isDurable())
{
@@ -796,19 +839,29 @@ public class SimpleAMQQueue implements A
}
}
+ public long getTotalDequeueCount()
+ {
+ return _dequeueCount.get();
+ }
+
+ public long getTotalEnqueueCount()
+ {
+ return _enqueueCount.get();
+ }
+
private void incrementQueueCount()
{
getAtomicQueueCount().incrementAndGet();
}
-
+
private void incrementTxnEnqueueStats(final ServerMessage message)
{
_msgTxnEnqueues.incrementAndGet();
_byteTxnEnqueues.addAndGet(message.getSize());
}
-
+
private void incrementTxnDequeueStats(QueueEntry entry)
- {
+ {
_msgTxnDequeues.incrementAndGet();
_byteTxnDequeues.addAndGet(entry.getSize());
}
@@ -819,7 +872,7 @@ public class SimpleAMQQueue implements A
setLastSeenEntry(sub, entry);
_deliveredMessages.incrementAndGet();
- incrementUnackedMsgCount();
+ incrementUnackedMsgCount(entry);
sub.send(entry, batch);
}
@@ -833,12 +886,15 @@ public class SimpleAMQQueue implements A
private void setLastSeenEntry(final Subscription sub, final QueueEntry entry)
{
QueueContext subContext = (QueueContext) sub.getQueueContext();
- QueueEntry releasedEntry = subContext.getReleasedEntry();
-
- QueueContext._lastSeenUpdater.set(subContext, entry);
- if(releasedEntry == entry)
+ if (subContext != null)
{
- QueueContext._releasedUpdater.compareAndSet(subContext, releasedEntry, null);
+ QueueEntry releasedEntry = subContext.getReleasedEntry();
+
+ QueueContext._lastSeenUpdater.set(subContext, entry);
+ if(releasedEntry == entry)
+ {
+ QueueContext._releasedUpdater.compareAndSet(subContext, releasedEntry, null);
+ }
}
}
@@ -887,7 +943,7 @@ public class SimpleAMQQueue implements A
{
_deliveredMessages.decrementAndGet();
}
-
+
if(sub != null && sub.isSessionTransactional())
{
incrementTxnDequeueStats(entry);
@@ -940,11 +996,13 @@ public class SimpleAMQQueue implements A
}
}
+
+
public int getConsumerCount()
{
return _subscriptionList.size();
}
-
+
public int getConsumerCountHigh()
{
return _counsumerCountHigh.get();
@@ -1148,7 +1206,7 @@ public class SimpleAMQQueue implements A
}
- public void visit(final Visitor visitor)
+ public void visit(final QueueEntryVisitor visitor)
{
QueueEntryIterator queueListIterator = _entries.iterator();
@@ -1195,192 +1253,6 @@ public class SimpleAMQQueue implements A
}
- public void moveMessagesToAnotherQueue(final long fromMessageId,
- final long toMessageId,
- String destinationQueueName) throws IllegalArgumentException
- {
-
- final AMQQueue toQueue = getValidatedDestinationQueue(destinationQueueName);
-
- List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
- {
-
- public boolean accept(QueueEntry entry)
- {
- final long messageId = entry.getMessage().getMessageNumber();
- return (messageId >= fromMessageId)
- && (messageId <= toMessageId)
- && entry.acquire();
- }
-
- public boolean filterComplete()
- {
- return false;
- }
- });
-
-
- final ServerTransaction txn = new LocalTransaction(getVirtualHost().getMessageStore());
- boolean shouldRollback = true;
- try
- {
- // Move the messages in on the message store.
- for (final QueueEntry entry : entries)
- {
- final ServerMessage message = entry.getMessage();
- txn.enqueue(toQueue, message,
- new ServerTransaction.Action()
- {
-
- public void postCommit()
- {
- try
- {
- toQueue.enqueue(message);
- }
- catch (AMQException e)
- {
- throw new RuntimeException(e);
- }
- }
-
- public void onRollback()
- {
- entry.release();
- }
- });
- txn.dequeue(this, message,
- new ServerTransaction.Action()
- {
-
- public void postCommit()
- {
- entry.discard();
- }
-
- public void onRollback()
- {
-
- }
- });
- }
- txn.commit();
- shouldRollback = false;
- }
- finally
- {
- if (shouldRollback)
- {
- txn.rollback();
- }
- }
-
- }
-
- public void copyMessagesToAnotherQueue(final long fromMessageId,
- final long toMessageId,
- String destinationQueueName) throws IllegalArgumentException
- {
- final AMQQueue toQueue = getValidatedDestinationQueue(destinationQueueName);
-
- List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
- {
-
- public boolean accept(QueueEntry entry)
- {
- final long messageId = entry.getMessage().getMessageNumber();
- return ((messageId >= fromMessageId)
- && (messageId <= toMessageId));
- }
-
- public boolean filterComplete()
- {
- return false;
- }
- });
-
- final ServerTransaction txn = new LocalTransaction(_virtualHost.getMessageStore());
- boolean shouldRollback = true;
- try
- {
- // Copy the messages in on the message store.
- for (QueueEntry entry : entries)
- {
- final ServerMessage message = entry.getMessage();
-
- txn.enqueue(toQueue, message, new ServerTransaction.Action()
- {
- public void postCommit()
- {
- try
- {
- toQueue.enqueue(message);
- }
- catch (AMQException e)
- {
- throw new RuntimeException(e);
- }
- }
-
- public void onRollback()
- {
- }
- });
-
- }
-
- txn.commit();
- shouldRollback = false;
- }
- finally
- {
- if (shouldRollback)
- {
- txn.rollback();
- }
- }
-
- }
-
- private AMQQueue getValidatedDestinationQueue(String queueName)
- {
- final AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
- if (toQueue == null)
- {
- throw new IllegalArgumentException("Queue '" + queueName + "' is not registered with the virtualhost.");
- }
- else if (toQueue == this)
- {
- throw new IllegalArgumentException("The destination queue can't be the same as the source queue");
- }
- return toQueue;
- }
-
- public void removeMessagesFromQueue(long fromMessageId, long toMessageId)
- {
-
- QueueEntryIterator queueListIterator = _entries.iterator();
-
- while (queueListIterator.advance())
- {
- QueueEntry node = queueListIterator.getNode();
-
- final ServerMessage message = node.getMessage();
- if(message != null)
- {
- final long messageId = message.getMessageNumber();
-
- if ((messageId >= fromMessageId)
- && (messageId <= toMessageId)
- && node.acquire())
- {
- dequeueEntry(node);
- }
- }
- }
-
- }
-
public void purge(final long request) throws AMQException
{
clear(request);
@@ -1393,6 +1265,7 @@ public class SimpleAMQQueue implements A
// ------ Management functions
+ // TODO - now only used by the tests
public void deleteMessageFromTop()
{
QueueEntryIterator queueListIterator = _entries.iterator();
@@ -1411,7 +1284,7 @@ public class SimpleAMQQueue implements A
}
public long clearQueue() throws AMQException
- {
+ {
return clear(0l);
}
@@ -1422,7 +1295,7 @@ public class SimpleAMQQueue implements A
{
throw new AMQSecurityException("Permission denied: queue " + getName());
}
-
+
QueueEntryIterator queueListIterator = _entries.iterator();
long count = 0;
@@ -1489,7 +1362,7 @@ public class SimpleAMQQueue implements A
{
throw new AMQSecurityException("Permission denied: " + getName());
}
-
+
if (!_deleted.getAndSet(true))
{
@@ -1617,12 +1490,6 @@ public class SimpleAMQQueue implements A
txn.commit();
-
- if(_managedObject!=null)
- {
- _managedObject.unregister();
- }
-
for (Task task : _deleteTaskList)
{
task.doTask(this);
@@ -2101,16 +1968,13 @@ public class SimpleAMQQueue implements A
}
else
{
- if (_managedObject != null)
+ // There is a chance that the node could be deleted by
+ // the time the check actually occurs. So verify we
+ // can actually get the message to perform the check.
+ ServerMessage msg = node.getMessage();
+ if (msg != null)
{
- // There is a chance that the node could be deleted by
- // the time the check actually occurs. So verify we
- // can actually get the message to perform the check.
- ServerMessage msg = node.getMessage();
- if (msg != null)
- {
- _managedObject.checkForNotification(msg);
- }
+ checkForNotification(msg);
}
}
}
@@ -2235,11 +2099,6 @@ public class SimpleAMQQueue implements A
return _notificationChecks;
}
- public ManagedObject getManagedObject()
- {
- return _managedObject;
- }
-
private final class QueueEntryListener implements QueueEntry.StateChangeListener
{
@@ -2330,12 +2189,6 @@ public class SimpleAMQQueue implements A
return _queueConfiguration;
}
- public String getResourceName()
- {
- return _resourceName;
- }
-
-
public ConfigStore getConfigStore()
{
return getVirtualHost().getConfigStore();
@@ -2355,22 +2208,22 @@ public class SimpleAMQQueue implements A
{
return _dequeueSize.get();
}
-
+
public long getByteTxnEnqueues()
{
return _byteTxnEnqueues.get();
}
-
+
public long getByteTxnDequeues()
{
return _byteTxnDequeues.get();
}
-
+
public long getMsgTxnEnqueues()
{
return _msgTxnEnqueues.get();
}
-
+
public long getMsgTxnDequeues()
{
return _msgTxnDequeues.get();
@@ -2407,21 +2260,28 @@ public class SimpleAMQQueue implements A
{
return _unackedMsgCountHigh.get();
}
-
+
public long getUnackedMessageCount()
{
return _unackedMsgCount.get();
}
-
- public void decrementUnackedMsgCount()
+
+ public long getUnackedMessageBytes()
+ {
+ return _unackedMsgBytes.get();
+ }
+
+ public void decrementUnackedMsgCount(QueueEntry queueEntry)
{
_unackedMsgCount.decrementAndGet();
+ _unackedMsgBytes.addAndGet(-queueEntry.getSize());
}
-
- private void incrementUnackedMsgCount()
+
+ private void incrementUnackedMsgCount(QueueEntry entry)
{
long unackedMsgCount = _unackedMsgCount.incrementAndGet();
-
+ _unackedMsgBytes.addAndGet(entry.getSize());
+
long unackedMsgCountHigh;
while(unackedMsgCount > (unackedMsgCountHigh = _unackedMsgCountHigh.get()))
{
@@ -2447,4 +2307,54 @@ public class SimpleAMQQueue implements A
_maximumDeliveryCount = maximumDeliveryCount;
}
+ /**
+ * Checks if there is any notification to send to the listeners
+ */
+ private void checkForNotification(ServerMessage<?> msg) throws AMQException
+ {
+ final Set<NotificationCheck> notificationChecks = getNotificationChecks();
+ final AMQQueue.NotificationListener listener = _notificationListener;
+
+ if(listener != null && !notificationChecks.isEmpty())
+ {
+ final long currentTime = System.currentTimeMillis();
+ final long thresholdTime = currentTime - getMinimumAlertRepeatGap();
+
+ for (NotificationCheck check : notificationChecks)
+ {
+ if (check.isMessageSpecific() || (_lastNotificationTimes[check.ordinal()] < thresholdTime))
+ {
+ if (check.notifyIfNecessary(msg, this, listener))
+ {
+ _lastNotificationTimes[check.ordinal()] = currentTime;
+ }
+ }
+ }
+ }
+ }
+
+ public void setNotificationListener(AMQQueue.NotificationListener listener)
+ {
+ _notificationListener = listener;
+ }
+
+ @Override
+ public void setDescription(String description)
+ {
+ if (description == null)
+ {
+ _arguments.remove(AMQQueueFactory.X_QPID_DESCRIPTION);
+ }
+ else
+ {
+ _arguments.put(AMQQueueFactory.X_QPID_DESCRIPTION, description);
+ }
+ }
+
+ @Override
+ public String getDescription()
+ {
+ return (String) _arguments.get(AMQQueueFactory.X_QPID_DESCRIPTION);
+ }
+
}
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java Fri Aug 3 12:13:32 2012
@@ -1,4 +1,3 @@
-package org.apache.qpid.server.queue;
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -19,6 +18,7 @@ package org.apache.qpid.server.queue;
* under the License.
*
*/
+package org.apache.qpid.server.queue;
import org.apache.log4j.Logger;
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java Fri Aug 3 12:13:32 2012
@@ -22,9 +22,9 @@ package org.apache.qpid.server.registry;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.log4j.Logger;
+import org.apache.qpid.server.logging.*;
import org.osgi.framework.BundleContext;
-import org.apache.qpid.AMQException;
import org.apache.qpid.common.Closeable;
import org.apache.qpid.common.QpidProperties;
import org.apache.qpid.qmf.QMFService;
@@ -35,24 +35,18 @@ import org.apache.qpid.server.configurat
import org.apache.qpid.server.configuration.SystemConfig;
import org.apache.qpid.server.configuration.SystemConfigImpl;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
-import org.apache.qpid.server.logging.CompositeStartupMessageLogger;
-import org.apache.qpid.server.logging.Log4jMessageLogger;
-import org.apache.qpid.server.logging.LogActor;
-import org.apache.qpid.server.logging.RootMessageLogger;
-import org.apache.qpid.server.logging.SystemOutMessageLogger;
import org.apache.qpid.server.logging.actors.AbstractActor;
import org.apache.qpid.server.logging.actors.BrokerActor;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.BrokerMessages;
import org.apache.qpid.server.logging.messages.VirtualHostMessages;
-import org.apache.qpid.server.management.ManagedObjectRegistry;
-import org.apache.qpid.server.management.NoopManagedObjectRegistry;
-import org.apache.qpid.server.plugins.Plugin;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.adapter.BrokerAdapter;
import org.apache.qpid.server.plugins.PluginManager;
import org.apache.qpid.server.security.SecurityManager;
-import org.apache.qpid.server.security.SecurityManager.SecurityConfiguration;
import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
-import org.apache.qpid.server.security.auth.manager.AuthenticationManagerPluginFactory;
+import org.apache.qpid.server.security.auth.manager.AuthenticationManagerRegistry;
+import org.apache.qpid.server.security.auth.manager.IAuthenticationManagerRegistry;
import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.transport.QpidAcceptor;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -60,13 +54,8 @@ import org.apache.qpid.server.virtualhos
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import java.net.InetSocketAddress;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.UUID;
+import java.net.SocketAddress;
+import java.util.*;
import java.util.concurrent.atomic.AtomicReference;
@@ -78,19 +67,19 @@ import java.util.concurrent.atomic.Atomi
*/
public abstract class ApplicationRegistry implements IApplicationRegistry
{
+
private static final Logger _logger = Logger.getLogger(ApplicationRegistry.class);
private static AtomicReference<IApplicationRegistry> _instance = new AtomicReference<IApplicationRegistry>(null);
private final ServerConfiguration _configuration;
- private final Map<InetSocketAddress, QpidAcceptor> _acceptors = new HashMap<InetSocketAddress, QpidAcceptor>();
-
- private ManagedObjectRegistry _managedObjectRegistry;
+ private final Map<InetSocketAddress, QpidAcceptor> _acceptors =
+ Collections.synchronizedMap(new HashMap<InetSocketAddress, QpidAcceptor>());
- private AuthenticationManager _authenticationManager;
+ private IAuthenticationManagerRegistry _authenticationManagerRegistry;
- private VirtualHostRegistry _virtualHostRegistry;
+ private final VirtualHostRegistry _virtualHostRegistry = new VirtualHostRegistry(this);
private SecurityManager _securityManager;
@@ -106,39 +95,32 @@ public abstract class ApplicationRegistr
private QMFService _qmfService;
- private BrokerConfig _broker;
+ private BrokerConfig _brokerConfig;
+
+ private Broker _broker;
private ConfigStore _configStore;
-
+
private Timer _reportingTimer;
- private boolean _statisticsEnabled = false;
private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
private BundleContext _bundleContext;
- protected static Logger get_logger()
- {
- return _logger;
- }
+ private final List<PortBindingListener> _portBindingListeners = new ArrayList<PortBindingListener>();
- protected Map<InetSocketAddress, QpidAcceptor> getAcceptors()
- {
- return _acceptors;
- }
+ private int _httpManagementPort = -1, _httpsManagementPort = -1;
- protected void setManagedObjectRegistry(ManagedObjectRegistry managedObjectRegistry)
- {
- _managedObjectRegistry = managedObjectRegistry;
- }
+ private LogRecorder _logRecorder;
- protected void setAuthenticationManager(AuthenticationManager authenticationManager)
- {
- _authenticationManager = authenticationManager;
- }
+ private List<IAuthenticationManagerRegistry.RegistryChangeListener> _authManagerChangeListeners =
+ new ArrayList<IAuthenticationManagerRegistry.RegistryChangeListener>();
- protected void setVirtualHostRegistry(VirtualHostRegistry virtualHostRegistry)
+ public Map<InetSocketAddress, QpidAcceptor> getAcceptors()
{
- _virtualHostRegistry = virtualHostRegistry;
+ synchronized (_acceptors)
+ {
+ return new HashMap<InetSocketAddress, QpidAcceptor>(_acceptors);
+ }
}
protected void setSecurityManager(SecurityManager securityManager)
@@ -205,11 +187,11 @@ public abstract class ApplicationRegistr
store.setRoot(new SystemConfigImpl(store));
instance.setConfigStore(store);
- BrokerConfig broker = new BrokerConfigAdapter(instance);
+ final BrokerConfig brokerConfig = new BrokerConfigAdapter(instance);
- SystemConfig system = store.getRoot();
- system.addBroker(broker);
- instance.setBroker(broker);
+ final SystemConfig system = store.getRoot();
+ system.addBroker(brokerConfig);
+ instance.setBrokerConfig(brokerConfig);
try
{
@@ -222,7 +204,7 @@ public abstract class ApplicationRegistr
//remove the Broker instance, then re-throw
try
{
- system.removeBroker(broker);
+ system.removeBroker(brokerConfig);
}
catch(Throwable t)
{
@@ -297,18 +279,32 @@ public abstract class ApplicationRegistr
public void initialise() throws Exception
{
+ _logRecorder = new LogRecorder();
//Create the RootLogger to be used during broker operation
_rootMessageLogger = new Log4jMessageLogger(_configuration);
//Create the composite (log4j+SystemOut MessageLogger to be used during startup
RootMessageLogger[] messageLoggers = {new SystemOutMessageLogger(), _rootMessageLogger};
_startupMessageLogger = new CompositeStartupMessageLogger(messageLoggers);
-
- CurrentActor.set(new BrokerActor(_startupMessageLogger));
+
+ BrokerActor actor = new BrokerActor(_startupMessageLogger);
+ CurrentActor.setDefault(actor);
+ CurrentActor.set(actor);
try
{
- initialiseManagedObjectRegistry();
+ initialiseStatistics();
+
+ if(_configuration.getHTTPManagementEnabled())
+ {
+ _httpManagementPort = _configuration.getHTTPManagementPort();
+ }
+ if (_configuration.getHTTPSManagementEnabled())
+ {
+ _httpsManagementPort = _configuration.getHTTPSManagementPort();
+ }
+
+ _broker = new BrokerAdapter(this);
configure();
@@ -316,13 +312,23 @@ public abstract class ApplicationRegistr
logStartupMessages(CurrentActor.get());
- _virtualHostRegistry = new VirtualHostRegistry(this);
-
_securityManager = new SecurityManager(_configuration, _pluginManager);
- _authenticationManager = createAuthenticationManager();
+ _authenticationManagerRegistry = createAuthenticationManagerRegistry(_configuration, _pluginManager);
+
+ if(!_authManagerChangeListeners.isEmpty())
+ {
+ for(IAuthenticationManagerRegistry.RegistryChangeListener listener : _authManagerChangeListeners)
+ {
- _managedObjectRegistry.start();
+ _authenticationManagerRegistry.addRegistryChangeListener(listener);
+ for(AuthenticationManager authMgr : _authenticationManagerRegistry.getAvailableAuthenticationManagers().values())
+ {
+ listener.authenticationManagerRegistered(authMgr);
+ }
+ }
+ _authManagerChangeListeners.clear();
+ }
}
finally
{
@@ -333,7 +339,6 @@ public abstract class ApplicationRegistr
try
{
initialiseVirtualHosts();
- initialiseStatistics();
initialiseStatisticsReporting();
}
finally
@@ -343,52 +348,10 @@ public abstract class ApplicationRegistr
}
}
-
- /**
- * Iterates across all discovered authentication manager factories, offering the security configuration to each.
- * Expects <b>exactly</b> one authentication manager to configure and initialise itself.
- *
- * It is an error to configure more than one authentication manager, or to configure none.
- *
- * @return authentication manager
- * @throws ConfigurationException
- */
- protected AuthenticationManager createAuthenticationManager() throws ConfigurationException
+ protected IAuthenticationManagerRegistry createAuthenticationManagerRegistry(ServerConfiguration _configuration, PluginManager _pluginManager)
+ throws ConfigurationException
{
- final SecurityConfiguration securityConfiguration = _configuration.getConfiguration(SecurityConfiguration.class.getName());
- final Collection<AuthenticationManagerPluginFactory<? extends Plugin>> factories = _pluginManager.getAuthenticationManagerPlugins().values();
-
- if (factories.size() == 0)
- {
- throw new ConfigurationException("No authentication manager factory plugins found. Check the desired authentication" +
- "manager plugin has been placed in the plugins directory.");
- }
-
- AuthenticationManager authMgr = null;
-
- for (final Iterator<AuthenticationManagerPluginFactory<? extends Plugin>> iterator = factories.iterator(); iterator.hasNext();)
- {
- final AuthenticationManagerPluginFactory<? extends Plugin> factory = (AuthenticationManagerPluginFactory<? extends Plugin>) iterator.next();
- final AuthenticationManager tmp = factory.newInstance(securityConfiguration);
- if (tmp != null)
- {
- if (authMgr != null)
- {
- throw new ConfigurationException("Cannot configure more than one authentication manager."
- + " Both " + tmp.getClass() + " and " + authMgr.getClass() + " are configured."
- + " Remove configuration for one of the authentication manager, or remove the plugin JAR"
- + " from the classpath.");
- }
- authMgr = tmp;
- }
- }
-
- if (authMgr == null)
- {
- throw new ConfigurationException("No authentication managers configured within the configure file.");
- }
-
- return authMgr;
+ return new AuthenticationManagerRegistry(_configuration, _pluginManager);
}
protected void initialiseVirtualHosts() throws Exception
@@ -400,23 +363,18 @@ public abstract class ApplicationRegistr
getVirtualHostRegistry().setDefaultVirtualHostName(_configuration.getDefaultVirtualHost());
}
- protected void initialiseManagedObjectRegistry() throws AMQException
- {
- _managedObjectRegistry = new NoopManagedObjectRegistry();
- }
-
public void initialiseStatisticsReporting()
{
long report = _configuration.getStatisticsReportingPeriod() * 1000; // convert to ms
final boolean broker = _configuration.isStatisticsGenerationBrokerEnabled();
final boolean virtualhost = _configuration.isStatisticsGenerationVirtualhostsEnabled();
final boolean reset = _configuration.isStatisticsReportResetEnabled();
-
+
/* add a timer task to report statistics if generation is enabled for broker or virtualhosts */
if (report > 0L && (broker || virtualhost))
{
_reportingTimer = new Timer("Statistics-Reporting", true);
-
+
_reportingTimer.scheduleAtFixedRate(new StatisticsReportingTask(broker, virtualhost, reset),
@@ -545,15 +503,13 @@ public abstract class ApplicationRegistr
//Shutdown virtualhosts
close(_virtualHostRegistry);
- close(_authenticationManager);
+ close(_authenticationManagerRegistry);
close(_qmfService);
close(_pluginManager);
- close(_managedObjectRegistry);
-
- BrokerConfig broker = getBroker();
+ BrokerConfig broker = getBrokerConfig();
if(broker != null)
{
broker.getSystem().removeBroker(broker);
@@ -569,12 +525,14 @@ public abstract class ApplicationRegistr
private void unbind()
{
+ List<QpidAcceptor> removedAcceptors = new ArrayList<QpidAcceptor>();
synchronized (_acceptors)
{
for (InetSocketAddress bindAddress : _acceptors.keySet())
{
QpidAcceptor acceptor = _acceptors.get(bindAddress);
+ removedAcceptors.add(acceptor);
try
{
acceptor.getNetworkTransport().close();
@@ -587,6 +545,16 @@ public abstract class ApplicationRegistr
CurrentActor.get().message(BrokerMessages.SHUTTING_DOWN(acceptor.toString(), bindAddress.getPort()));
}
}
+ synchronized (_portBindingListeners)
+ {
+ for(QpidAcceptor acceptor : removedAcceptors)
+ {
+ for(PortBindingListener listener : _portBindingListeners)
+ {
+ listener.unbound(acceptor);
+ }
+ }
+ }
}
public ServerConfiguration getConfiguration()
@@ -600,6 +568,13 @@ public abstract class ApplicationRegistr
{
_acceptors.put(bindAddress, acceptor);
}
+ synchronized (_portBindingListeners)
+ {
+ for(PortBindingListener listener : _portBindingListeners)
+ {
+ listener.bound(acceptor, bindAddress);
+ }
+ }
}
public VirtualHostRegistry getVirtualHostRegistry()
@@ -612,14 +587,16 @@ public abstract class ApplicationRegistr
return _securityManager;
}
- public ManagedObjectRegistry getManagedObjectRegistry()
+ @Override
+ public AuthenticationManager getAuthenticationManager(SocketAddress address)
{
- return _managedObjectRegistry;
+ return _authenticationManagerRegistry.getAuthenticationManager(address);
}
- public AuthenticationManager getAuthenticationManager()
+ @Override
+ public IAuthenticationManagerRegistry getAuthenticationManagerRegistry()
{
- return _authenticationManager;
+ return _authenticationManagerRegistry;
}
public PluginManager getPluginManager()
@@ -636,7 +613,7 @@ public abstract class ApplicationRegistr
{
return _rootMessageLogger;
}
-
+
public RootMessageLogger getCompositeStartupMessageLogger()
{
return _startupMessageLogger;
@@ -652,69 +629,63 @@ public abstract class ApplicationRegistr
return _qmfService;
}
- public BrokerConfig getBroker()
+ public BrokerConfig getBrokerConfig()
{
- return _broker;
+ return _brokerConfig;
}
- public void setBroker(final BrokerConfig broker)
+ public void setBrokerConfig(final BrokerConfig broker)
{
- _broker = broker;
+ _brokerConfig = broker;
}
public VirtualHost createVirtualHost(final VirtualHostConfiguration vhostConfig) throws Exception
{
VirtualHostImpl virtualHost = new VirtualHostImpl(this, vhostConfig);
_virtualHostRegistry.registerVirtualHost(virtualHost);
- getBroker().addVirtualHost(virtualHost);
+ getBrokerConfig().addVirtualHost(virtualHost);
return virtualHost;
}
-
+
public void registerMessageDelivered(long messageSize)
{
- if (isStatisticsEnabled())
- {
- _messagesDelivered.registerEvent(1L);
- _dataDelivered.registerEvent(messageSize);
- }
+ _messagesDelivered.registerEvent(1L);
+ _dataDelivered.registerEvent(messageSize);
}
-
+
public void registerMessageReceived(long messageSize, long timestamp)
{
- if (isStatisticsEnabled())
- {
- _messagesReceived.registerEvent(1L, timestamp);
- _dataReceived.registerEvent(messageSize, timestamp);
- }
+ _messagesReceived.registerEvent(1L, timestamp);
+ _dataReceived.registerEvent(messageSize, timestamp);
}
-
+
public StatisticsCounter getMessageReceiptStatistics()
{
return _messagesReceived;
}
-
+
public StatisticsCounter getDataReceiptStatistics()
{
return _dataReceived;
}
-
+
public StatisticsCounter getMessageDeliveryStatistics()
{
return _messagesDelivered;
}
-
+
public StatisticsCounter getDataDeliveryStatistics()
{
return _dataDelivered;
}
-
+
public void resetStatistics()
{
_messagesDelivered.reset();
_dataDelivered.reset();
_messagesReceived.reset();
_dataReceived.reset();
-
+
for (VirtualHost vhost : _virtualHostRegistry.getVirtualHosts())
{
vhost.resetStatistics();
@@ -723,25 +694,12 @@ public abstract class ApplicationRegistr
public void initialiseStatistics()
{
- setStatisticsEnabled(!StatisticsCounter.DISABLE_STATISTICS &&
- getConfiguration().isStatisticsGenerationBrokerEnabled());
-
_messagesDelivered = new StatisticsCounter("messages-delivered");
_dataDelivered = new StatisticsCounter("bytes-delivered");
_messagesReceived = new StatisticsCounter("messages-received");
_dataReceived = new StatisticsCounter("bytes-received");
}
- public boolean isStatisticsEnabled()
- {
- return _statisticsEnabled;
- }
-
- public void setStatisticsEnabled(boolean enabled)
- {
- _statisticsEnabled = enabled;
- }
-
private void logStartupMessages(LogActor logActor)
{
logActor.message(BrokerMessages.STARTUP(QpidProperties.getReleaseVersion(), QpidProperties.getBuildVersion()));
@@ -755,4 +713,60 @@ public abstract class ApplicationRegistr
logActor.message(BrokerMessages.MAX_MEMORY(Runtime.getRuntime().maxMemory()));
}
+ public Broker getBroker()
+ {
+ return _broker;
+ }
+
+ @Override
+ public void addPortBindingListener(PortBindingListener listener)
+ {
+ synchronized (_portBindingListeners)
+ {
+ _portBindingListeners.add(listener);
+ }
+ }
+
+
+ @Override
+ public boolean useHTTPManagement()
+ {
+ return _httpManagementPort != -1;
+ }
+
+ @Override
+ public int getHTTPManagementPort()
+ {
+ return _httpManagementPort;
+ }
+
+ @Override
+ public boolean useHTTPSManagement()
+ {
+ return _httpsManagementPort != -1;
+ }
+
+ @Override
+ public int getHTTPSManagementPort()
+ {
+ return _httpsManagementPort;
+ }
+
+ public LogRecorder getLogRecorder()
+ {
+ return _logRecorder;
+ }
+
+ @Override
+ public void addRegistryChangeListener(IAuthenticationManagerRegistry.RegistryChangeListener registryChangeListener)
+ {
+ if(_authenticationManagerRegistry == null)
+ {
+ _authManagerChangeListeners.add(registryChangeListener);
+ }
+ else
+ {
+ _authenticationManagerRegistry.addRegistryChangeListener(registryChangeListener);
+ }
+ }
}
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java Fri Aug 3 12:13:32 2012
@@ -44,13 +44,13 @@ public class BrokerConfigAdapter impleme
private final Map<UUID, VirtualHostConfig> _vhosts = new ConcurrentHashMap<UUID, VirtualHostConfig>();
private final long _createTime = System.currentTimeMillis();
- private UUID _id;
+ private UUID _qmfId;
private String _federationTag;
public BrokerConfigAdapter(final IApplicationRegistry instance)
{
_instance = instance;
- _id = instance.getConfigStore().createId();
+ _qmfId = instance.getConfigStore().createId();
_federationTag = UUID.randomUUID().toString();
}
@@ -114,7 +114,7 @@ public class BrokerConfigAdapter impleme
public void addVirtualHost(final VirtualHostConfig virtualHost)
{
- _vhosts.put(virtualHost.getId(), virtualHost);
+ _vhosts.put(virtualHost.getQMFId(), virtualHost);
getConfigStore().addConfiguredObject(virtualHost);
}
@@ -141,9 +141,10 @@ public class BrokerConfigAdapter impleme
vhost.createBrokerConnection(transport, host, port, "", durable, authMechanism, username, password);
}
- public UUID getId()
+ @Override
+ public UUID getQMFId()
{
- return _id;
+ return _qmfId;
}
public BrokerConfigType getConfigType()
@@ -184,7 +185,7 @@ public class BrokerConfigAdapter impleme
public String toString()
{
return "BrokerConfigAdapter{" +
- "_id=" + _id +
+ "_id=" + _qmfId +
", _system=" + _system +
", _vhosts=" + _vhosts +
", _createTime=" + _createTime +
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java Fri Aug 3 12:13:32 2012
@@ -25,8 +25,6 @@ import org.osgi.framework.BundleContext;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.configuration.ServerConfiguration;
-import org.apache.qpid.server.management.JMXManagedObjectRegistry;
-import org.apache.qpid.server.management.NoopManagedObjectRegistry;
import java.io.File;
@@ -41,18 +39,4 @@ public class ConfigurationFileApplicatio
{
super(new ServerConfiguration(configurationURL), bundleContext);
}
-
- @Override
- protected void initialiseManagedObjectRegistry() throws AMQException
- {
- if (getConfiguration().getManagementEnabled())
- {
- setManagedObjectRegistry(new JMXManagedObjectRegistry());
- }
- else
- {
- setManagedObjectRegistry(new NoopManagedObjectRegistry());
- }
- }
-
}
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java Fri Aug 3 12:13:32 2012
@@ -27,16 +27,19 @@ import org.apache.qpid.server.configurat
import org.apache.qpid.server.configuration.ServerConfiguration;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.logging.RootMessageLogger;
-import org.apache.qpid.server.management.ManagedObjectRegistry;
+import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.plugins.PluginManager;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
+import org.apache.qpid.server.security.auth.manager.IAuthenticationManagerRegistry;
import org.apache.qpid.server.stats.StatisticsGatherer;
import org.apache.qpid.server.transport.QpidAcceptor;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.Map;
import java.util.UUID;
public interface IApplicationRegistry extends StatisticsGatherer
@@ -60,9 +63,18 @@ public interface IApplicationRegistry ex
*/
ServerConfiguration getConfiguration();
- ManagedObjectRegistry getManagedObjectRegistry();
+ /**
+ * Get the AuthenticationManager for the given socket address
+ *
+ * If no AuthenticationManager has been specifically set for the given address, then use the default
+ * AuthenticationManager
+ *
+ * @param address The (listening) socket address for which the AuthenticationManager is required
+ * @return the AuthenticationManager
+ */
+ AuthenticationManager getAuthenticationManager(SocketAddress address);
- AuthenticationManager getAuthenticationManager();
+ IAuthenticationManagerRegistry getAuthenticationManagerRegistry();
VirtualHostRegistry getVirtualHostRegistry();
@@ -85,15 +97,39 @@ public interface IApplicationRegistry ex
QMFService getQMFService();
- void setBroker(BrokerConfig broker);
+ void setBrokerConfig(BrokerConfig broker);
- BrokerConfig getBroker();
+ BrokerConfig getBrokerConfig();
+
+ Broker getBroker();
VirtualHost createVirtualHost(VirtualHostConfiguration vhostConfig) throws Exception;
ConfigStore getConfigStore();
void setConfigStore(ConfigStore store);
-
+
void initialiseStatisticsReporting();
+
+ Map<InetSocketAddress, QpidAcceptor> getAcceptors();
+
+ void addPortBindingListener(PortBindingListener listener);
+
+ boolean useHTTPManagement();
+
+ int getHTTPManagementPort();
+
+ boolean useHTTPSManagement();
+
+ int getHTTPSManagementPort();
+
+ void addRegistryChangeListener(IAuthenticationManagerRegistry.RegistryChangeListener registryChangeListener);
+
+ public interface PortBindingListener
+ {
+ public void bound(QpidAcceptor acceptor, InetSocketAddress bindAddress);
+ public void unbound(QpidAcceptor acceptor);
+
+ }
+
}
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/AbstractPasswordFilePrincipalDatabase.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/AbstractPasswordFilePrincipalDatabase.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/AbstractPasswordFilePrincipalDatabase.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/AbstractPasswordFilePrincipalDatabase.java Fri Aug 3 12:13:32 2012
@@ -198,7 +198,7 @@ public abstract class AbstractPasswordFi
try
{
_userUpdate.lock();
- _userMap.clear();
+ final Map<String, U> newUserMap = new HashMap<String, U>();
BufferedReader reader = null;
try
@@ -216,7 +216,7 @@ public abstract class AbstractPasswordFi
U user = createUserFromFileData(result);
getLogger().info("Created user:" + user);
- _userMap.put(user.getName(), user);
+ newUserMap.put(user.getName(), user);
}
}
finally
@@ -226,6 +226,9 @@ public abstract class AbstractPasswordFi
reader.close();
}
}
+
+ _userMap.clear();
+ _userMap.putAll(newUserMap);
}
finally
{
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java Fri Aug 3 12:13:32 2012
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,25 +20,24 @@
*/
package org.apache.qpid.server.security.auth.manager;
+import java.security.Principal;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
import org.apache.qpid.common.Closeable;
import org.apache.qpid.server.plugins.Plugin;
import org.apache.qpid.server.security.auth.AuthenticationResult;
-import javax.security.auth.callback.CallbackHandler;
-import javax.security.sasl.SaslException;
-import javax.security.sasl.SaslServer;
-
/**
* Implementations of the AuthenticationManager are responsible for determining
* the authenticity of a user's credentials.
- *
+ *
* If the authentication is successful, the manager is responsible for producing a populated
* {@link javax.security.auth.Subject} containing the user's identity and zero or more principals representing
* groups to which the user belongs.
* <p>
* The {@link #initialise()} method is responsible for registering SASL mechanisms required by
* the manager. The {@link #close()} method must reverse this registration.
- *
+ *
*/
public interface AuthenticationManager extends Closeable, Plugin
{
@@ -64,11 +63,11 @@ public interface AuthenticationManager e
*
* @param mechanism mechanism name
* @param localFQDN domain name
- *
+ * @param externalPrincipal externally authenticated Principal
* @return SASL server
* @throws SaslException
*/
- SaslServer createSaslServer(String mechanism, String localFQDN) throws SaslException;
+ SaslServer createSaslServer(String mechanism, String localFQDN, Principal externalPrincipal) throws SaslException;
/**
* Authenticates a user using SASL negotiation.
@@ -90,5 +89,4 @@ public interface AuthenticationManager e
*/
AuthenticationResult authenticate(String username, String password);
- CallbackHandler getHandler(String mechanism);
}
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java Fri Aug 3 12:13:32 2012
@@ -14,12 +14,13 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
+ * under the License.
+ *
*
- *
*/
package org.apache.qpid.server.security.auth.manager;
+import java.security.Principal;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.log4j.Logger;
@@ -31,7 +32,6 @@ import org.apache.qpid.server.configurat
import org.apache.qpid.server.security.auth.AuthenticationResult;
import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus;
import org.apache.qpid.server.security.auth.database.PrincipalDatabase;
-import org.apache.qpid.server.security.auth.management.AMQUserManagementMBean;
import org.apache.qpid.server.security.auth.sasl.AuthenticationProviderInitialiser;
import org.apache.qpid.server.security.auth.sasl.JCAProvider;
import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
@@ -60,9 +60,9 @@ import java.util.TreeMap;
* Concrete implementation of the AuthenticationManager that determines if supplied
* user credentials match those appearing in a PrincipalDatabase. The implementation
* of the PrincipalDatabase is determined from the configuration.
- *
+ *
* This implementation also registers the JMX UserManagement MBean.
- *
+ *
* This plugin expects configuration such as:
*
* <pre>
@@ -97,13 +97,14 @@ public class PrincipalDatabaseAuthentica
private PrincipalDatabase _principalDatabase = null;
- private AMQUserManagementMBean _mbean = null;
-
public static final AuthenticationManagerPluginFactory<PrincipalDatabaseAuthenticationManager> FACTORY = new AuthenticationManagerPluginFactory<PrincipalDatabaseAuthenticationManager>()
{
public PrincipalDatabaseAuthenticationManager newInstance(final ConfigurationPlugin config) throws ConfigurationException
{
- final PrincipalDatabaseAuthenticationManagerConfiguration configuration = config.getConfiguration(PrincipalDatabaseAuthenticationManagerConfiguration.class.getName());
+ final PrincipalDatabaseAuthenticationManagerConfiguration configuration =
+ config == null
+ ? null
+ : (PrincipalDatabaseAuthenticationManagerConfiguration) config.getConfiguration(PrincipalDatabaseAuthenticationManagerConfiguration.class.getName());
// If there is no configuration for this plugin then don't load it.
if (configuration == null)
@@ -130,7 +131,7 @@ public class PrincipalDatabaseAuthentica
};
public static class PrincipalDatabaseAuthenticationManagerConfiguration extends ConfigurationPlugin {
-
+
public static final ConfigurationPluginFactory FACTORY = new ConfigurationPluginFactory()
{
public List<String> getParentPaths()
@@ -141,7 +142,7 @@ public class PrincipalDatabaseAuthentica
public ConfigurationPlugin newInstance(final String path, final Configuration config) throws ConfigurationException
{
final ConfigurationPlugin instance = new PrincipalDatabaseAuthenticationManagerConfiguration();
-
+
instance.setConfiguration(path, config);
return instance;
}
@@ -157,16 +158,16 @@ public class PrincipalDatabaseAuthentica
public void validateConfiguration() throws ConfigurationException
{
}
-
+
public String getPrincipalDatabaseClass()
{
return getConfig().getString("principal-database.class");
}
-
+
public Map<String,String> getPdClassAttributeMap() throws ConfigurationException
{
- final List<String> argumentNames = getConfig().getList("principal-database.attributes.attribute.name");
- final List<String> argumentValues = getConfig().getList("principal-database.attributes.attribute.value");
+ final List<String> argumentNames = (List) getConfig().getList("principal-database.attributes.attribute.name");
+ final List<String> argumentValues = (List) getConfig().getList("principal-database.attributes.attribute.value");
final Map<String,String> attributes = new HashMap<String,String>(argumentNames.size());
for (int i = 0; i < argumentNames.size(); i++)
@@ -181,7 +182,7 @@ public class PrincipalDatabaseAuthentica
}
}
- protected PrincipalDatabaseAuthenticationManager()
+ protected PrincipalDatabaseAuthenticationManager()
{
}
@@ -207,11 +208,9 @@ public class PrincipalDatabaseAuthentica
{
_logger.warn("No additional SASL providers registered.");
}
-
- registerManagement();
}
- private void initialiseAuthenticationMechanisms(Map<String, Class<? extends SaslServerFactory>> providerMap, PrincipalDatabase database)
+ private void initialiseAuthenticationMechanisms(Map<String, Class<? extends SaslServerFactory>> providerMap, PrincipalDatabase database)
{
if (database == null || database.getMechanisms().size() == 0)
{
@@ -259,7 +258,7 @@ public class PrincipalDatabaseAuthentica
_principalDatabase = createPrincipalDatabaseImpl(pdClazz);
- configPrincipalDatabase(_principalDatabase, pdamConfig);
+ configPrincipalDatabase(_principalDatabase, pdamConfig);
}
public String getMechanisms()
@@ -267,7 +266,7 @@ public class PrincipalDatabaseAuthentica
return _mechanisms;
}
- public SaslServer createSaslServer(String mechanism, String localFQDN) throws SaslException
+ public SaslServer createSaslServer(String mechanism, String localFQDN, Principal externalPrincipal) throws SaslException
{
return Sasl.createSaslServer(mechanism, "AMQP", localFQDN, _serverCreationProperties.get(mechanism),
_callbackHandlerMap.get(mechanism));
@@ -300,11 +299,6 @@ public class PrincipalDatabaseAuthentica
}
}
- public CallbackHandler getHandler(String mechanism)
- {
- return _callbackHandlerMap.get(mechanism);
- }
-
/**
* @see org.apache.qpid.server.security.auth.manager.AuthenticationManager#authenticate(String, String)
*/
@@ -333,8 +327,6 @@ public class PrincipalDatabaseAuthentica
{
_mechanisms = null;
Security.removeProvider(PROVIDER_NAME);
-
- unregisterManagement();
}
private PrincipalDatabase createPrincipalDatabaseImpl(final String pdClazz) throws ConfigurationException
@@ -408,6 +400,11 @@ public class PrincipalDatabaseAuthentica
}
}
+ public PrincipalDatabase getPrincipalDatabase()
+ {
+ return _principalDatabase;
+ }
+
private String generateSetterName(String argName) throws ConfigurationException
{
if ((argName == null) || (argName.length() == 0))
@@ -428,41 +425,4 @@ public class PrincipalDatabaseAuthentica
{
_principalDatabase = principalDatabase;
}
-
- protected void registerManagement()
- {
- try
- {
- _logger.info("Registering UserManagementMBean");
-
- _mbean = new AMQUserManagementMBean();
- _mbean.setPrincipalDatabase(_principalDatabase);
- _mbean.register();
- }
- catch (Exception e)
- {
- _logger.warn("User management disabled as unable to create MBean:", e);
- _mbean = null;
- }
- }
-
- protected void unregisterManagement()
- {
- try
- {
- if (_mbean != null)
- {
- _logger.info("Unregistering UserManagementMBean");
- _mbean.unregister();
- }
- }
- catch (Exception e)
- {
- _logger.warn("Failed to unregister User management MBean:", e);
- }
- finally
- {
- _mbean = null;
- }
- }
}
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/auth/rmi/RMIPasswordAuthenticator.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/auth/rmi/RMIPasswordAuthenticator.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/auth/rmi/RMIPasswordAuthenticator.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/auth/rmi/RMIPasswordAuthenticator.java Fri Aug 3 12:13:32 2012
@@ -20,6 +20,9 @@
*/
package org.apache.qpid.server.security.auth.rmi;
+import java.net.SocketAddress;
+
+import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.security.auth.AuthenticationResult;
import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus;
import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
@@ -37,11 +40,13 @@ public class RMIPasswordAuthenticator im
static final String INVALID_CREDENTIALS = "Invalid user details supplied";
static final String CREDENTIALS_REQUIRED = "User details are required. " +
"Please ensure you are using an up to date management console to connect.";
-
+
private AuthenticationManager _authenticationManager = null;
+ private SocketAddress _socketAddress;
- public RMIPasswordAuthenticator()
+ public RMIPasswordAuthenticator(SocketAddress socketAddress)
{
+ _socketAddress = socketAddress;
}
public void setAuthenticationManager(final AuthenticationManager authenticationManager)
@@ -79,11 +84,25 @@ public class RMIPasswordAuthenticator im
{
throw new SecurityException(SHOULD_BE_NON_NULL);
}
-
+
// Verify that an AuthenticationManager has been set.
if (_authenticationManager == null)
{
- throw new SecurityException(UNABLE_TO_LOOKUP);
+ try
+ {
+ if(ApplicationRegistry.getInstance().getAuthenticationManager(_socketAddress) != null)
+ {
+ _authenticationManager = ApplicationRegistry.getInstance().getAuthenticationManager(_socketAddress);
+ }
+ else
+ {
+ throw new SecurityException(UNABLE_TO_LOOKUP);
+ }
+ }
+ catch(IllegalStateException e)
+ {
+ throw new SecurityException(UNABLE_TO_LOOKUP);
+ }
}
final AuthenticationResult result = _authenticationManager.authenticate(username, password);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org