You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2012/08/03 14:14:12 UTC

svn commit: r1368910 [18/27] - in /qpid/branches/asyncstore: ./ bin/ cpp/bindings/qmf/ruby/ cpp/bindings/qmf2/ruby/ cpp/bindings/qpid/python/ cpp/bindings/qpid/ruby/ cpp/bindings/qpid/ruby/features/ cpp/bindings/qpid/ruby/features/step_definitions/ cpp...

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java Fri Aug  3 12:13:32 2012
@@ -20,17 +20,11 @@
  */
 package org.apache.qpid.server.transport;
 
-import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT;
-import org.apache.qpid.server.message.InboundMessage;
-import org.apache.qpid.server.message.MessageMetaData_0_10;
-import org.apache.qpid.server.message.MessageTransferMessage;
-import org.apache.qpid.server.txn.RollbackOnlyDtxException;
-import org.apache.qpid.server.txn.TimeoutDtxException;
-import static org.apache.qpid.util.Serial.gt;
-
 import java.security.Principal;
 import java.text.MessageFormat;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
@@ -40,18 +34,16 @@ import java.util.SortedMap;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-
 import javax.security.auth.Subject;
-
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQStoreException;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.server.TransactionTimeoutHelper;
 import org.apache.qpid.server.configuration.ConfigStore;
 import org.apache.qpid.server.configuration.ConfiguredObject;
 import org.apache.qpid.server.configuration.ConnectionConfig;
@@ -63,7 +55,10 @@ import org.apache.qpid.server.logging.ac
 import org.apache.qpid.server.logging.actors.GenericActor;
 import org.apache.qpid.server.logging.messages.ChannelMessages;
 import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
+import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.message.MessageMetaData_0_10;
 import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.MessageTransferMessage;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -82,20 +77,25 @@ import org.apache.qpid.server.txn.Incorr
 import org.apache.qpid.server.txn.JoinAndResumeDtxException;
 import org.apache.qpid.server.txn.LocalTransaction;
 import org.apache.qpid.server.txn.NotAssociatedDtxException;
+import org.apache.qpid.server.txn.RollbackOnlyDtxException;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.txn.SuspendAndFailDtxException;
+import org.apache.qpid.server.txn.TimeoutDtxException;
 import org.apache.qpid.server.txn.UnknownDtxBranchException;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.transport.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class ServerSession extends Session 
-        implements AuthorizationHolder, SessionConfig, 
+import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT;
+import static org.apache.qpid.util.Serial.gt;
+
+public class ServerSession extends Session
+        implements AuthorizationHolder, SessionConfig,
                    AMQSessionModel, LogSubject, AsyncAutoCommitTransaction.FutureRecorder
 {
     private static final Logger _logger = LoggerFactory.getLogger(ServerSession.class);
-    
+
     private static final String NULL_DESTINTATION = UUID.randomUUID().toString();
     private static final int PRODUCER_CREDIT_TOPUP_THRESHOLD = 1 << 30;
     private static final int UNFINISHED_COMMAND_QUEUE_THRESHOLD = 500;
@@ -105,7 +105,7 @@ public class ServerSession extends Sessi
     private long _createTime = System.currentTimeMillis();
     private LogActor _actor = GenericActor.getInstance(this);
 
-    private final Set<AMQQueue> _blockingQueues = new ConcurrentSkipListSet<AMQQueue>();
+    private final Set<Object> _blockingEntities = Collections.synchronizedSet(new HashSet<Object>());
 
     private final AtomicBoolean _blocking = new AtomicBoolean(false);
     private ChannelLogSubject _logSubject;
@@ -145,6 +145,8 @@ public class ServerSession extends Sessi
 
     private final List<Task> _taskList = new CopyOnWriteArrayList<Task>();
 
+    private final TransactionTimeoutHelper _transactionTimeoutHelper;
+
     ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry)
     {
         this(connection, delegate, name, expiry, ((ServerConnection)connection).getConfig());
@@ -158,6 +160,8 @@ public class ServerSession extends Sessi
         _logSubject = new ChannelLogSubject(this);
         _id = getConfigStore().createId();
         getConfigStore().addConfiguredObject(this);
+
+        _transactionTimeoutHelper = new TransactionTimeoutHelper(_logSubject);
     }
 
     protected void setState(State state)
@@ -167,9 +171,19 @@ public class ServerSession extends Sessi
         if (state == State.OPEN)
         {
             _actor.message(ChannelMessages.CREATE());
+            if(_blocking.get())
+            {
+                invokeBlock();
+            }
         }
     }
 
+    private void invokeBlock()
+    {
+        invoke(new MessageSetFlowMode("", MessageFlowMode.CREDIT));
+        invoke(new MessageStop(""));
+    }
+
     private ConfigStore getConfigStore()
     {
         return getConnectionConfig().getConfigStore();
@@ -455,7 +469,7 @@ public class ServerSession extends Sessi
     {
         return _transaction.isTransactional();
     }
-    
+
     public boolean inTransaction()
     {
         return isTransactional() && _txnUpdateTime.get() > 0 && _transaction.getTransactionStartTime() > 0;
@@ -621,16 +635,26 @@ public class ServerSession extends Sessi
         return _txnRejects.get();
     }
 
+    public int getChannelId()
+    {
+        return getChannel();
+    }
+
     public Long getTxnCount()
     {
         return _txnCount.get();
     }
 
+    public Long getTxnStart()
+    {
+        return _txnStarts.get();
+    }
+
     public Principal getAuthorizedPrincipal()
     {
         return getConnection().getAuthorizedPrincipal();
     }
-    
+
     public Subject getAuthorizedSubject()
     {
         return getConnection().getAuthorizedSubject();
@@ -661,7 +685,8 @@ public class ServerSession extends Sessi
         return (VirtualHost) _connectionConfig.getVirtualHost();
     }
 
-    public UUID getId()
+    @Override
+    public UUID getQMFId()
     {
         return _id;
     }
@@ -755,63 +780,85 @@ public class ServerSession extends Sessi
             long openTime = currentTime - _transaction.getTransactionStartTime();
             long idleTime = currentTime - _txnUpdateTime.get();
 
-            // Log a warning on idle or open transactions
-            if (idleWarn > 0L && idleTime > idleWarn)
-            {
-                CurrentActor.get().message(getLogSubject(), ChannelMessages.IDLE_TXN(idleTime));
-                _logger.warn("IDLE TRANSACTION ALERT " + getLogSubject().toString() + " " + idleTime + " ms");
-            }
-            else if (openWarn > 0L && openTime > openWarn)
-            {
-                CurrentActor.get().message(getLogSubject(), ChannelMessages.OPEN_TXN(openTime));
-                _logger.warn("OPEN TRANSACTION ALERT " + getLogSubject().toString() + " " + openTime + " ms");
-            }
-
-            // Close connection for idle or open transactions that have timed out
-            if (idleClose > 0L && idleTime > idleClose)
+            _transactionTimeoutHelper.logIfNecessary(idleTime, idleWarn, ChannelMessages.IDLE_TXN(idleTime),
+                                                     TransactionTimeoutHelper.IDLE_TRANSACTION_ALERT);
+            if (_transactionTimeoutHelper.isTimedOut(idleTime, idleClose))
             {
                 getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Idle transaction timed out");
+                return;
             }
-            else if (openClose > 0L && openTime > openClose)
+
+            _transactionTimeoutHelper.logIfNecessary(openTime, openWarn, ChannelMessages.OPEN_TXN(openTime),
+                                                     TransactionTimeoutHelper.OPEN_TRANSACTION_ALERT);
+            if (_transactionTimeoutHelper.isTimedOut(openTime, openClose))
             {
                 getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Open transaction timed out");
+                return;
             }
         }
     }
 
     public void block(AMQQueue queue)
     {
+        block(queue, queue.getName());
+    }
+
+    public void block()
+    {
+        block(this, "** All Queues **");
+    }
 
-        if(_blockingQueues.add(queue))
-        {
 
-            if(_blocking.compareAndSet(false,true))
+    private void block(Object queue, String name)
+    {
+        synchronized (_blockingEntities)
+        {
+            if(_blockingEntities.add(queue))
             {
-                invoke(new MessageSetFlowMode("", MessageFlowMode.CREDIT));
-                invoke(new MessageStop(""));
-                _actor.message(_logSubject, ChannelMessages.FLOW_ENFORCED(queue.getNameShortString().toString()));
-            }
+
+                if(_blocking.compareAndSet(false,true))
+                {
+                    if(getState() == State.OPEN)
+                    {
+                        invokeBlock();
+                    }
+                    _actor.message(_logSubject, ChannelMessages.FLOW_ENFORCED(name));
+                }
 
 
+            }
         }
     }
 
     public void unblock(AMQQueue queue)
     {
-        if(_blockingQueues.remove(queue) && _blockingQueues.isEmpty())
+        unblock((Object)queue);
+    }
+
+    public void unblock()
+    {
+        unblock(this);
+    }
+
+    private void unblock(Object queue)
+    {
+        synchronized(_blockingEntities)
         {
-            if(_blocking.compareAndSet(true,false) && !isClosing())
+            if(_blockingEntities.remove(queue) && _blockingEntities.isEmpty())
             {
+                if(_blocking.compareAndSet(true,false) && !isClosing())
+                {
 
-                _actor.message(_logSubject, ChannelMessages.FLOW_REMOVED());
-                MessageFlow mf = new MessageFlow();
-                mf.setUnit(MessageCreditUnit.MESSAGE);
-                mf.setDestination("");
-                _outstandingCredit.set(Integer.MAX_VALUE);
-                mf.setValue(Integer.MAX_VALUE);
-                invoke(mf);
+                    _actor.message(_logSubject, ChannelMessages.FLOW_REMOVED());
+                    MessageFlow mf = new MessageFlow();
+                    mf.setUnit(MessageCreditUnit.MESSAGE);
+                    mf.setDestination("");
+                    _outstandingCredit.set(Integer.MAX_VALUE);
+                    mf.setValue(Integer.MAX_VALUE);
+                    invoke(mf);
 
 
+                }
             }
         }
     }
@@ -1020,7 +1067,12 @@ public class ServerSession extends Sessi
 
     public int compareTo(AMQSessionModel session)
     {
-        return getId().compareTo(session.getId());
+        return getQMFId().compareTo(session.getQMFId());
     }
 
+    @Override
+    public int getConsumerCount()
+    {
+        return _subscriptions.values().size();
+    }
 }

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java Fri Aug  3 12:13:32 2012
@@ -99,9 +99,9 @@ public class ServerSessionDelegate exten
                 Object newOutstanding = ((ServerSession)session).getAsyncCommandMark();
                 if(newOutstanding == null || newOutstanding == asyncCommandMark)
                 {
-                    session.processed(method);    
+                    session.processed(method);
                 }
-                
+
                 if(newOutstanding != null)
                 {
                     ((ServerSession)session).completeAsyncCommands();
@@ -240,13 +240,13 @@ public class ServerSessionDelegate exten
                     }
 
                     FlowCreditManager_0_10 creditManager = new WindowCreditManager(0L,0L);
-                    
+
                     FilterManager filterManager = null;
-                    try 
+                    try
                     {
                         filterManager = FilterManagerFactory.createManager(method.getArguments());
-                    } 
-                    catch (AMQException amqe) 
+                    }
+                    catch (AMQException amqe)
                     {
                         exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "Exception Creating FilterManager");
                         return;
@@ -257,7 +257,7 @@ public class ServerSessionDelegate exten
                                                                   method.getAcceptMode(),
                                                                   method.getAcquireMode(),
                                                                   MessageFlowMode.WINDOW,
-                                                                  creditManager, 
+                                                                  creditManager,
                                                                   filterManager,
                                                                   method.getArguments());
 
@@ -297,13 +297,13 @@ public class ServerSessionDelegate exten
 
         final MessageMetaData_0_10 messageMetaData = new MessageMetaData_0_10(xfr);
         messageMetaData.setConnectionReference(((ServerSession)ssn).getReference());
-        
+
         if (!getVirtualHost(ssn).getSecurityManager().authorisePublish(messageMetaData.isImmediate(), messageMetaData.getRoutingKey(), exchange.getName()))
         {
             ExecutionErrorCode errorCode = ExecutionErrorCode.UNAUTHORIZED_ACCESS;
             String description = "Permission denied: exchange-name '" + exchange.getName() + "'";
             exception(ssn, xfr, errorCode, description);
-            
+
             return;
         }
 
@@ -807,7 +807,7 @@ public class ServerSessionDelegate exten
         }
     }
 
-    // TODO decouple AMQException and AMQConstant error codes 
+    // TODO decouple AMQException and AMQConstant error codes
     private void exception(Session session, Method method, AMQException exception, String message)
     {
         ExecutionErrorCode errorCode = ExecutionErrorCode.INTERNAL_ERROR;
@@ -823,7 +823,7 @@ public class ServerSessionDelegate exten
             }
         }
         String description = message + "': " + exception.getMessage();
-        
+
         exception(session, method, errorCode, description);
     }
 
@@ -1226,11 +1226,7 @@ public class ServerSessionDelegate exten
                     try
                     {
                         queue = createQueue(queueName, method, virtualHost, (ServerSession)session);
-                        if(method.getExclusive())
-                        {
-                            queue.setExclusive(true);
-                        }
-                        else if(method.getAutoDelete())
+                        if(!method.getExclusive() && method.getAutoDelete())
                         {
                             queue.setDeleteOnNoConsumers(true);
                         }
@@ -1349,9 +1345,9 @@ public class ServerSessionDelegate exten
                                                                            + " as exclusive queue with same name "
                                                                            + "declared on another session";
                     ExecutionErrorCode errorCode = ExecutionErrorCode.RESOURCE_LOCKED;
-    
+
                     exception(session, method, errorCode, description);
-    
+
                     return;
             }
         }
@@ -1389,7 +1385,7 @@ public class ServerSessionDelegate exten
     {
         String owner = body.getExclusive() ? session.getClientID() : null;
 
-        final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateUUID(), queueName, body.getDurable(), owner,
+        final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateQueueUUID(queueName, virtualHost.getName()), queueName, body.getDurable(), owner,
                                                                   body.getAutoDelete(), body.getExclusive(), virtualHost, body.getArguments());
 
         return queue;
@@ -1436,7 +1432,7 @@ public class ServerSessionDelegate exten
                 else
                 {
                     VirtualHost virtualHost = getVirtualHost(session);
-                    
+
                     try
                     {
                         queue.delete();

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java Fri Aug  3 12:13:32 2012
@@ -44,11 +44,16 @@ import java.util.List;
  */
 public class AsyncAutoCommitTransaction implements ServerTransaction
 {
+    static final String QPID_STRICT_ORDER_WITH_MIXED_DELIVERY_MODE = "qpid.strict_order_with_mixed_delivery_mode";
+
     protected static final Logger _logger = Logger.getLogger(AsyncAutoCommitTransaction.class);
 
     private final MessageStore _messageStore;
     private final FutureRecorder _futureRecorder;
 
+    //Set true to ensure strict ordering when enqueing messages with mixed delivery mode, i.e. disable async persistence
+    private boolean _strictOrderWithMixedDeliveryMode = Boolean.getBoolean(QPID_STRICT_ORDER_WITH_MIXED_DELIVERY_MODE);
+
     public static interface FutureRecorder
     {
         public void recordFuture(StoreFuture future, Action action);
@@ -129,6 +134,23 @@ public class AsyncAutoCommitTransaction 
         }
     }
 
+    private void addEnqueueFuture(final StoreFuture future, final Action action, boolean persistent)
+    {
+        if(action != null)
+        {
+            // For persistent messages, do not synchronously invoke postCommit even if the future  is completed.
+            // Otherwise, postCommit (which actually does the enqueuing) might be called on successive messages out of order.
+            if(future.isComplete() && !persistent && !_strictOrderWithMixedDeliveryMode)
+            {
+                action.postCommit();
+            }
+            else
+            {
+                _futureRecorder.recordFuture(future, action);
+            }
+        }
+    }
+
     public void dequeue(Collection<QueueEntry> queueEntries, Action postTransactionAction)
     {
         Transaction txn = null;
@@ -203,7 +225,7 @@ public class AsyncAutoCommitTransaction 
             {
                 future = StoreFuture.IMMEDIATE_FUTURE;
             }
-            addFuture(future, postTransactionAction);
+            addEnqueueFuture(future, postTransactionAction, message.isPersistent());
             postTransactionAction = null;
         }
         catch (AMQException e)
@@ -257,7 +279,7 @@ public class AsyncAutoCommitTransaction 
             {
                 future = StoreFuture.IMMEDIATE_FUTURE;
             }
-            addFuture(future, postTransactionAction);
+            addEnqueueFuture(future, postTransactionAction, message.isPersistent());
             postTransactionAction = null;
 
 

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/txn/DtxBranch.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/txn/DtxBranch.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/txn/DtxBranch.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/txn/DtxBranch.java Fri Aug  3 12:13:32 2012
@@ -98,12 +98,29 @@ public class DtxBranch
 
     public void setTimeout(long timeout)
     {
+        if(_logger.isDebugEnabled())
+        {
+            _logger.debug("Setting timeout to " + timeout + "s for DtxBranch " + _xid);
+        }
+
         if(_timeoutFuture != null)
         {
-            _timeoutFuture.cancel(false);
+            if(_logger.isDebugEnabled())
+            {
+                _logger.debug("Attempting to cancel previous timeout task future for DtxBranch " + _xid);
+            }
+
+            boolean succeeded = _timeoutFuture.cancel(false);
+
+            if(_logger.isDebugEnabled())
+            {
+                _logger.debug("Cancelling previous timeout task " + (succeeded ? "succeeded" : "failed")
+                              + " for DtxBranch " + _xid);
+            }
         }
+
         _timeout = timeout;
-        _expiration = timeout == 0 ? 0 : System.currentTimeMillis() + timeout;
+        _expiration = timeout == 0 ? 0 : System.currentTimeMillis() + (1000 * timeout);
 
         if(_timeout == 0)
         {
@@ -111,10 +128,23 @@ public class DtxBranch
         }
         else
         {
-            _timeoutFuture = _vhost.scheduleTask(_timeout, new Runnable()
+            long delay = 1000*_timeout;
+
+            if(_logger.isDebugEnabled())
+            {
+                _logger.debug("Scheduling timeout and rollback after " + delay/1000 +
+                              "s for DtxBranch " + _xid);
+            }
+
+            _timeoutFuture = _vhost.scheduleTask(delay, new Runnable()
             {
                 public void run()
                 {
+                    if(_logger.isDebugEnabled())
+                    {
+                        _logger.debug("Timing out DtxBranch " + _xid);
+                    }
+
                     setState(State.TIMEDOUT);
                     try
                     {
@@ -122,8 +152,7 @@ public class DtxBranch
                     }
                     catch (AMQStoreException e)
                     {
-                        _logger.error("Unexpected error when attempting to rollback XA transaction ("+
-                                      _xid + ") due to  timeout", e);
+                        _logger.error("Unexpected error when attempting to rollback DtxBranch "+ _xid + " due to timeout", e);
                         throw new RuntimeException(e);
                     }
                 }
@@ -199,6 +228,10 @@ public class DtxBranch
 
     public void prepare() throws AMQStoreException
     {
+        if(_logger.isDebugEnabled())
+        {
+            _logger.debug("Performing prepare for DtxBranch " + _xid);
+        }
 
         Transaction txn = _store.newTransaction();
         txn.recordXid(_xid.getFormat(),
@@ -213,12 +246,27 @@ public class DtxBranch
 
     public synchronized void rollback() throws AMQStoreException
     {
+        if(_logger.isDebugEnabled())
+        {
+            _logger.debug("Performing rollback for DtxBranch " + _xid);
+        }
+
         if(_timeoutFuture != null)
         {
-            _timeoutFuture.cancel(false);
+            if(_logger.isDebugEnabled())
+            {
+                _logger.debug("Attempting to cancel previous timeout task future for DtxBranch " + _xid);
+            }
+
+            boolean succeeded = _timeoutFuture.cancel(false);
             _timeoutFuture = null;
-        }
 
+            if(_logger.isDebugEnabled())
+            {
+                _logger.debug("Cancelling previous timeout task " + (succeeded ? "succeeded" : "failed")
+                              + " for DtxBranch " + _xid);
+            }
+        }
 
         if(_transaction != null)
         {
@@ -240,10 +288,26 @@ public class DtxBranch
 
     public void commit() throws AMQStoreException
     {
+        if(_logger.isDebugEnabled())
+        {
+            _logger.debug("Performing commit for DtxBranch " + _xid);
+        }
+
         if(_timeoutFuture != null)
         {
-            _timeoutFuture.cancel(false);
+            if(_logger.isDebugEnabled())
+            {
+                _logger.debug("Attempting to cancel previous timeout task future for DtxBranch " + _xid);
+            }
+
+            boolean succeeded = _timeoutFuture.cancel(false);
             _timeoutFuture = null;
+
+            if(_logger.isDebugEnabled())
+            {
+                _logger.debug("Cancelling previous timeout task " + (succeeded ? "succeeded" : "failed")
+                              + " for DtxBranch " + _xid);
+            }
         }
 
         if(_transaction == null)
@@ -342,7 +406,7 @@ public class DtxBranch
             }
             catch(AMQStoreException e)
             {
-                _logger.error("Error while closing XA branch", e);
+                _logger.error("Error while closing DtxBranch " + _xid, e);
             }
         }
     }

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/txn/DtxRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/txn/DtxRegistry.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/txn/DtxRegistry.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/txn/DtxRegistry.java Fri Aug  3 12:13:32 2012
@@ -37,7 +37,7 @@ public class DtxRegistry
     private static final class ComparableXid
     {
         private final Xid _xid;
-        
+
         private ComparableXid(Xid xid)
         {
             _xid = xid;
@@ -58,7 +58,7 @@ public class DtxRegistry
             ComparableXid that = (ComparableXid) o;
 
             return compareBytes(_xid.getBranchId(), that._xid.getBranchId())
-                    && compareBytes(_xid.getGlobalId(), that._xid.getGlobalId()); 
+                    && compareBytes(_xid.getGlobalId(), that._xid.getGlobalId());
         }
 
         private static boolean compareBytes(byte[] a, byte[] b)
@@ -94,7 +94,7 @@ public class DtxRegistry
             return result;
         }
     }
-    
+
     public synchronized DtxBranch getBranch(Xid xid)
     {
         return _branches.get(new ComparableXid(xid));
@@ -116,7 +116,7 @@ public class DtxRegistry
         return (_branches.remove(new ComparableXid(branch.getXid())) != null);
     }
 
-    public void commit(Xid id, boolean onePhase)
+    public synchronized void commit(Xid id, boolean onePhase)
             throws IncorrectDtxStateException, UnknownDtxBranchException, AMQStoreException, RollbackOnlyDtxException, TimeoutDtxException
     {
         DtxBranch branch = getBranch(id);
@@ -204,7 +204,7 @@ public class DtxRegistry
         }
     }
 
-    public void rollback(Xid id)
+    public synchronized void rollback(Xid id)
             throws IncorrectDtxStateException,
             UnknownDtxBranchException,
             AMQStoreException, TimeoutDtxException
@@ -318,6 +318,7 @@ public class DtxRegistry
                 branch.disassociateSession(session);
             }
         }
+
     }
 
 

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java Fri Aug  3 12:13:32 2012
@@ -1,4 +1,3 @@
-package org.apache.qpid.server.txn;
 /*
  * 
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -19,7 +18,9 @@ package org.apache.qpid.server.txn;
  * under the License.
  * 
  */
+package org.apache.qpid.server.txn;
 
+import org.apache.qpid.server.store.StoreFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,6 +51,7 @@ public class LocalTransaction implements
     private volatile Transaction _transaction;
     private MessageStore _transactionLog;
     private long _txnStartTime = 0L;
+    private StoreFuture _asyncTran;
 
     public LocalTransaction(MessageStore transactionLog)
     {
@@ -68,11 +70,13 @@ public class LocalTransaction implements
 
     public void addPostTransactionAction(Action postTransactionAction)
     {
+        sync();
         _postTransactionActions.add(postTransactionAction);
     }
 
     public void dequeue(BaseQueue queue, EnqueableMessage message, Action postTransactionAction)
     {
+        sync();
         _postTransactionActions.add(postTransactionAction);
 
         if(message.isPersistent() && queue.isDurable())
@@ -98,6 +102,7 @@ public class LocalTransaction implements
 
     public void dequeue(Collection<QueueEntry> queueEntries, Action postTransactionAction)
     {
+        sync();
         _postTransactionActions.add(postTransactionAction);
 
         try
@@ -131,10 +136,7 @@ public class LocalTransaction implements
     {
         try
         {
-            for(Action action : _postTransactionActions)
-            {
-                action.onRollback();
-            }
+            doRollbackActions();
         }
         finally
         {
@@ -151,7 +153,7 @@ public class LocalTransaction implements
             }
             finally
             {
-		resetDetails();
+		        resetDetails();
             }
         }
 
@@ -176,6 +178,7 @@ public class LocalTransaction implements
 
     public void enqueue(BaseQueue queue, EnqueableMessage message, Action postTransactionAction)
     {
+        sync();
         _postTransactionActions.add(postTransactionAction);
 
         if(message.isPersistent() && queue.isDurable())
@@ -201,6 +204,7 @@ public class LocalTransaction implements
 
     public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction, long currentTime)
     {
+        sync();
         _postTransactionActions.add(postTransactionAction);
 
         if (_txnStartTime == 0L)
@@ -239,11 +243,13 @@ public class LocalTransaction implements
 
     public void commit()
     {
+        sync();
         commit(null);
     }
 
     public void commit(Runnable immediateAction)
     {
+        sync();
         try
         {
             if(_transaction != null)
@@ -256,29 +262,137 @@ public class LocalTransaction implements
                 immediateAction.run();
             }
 
-            for(int i = 0; i < _postTransactionActions.size(); i++)
+            doPostTransactionActions();
+        }
+        catch (Exception e)
+        {
+            _logger.error("Failed to commit transaction", e);
+
+            doRollbackActions();
+            throw new RuntimeException("Failed to commit transaction", e);
+        }
+        finally
+        {
+            resetDetails();
+        }
+    }
+
+    private void doRollbackActions()
+    {
+        for(Action action : _postTransactionActions)
+        {
+            action.onRollback();
+        }
+    }
+
+    public StoreFuture commitAsync(final Runnable deferred)
+    {
+        sync();
+        try
+        {
+            StoreFuture future = StoreFuture.IMMEDIATE_FUTURE;
+            if(_transaction != null)
             {
-                _postTransactionActions.get(i).postCommit();
+                future = new StoreFuture()
+                        {
+                            private volatile boolean _completed = false;
+                            private StoreFuture _underlying = _transaction.commitTranAsync();
+
+                            @Override
+                            public boolean isComplete()
+                            {
+                                return _completed || checkUnderlyingCompletion();
+                            }
+
+                            @Override
+                            public void waitForCompletion()
+                            {
+                                if(!_completed)
+                                {
+                                    _underlying.waitForCompletion();
+                                    checkUnderlyingCompletion();
+                                }
+                            }
+
+                            private synchronized boolean checkUnderlyingCompletion()
+                            {
+                                if(!_completed && _underlying.isComplete())
+                                {
+                                    completeDeferredWork();
+                                    _completed = true;
+                                }
+                                return _completed;
+
+                            }
+
+                            private void completeDeferredWork()
+                            {
+                                try
+                                {
+                                    doPostTransactionActions();
+                                    deferred.run();
+
+                                }
+                                catch (Exception e)
+                                {
+                                    _logger.error("Failed to commit transaction", e);
+
+                                    doRollbackActions();
+                                    throw new RuntimeException("Failed to commit transaction", e);
+                                }
+                                finally
+                                {
+                                    resetDetails();
+                                }
+                            }
+
+                };
+                _asyncTran = future;
             }
+            else
+            {
+                try
+                {
+                    doPostTransactionActions();
+
+                    deferred.run();
+                }
+                finally
+                {
+                    resetDetails();
+                }
+            }
+
+            return future;
         }
         catch (Exception e)
         {
             _logger.error("Failed to commit transaction", e);
-
-            for(Action action : _postTransactionActions)
+            try
             {
-                action.onRollback();
+                doRollbackActions();
+            }
+            finally
+            {
+                resetDetails();
             }
             throw new RuntimeException("Failed to commit transaction", e);
         }
-        finally
+
+
+    }
+
+    private void doPostTransactionActions()
+    {
+        for(int i = 0; i < _postTransactionActions.size(); i++)
         {
-            resetDetails();
+            _postTransactionActions.get(i).postCommit();
         }
     }
 
     public void rollback()
     {
+        sync();
         try
         {
             if(_transaction != null)
@@ -295,10 +409,7 @@ public class LocalTransaction implements
         {
             try
             {
-                for(Action action : _postTransactionActions)
-                {
-                    action.onRollback();
-                }
+                doRollbackActions();
             }
             finally
             {
@@ -306,9 +417,19 @@ public class LocalTransaction implements
             }
         }
     }
-    
+
+    public void sync()
+    {
+        if(_asyncTran != null)
+        {
+            _asyncTran.waitForCompletion();
+            _asyncTran = null;
+        }
+    }
+
     private void resetDetails()
     {
+        _asyncTran = null;
         _transaction = null;
 	    _postTransactionActions.clear();
         _txnStartTime = 0L;

Propchange: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/virtualhost/
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost:r1333988-1368650

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java Fri Aug  3 12:13:32 2012
@@ -44,9 +44,9 @@ public abstract class HouseKeepingTask i
 
     final public void run()
     {
-        // Don't need to undo this as this is a thread pool thread so will
-        // always go through here before we do any real work.
+        String originalThreadName = Thread.currentThread().getName();
         Thread.currentThread().setName(_name);
+
         CurrentActor.set(new AbstractActor(_rootLogger)
         {
             @Override
@@ -67,6 +67,9 @@ public abstract class HouseKeepingTask i
         finally
         {
             CurrentActor.remove();
+
+            // eagerly revert the thread name to make thread dumps more meaningful if captured after task has finished
+            Thread.currentThread().setName(originalThreadName);
         }
     }
 

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/virtualhost/State.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/virtualhost/State.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/virtualhost/State.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/virtualhost/State.java Fri Aug  3 12:13:32 2012
@@ -25,5 +25,7 @@ public enum State
     INITIALISING,
     ACTIVE,
     PASSIVE,
-    STOPPED
+    STOPPED,
+    /** Terminal state that signifies the virtual host has experienced an unexpected condition. */
+    ERRORED
 }

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java Fri Aug  3 12:13:32 2012
@@ -32,7 +32,6 @@ import org.apache.qpid.server.connection
 import org.apache.qpid.server.exchange.ExchangeFactory;
 import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.server.federation.BrokerLink;
-import org.apache.qpid.server.management.ManagedObject;
 import org.apache.qpid.server.protocol.v1_0.LinkRegistry;
 import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.registry.IApplicationRegistry;
@@ -62,10 +61,10 @@ public interface VirtualHost extends Dur
 
     void close();
 
-    ManagedObject getManagedObject();
-
     UUID getBrokerId();
 
+    UUID getId();
+
     void scheduleHouseKeepingTask(long period, HouseKeepingTask task);
 
     long getHouseKeepingTaskCount();
@@ -74,7 +73,7 @@ public interface VirtualHost extends Dur
 
     int getHouseKeepingPoolSize();
 
-    void setHouseKeepingPoolSize(int newSize);    
+    void setHouseKeepingPoolSize(int newSize);
 
     int getHouseKeepingActiveCount();
 
@@ -102,4 +101,8 @@ public interface VirtualHost extends Dur
     ScheduledFuture<?> scheduleTask(long delay, Runnable timeoutTask);
 
     State getState();
+
+    public void block();
+
+    public void unblock();
 }

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java Fri Aug  3 12:13:32 2012
@@ -76,15 +76,12 @@ public class VirtualHostConfigRecoveryHa
 
     private final VirtualHost _virtualHost;
 
-    private MessageStoreLogSubject _logSubject;
-
-    private MessageStore _store;
-
     private final Map<String, Integer> _queueRecoveries = new TreeMap<String, Integer>();
-    private Map<Long, AbstractServerMessageImpl> _recoveredMessages = new HashMap<Long, AbstractServerMessageImpl>();
-    private Map<Long, StoredMessage> _unusedMessages = new HashMap<Long, StoredMessage>();
-
+    private final Map<Long, AbstractServerMessageImpl> _recoveredMessages = new HashMap<Long, AbstractServerMessageImpl>();
+    private final Map<Long, StoredMessage> _unusedMessages = new HashMap<Long, StoredMessage>();
 
+    private MessageStoreLogSubject _logSubject;
+    private MessageStore _store;
 
     public VirtualHostConfigRecoveryHandler(VirtualHost virtualHost)
     {
@@ -100,7 +97,7 @@ public class VirtualHostConfigRecoveryHa
         return this;
     }
 
-    public void queue(UUID id, String queueName, String owner, boolean exclusive, FieldTable arguments)
+    public void queue(UUID id, String queueName, String owner, boolean exclusive, FieldTable arguments, UUID alternateExchangeId)
     {
         try
         {
@@ -111,6 +108,17 @@ public class VirtualHostConfigRecoveryHa
                 q = AMQQueueFactory.createAMQQueueImpl(id, queueName, true, owner, false, exclusive, _virtualHost,
                                                        FieldTable.convertToMap(arguments));
                 _virtualHost.getQueueRegistry().registerQueue(q);
+
+                if (alternateExchangeId != null)
+                {
+                    Exchange altExchange = _virtualHost.getExchangeRegistry().getExchange(alternateExchangeId);
+                    if (altExchange == null)
+                    {
+                        _logger.error("Unknown exchange id " + alternateExchangeId + ", cannot set alternate exchange on queue with id " + id);
+                        return;
+                    }
+                    q.setAlternateExchange(altExchange);
+                }
             }
     
             CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_START(queueName, true));
@@ -120,12 +128,12 @@ public class VirtualHostConfigRecoveryHa
         }
         catch (AMQException e)
         {
-            // TODO
-            throw new RuntimeException(e);
+            throw new RuntimeException("Error recovering queue uuid " + id + " name " + queueName, e);
         }
     }
 
-    public ExchangeRecoveryHandler completeQueueRecovery()
+    @Override
+    public BindingRecoveryHandler completeQueueRecovery()
     {
         return this;
     }
@@ -145,19 +153,17 @@ public class VirtualHostConfigRecoveryHa
         }
         catch (AMQException e)
         {
-            // TODO
-            throw new RuntimeException(e);
+            throw new RuntimeException("Error recovering exchange uuid " + id + " name " + exchangeName, e);
         }
     }
 
-    public BindingRecoveryHandler completeExchangeRecovery()
+    public QueueRecoveryHandler completeExchangeRecovery()
     {
         return this;
     }
 
     public StoredMessageRecoveryHandler begin()
     {
-        // TODO - log begin
         return this;
     }
 
@@ -182,7 +188,6 @@ public class VirtualHostConfigRecoveryHa
 
     public void completeMessageRecovery()
     {
-        //TODO - log end
     }
 
     public BridgeRecoveryHandler brokerLink(final UUID id,

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java Fri Aug  3 12:13:32 2012
@@ -20,12 +20,20 @@
  */
 package org.apache.qpid.server.virtualhost;
 
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.AMQBrokerManagerMBean;
 import org.apache.qpid.server.binding.BindingFactory;
 import org.apache.qpid.server.configuration.BrokerConfig;
 import org.apache.qpid.server.configuration.ConfigStore;
@@ -45,11 +53,10 @@ import org.apache.qpid.server.federation
 import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.logging.messages.VirtualHostMessages;
 import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
-import org.apache.qpid.server.management.AMQManagedObject;
-import org.apache.qpid.server.management.ManagedObject;
-import org.apache.qpid.server.protocol.v1_0.LinkRegistry;
+import org.apache.qpid.server.model.UUIDGenerator;
 import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.v1_0.LinkRegistry;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.AMQQueueFactory;
 import org.apache.qpid.server.queue.DefaultQueueRegistry;
@@ -59,37 +66,25 @@ import org.apache.qpid.server.security.S
 import org.apache.qpid.server.stats.StatisticsCounter;
 import org.apache.qpid.server.store.Event;
 import org.apache.qpid.server.store.EventListener;
+import org.apache.qpid.server.store.HAMessageStore;
 import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.MessageStoreFactory;
 import org.apache.qpid.server.store.OperationalLoggingListener;
 import org.apache.qpid.server.txn.DtxRegistry;
 import org.apache.qpid.server.virtualhost.plugins.VirtualHostPlugin;
 import org.apache.qpid.server.virtualhost.plugins.VirtualHostPluginFactory;
 
-import javax.management.JMException;
-import javax.management.NotCompliantMBeanException;
-import javax.management.ObjectName;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-
-public class VirtualHostImpl implements VirtualHost
+public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.RegistryChangeListener, EventListener
 {
     private static final Logger _logger = Logger.getLogger(VirtualHostImpl.class);
 
     private static final int HOUSEKEEPING_SHUTDOWN_TIMEOUT = 5;
 
-    private final UUID _id;
+    private final UUID _qmfId;
 
     private final String _name;
 
+    private final UUID _id;
+
     private final long _createTime = System.currentTimeMillis();
 
     private final ConcurrentHashMap<BrokerLink,BrokerLink> _links = new ConcurrentHashMap<BrokerLink, BrokerLink>();
@@ -104,10 +99,6 @@ public class VirtualHostImpl implements 
 
     private final VirtualHostConfiguration _vhostConfig;
 
-    private final VirtualHostMBean _virtualHostMBean;
-
-    private final AMQBrokerManagerMBean _brokerMBean;
-
     private final QueueRegistry _queueRegistry;
 
     private final ExchangeRegistry _exchangeRegistry;
@@ -122,13 +113,12 @@ public class VirtualHostImpl implements 
 
     private final MessageStore _messageStore;
 
-    private State _state = State.INITIALISING;
-
-    private boolean _statisticsEnabled = false;
+    private volatile State _state = State.INITIALISING;
 
     private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
 
     private final Map<String, LinkRegistry> _linkRegistry = new HashMap<String, LinkRegistry>();
+    private boolean _blocked;
 
     public VirtualHostImpl(IApplicationRegistry appRegistry, VirtualHostConfiguration hostConfig) throws Exception
     {
@@ -143,20 +133,21 @@ public class VirtualHostImpl implements 
         }
 
         _appRegistry = appRegistry;
-        _brokerConfig = _appRegistry.getBroker();
+        _brokerConfig = _appRegistry.getBrokerConfig();
         _vhostConfig = hostConfig;
         _name = _vhostConfig.getName();
         _dtxRegistry = new DtxRegistry();
 
-        _id = _appRegistry.getConfigStore().createId();
+        _qmfId = _appRegistry.getConfigStore().createId();
+        _id = UUIDGenerator.generateVhostUUID(_name);
 
         CurrentActor.get().message(VirtualHostMessages.CREATED(_name));
 
-        _virtualHostMBean = new VirtualHostMBean();
         _securityManager = new SecurityManager(_appRegistry.getSecurityManager());
         _securityManager.configureHostPlugins(_vhostConfig);
 
         _connectionRegistry = new ConnectionRegistry();
+        _connectionRegistry.addRegistryChangeListener(this);
 
         _houseKeepingTasks = new ScheduledThreadPoolExecutor(_vhostConfig.getHouseKeepingThreadCount());
 
@@ -169,15 +160,16 @@ public class VirtualHostImpl implements 
 
         _bindingFactory = new BindingFactory(this);
 
-        _brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean);
-
-        _messageStore = initialiseMessageStore(hostConfig.getMessageStoreFactoryClass());
+        _messageStore = initialiseMessageStore(hostConfig.getMessageStoreClass());
 
         configureMessageStore(hostConfig);
 
         activateNonHAMessageStore();
 
         initialiseStatistics();
+
+        _messageStore.addEventListener(this, Event.PERSISTENT_MESSAGE_SIZE_OVERFULL);
+        _messageStore.addEventListener(this, Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL);
     }
 
     public IConnectionRegistry getConnectionRegistry()
@@ -195,6 +187,12 @@ public class VirtualHostImpl implements 
         return _id;
     }
 
+    @Override
+    public UUID getQMFId()
+    {
+        return _qmfId;
+    }
+
     public VirtualHostConfigType getConfigType()
     {
         return VirtualHostConfigType.getInstance();
@@ -324,20 +322,19 @@ public class VirtualHostImpl implements 
     }
 
 
-    private MessageStore initialiseMessageStore(final String messageStoreFactoryClass) throws Exception
+    private MessageStore initialiseMessageStore(final String messageStoreClass) throws Exception
     {
-        final Class<?> clazz = Class.forName(messageStoreFactoryClass);
+        final Class<?> clazz = Class.forName(messageStoreClass);
         final Object o = clazz.newInstance();
 
-        if (!(o instanceof MessageStoreFactory))
+        if (!(o instanceof MessageStore))
         {
-            throw new ClassCastException("Message store factory class must implement " + MessageStoreFactory.class +
+            throw new ClassCastException("Message store factory class must implement " + MessageStore.class +
                                         ". Class " + clazz + " does not.");
         }
 
-        final MessageStoreFactory messageStoreFactory = (MessageStoreFactory) o;
-        final MessageStore messageStore = messageStoreFactory.createMessageStore();
-        final MessageStoreLogSubject storeLogSubject = new MessageStoreLogSubject(this, messageStoreFactory.getStoreClassName());
+        final MessageStore messageStore = (MessageStore) o;
+        final MessageStoreLogSubject storeLogSubject = new MessageStoreLogSubject(this, clazz.getSimpleName());
         OperationalLoggingListener.listen(messageStore, storeLogSubject);
 
         messageStore.addEventListener(new BeforeActivationListener(), Event.BEFORE_ACTIVATE);
@@ -361,7 +358,10 @@ public class VirtualHostImpl implements 
 
     private void activateNonHAMessageStore() throws Exception
     {
-        _messageStore.activate();
+        if (!(_messageStore instanceof HAMessageStore))
+        {
+            _messageStore.activate();
+        }
     }
 
     private void initialiseModel(VirtualHostConfiguration config) throws ConfigurationException, AMQException
@@ -534,16 +534,6 @@ public class VirtualHostImpl implements 
         CurrentActor.get().message(VirtualHostMessages.CLOSED());
     }
 
-    public ManagedObject getBrokerMBean()
-    {
-        return _brokerMBean;
-    }
-
-    public ManagedObject getManagedObject()
-    {
-        return _virtualHostMBean;
-    }
-
     public UUID getBrokerId()
     {
         return _appRegistry.getBrokerId();
@@ -558,54 +548,48 @@ public class VirtualHostImpl implements 
     {
         return _bindingFactory;
     }
-    
+
     public void registerMessageDelivered(long messageSize)
     {
-        if (isStatisticsEnabled())
-        {
-            _messagesDelivered.registerEvent(1L);
-            _dataDelivered.registerEvent(messageSize);
-        }
+        _messagesDelivered.registerEvent(1L);
+        _dataDelivered.registerEvent(messageSize);
         _appRegistry.registerMessageDelivered(messageSize);
     }
-    
+
     public void registerMessageReceived(long messageSize, long timestamp)
     {
-        if (isStatisticsEnabled())
-        {
-            _messagesReceived.registerEvent(1L, timestamp);
-            _dataReceived.registerEvent(messageSize, timestamp);
-        }
+        _messagesReceived.registerEvent(1L, timestamp);
+        _dataReceived.registerEvent(messageSize, timestamp);
         _appRegistry.registerMessageReceived(messageSize, timestamp);
     }
-    
+
     public StatisticsCounter getMessageReceiptStatistics()
     {
         return _messagesReceived;
     }
-    
+
     public StatisticsCounter getDataReceiptStatistics()
     {
         return _dataReceived;
     }
-    
+
     public StatisticsCounter getMessageDeliveryStatistics()
     {
         return _messagesDelivered;
     }
-    
+
     public StatisticsCounter getDataDeliveryStatistics()
     {
         return _dataDelivered;
     }
-    
+
     public void resetStatistics()
     {
         _messagesDelivered.reset();
         _dataDelivered.reset();
         _messagesReceived.reset();
         _dataReceived.reset();
-        
+
         for (AMQConnectionModel connection : _connectionRegistry.getConnections())
         {
             connection.resetStatistics();
@@ -614,25 +598,12 @@ public class VirtualHostImpl implements 
 
     public void initialiseStatistics()
     {
-        setStatisticsEnabled(!StatisticsCounter.DISABLE_STATISTICS &&
-                _appRegistry.getConfiguration().isStatisticsGenerationVirtualhostsEnabled());
-        
         _messagesDelivered = new StatisticsCounter("messages-delivered-" + getName());
         _dataDelivered = new StatisticsCounter("bytes-delivered-" + getName());
         _messagesReceived = new StatisticsCounter("messages-received-" + getName());
         _dataReceived = new StatisticsCounter("bytes-received-" + getName());
     }
 
-    public boolean isStatisticsEnabled()
-    {
-        return _statisticsEnabled;
-    }
-
-    public void setStatisticsEnabled(boolean enabled)
-    {
-        _statisticsEnabled = enabled;
-    }
-
     public BrokerLink createBrokerConnection(UUID id, long createTime, Map<String,String> arguments)
     {
         BrokerLink blink = new BrokerLink(this, id, createTime, arguments);
@@ -699,107 +670,155 @@ public class VirtualHostImpl implements 
         return _dtxRegistry;
     }
 
-    @Override
     public String toString()
     {
         return _name;
     }
 
-    @Override
     public State getState()
     {
         return _state;
     }
 
-
-    /**
-     * Virtual host JMX MBean class.
-     *
-     * This has some of the methods implemented from management interface for exchanges. Any
-     * Implementation of an Exchange MBean should extend this class.
-     */
-    public class VirtualHostMBean extends AMQManagedObject implements ManagedVirtualHost
+    public void block()
     {
-        public VirtualHostMBean() throws NotCompliantMBeanException
+        synchronized (_connectionRegistry)
         {
-            super(ManagedVirtualHost.class, ManagedVirtualHost.TYPE);
+            if(!_blocked)
+            {
+                _blocked = true;
+                for(AMQConnectionModel conn : _connectionRegistry.getConnections())
+                {
+                    conn.block();
+                }
+            }
         }
+    }
 
-        public String getObjectInstanceName()
-        {
-            return ObjectName.quote(_name);
-        }
 
-        public String getName()
+    public void unblock()
+    {
+        synchronized (_connectionRegistry)
         {
-            return _name;
+            if(_blocked)
+            {
+                _blocked = false;
+                for(AMQConnectionModel conn : _connectionRegistry.getConnections())
+                {
+                    conn.unblock();
+                }
+            }
         }
+    }
 
-        public VirtualHostImpl getVirtualHost()
+    public void connectionRegistered(final AMQConnectionModel connection)
+    {
+        if(_blocked)
         {
-            return VirtualHostImpl.this;
+            connection.block();
         }
     }
 
-    private final class BeforeActivationListener implements EventListener
+    public void connectionUnregistered(final AMQConnectionModel connection)
     {
-        @Override
-        public void event(Event event)
+    }
+
+    public void event(final Event event)
+    {
+        switch(event)
         {
-            try
-            {
-                _exchangeRegistry.initialise();
-                initialiseModel(_vhostConfig);
-            } catch (Exception e)
-            {
-                throw new RuntimeException("Failed to initialise virtual host after state change", e);
-            }
+            case PERSISTENT_MESSAGE_SIZE_OVERFULL:
+                block();
+                break;
+            case PERSISTENT_MESSAGE_SIZE_UNDERFULL:
+                unblock();
+                break;
         }
     }
 
-    private final class AfterActivationListener implements EventListener
+    private final class BeforeActivationListener implements EventListener
+   {
+       @Override
+       public void event(Event event)
+       {
+           try
+           {
+               _exchangeRegistry.initialise();
+               initialiseModel(_vhostConfig);
+           }
+           catch (Exception e)
+           {
+               throw new RuntimeException("Failed to initialise virtual host after state change", e);
+           }
+       }
+   }
+
+   private final class AfterActivationListener implements EventListener
+   {
+       @Override
+       public void event(Event event)
+       {
+           State finalState = State.ERRORED;
+
+           try
+           {
+               initialiseHouseKeeping(_vhostConfig.getHousekeepingCheckPeriod());
+               finalState = State.ACTIVE;
+           }
+           finally
+           {
+               _state = finalState;
+               reportIfError(_state);
+           }
+       }
+   }
+
+    private final class BeforePassivationListener implements EventListener
     {
-        @Override
         public void event(Event event)
         {
-            initialiseHouseKeeping(_vhostConfig.getHousekeepingCheckPeriod());
+            State finalState = State.ERRORED;
+
             try
             {
-                _brokerMBean.register();
-            } catch (JMException e)
+                /* the approach here is not ideal as there is a race condition where a
+                 * queue etc could be created while the virtual host is on the way to
+                 * the passivated state.  However the store state change from MASTER to UNKNOWN
+                 * is documented as exceptionally rare..
+                 */
+
+                _connectionRegistry.close(IConnectionRegistry.VHOST_PASSIVATE_REPLY_TEXT);
+                removeHouseKeepingTasks();
+
+                _queueRegistry.stopAllAndUnregisterMBeans();
+                _exchangeRegistry.clearAndUnregisterMbeans();
+                _dtxRegistry.close();
+
+                finalState = State.PASSIVE;
+            }
+            finally
             {
-                throw new RuntimeException("Failed to register virtual host mbean for virtual host " + getName(), e);
+                _state = finalState;
+                reportIfError(_state);
             }
-
-            _state = State.ACTIVE;
         }
+
     }
 
-    public class BeforePassivationListener implements EventListener
+    private final class BeforeCloseListener implements EventListener
     {
-
         @Override
         public void event(Event event)
         {
-            _connectionRegistry.close(IConnectionRegistry.VHOST_PASSIVATE_REPLY_TEXT);
-            _brokerMBean.unregister();
-            removeHouseKeepingTasks();
-
-            _queueRegistry.stopAllAndUnregisterMBeans();
-            _exchangeRegistry.clearAndUnregisterMbeans();
-            _dtxRegistry.close();
-
-            _state = State.PASSIVE;
+            shutdownHouseKeeping();
         }
     }
 
-    private final class BeforeCloseListener implements EventListener
+    private void reportIfError(State state)
     {
-        @Override
-        public void event(Event event)
+        if (state == State.ERRORED)
         {
-            _brokerMBean.unregister();
-            shutdownHouseKeeping();
+            CurrentActor.get().message(VirtualHostMessages.ERRORED());
         }
     }
 

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java Fri Aug  3 12:13:32 2012
@@ -22,10 +22,12 @@ package org.apache.qpid.server.virtualho
 
 import org.apache.qpid.common.Closeable;
 import org.apache.qpid.server.configuration.ConfigStore;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -37,6 +39,8 @@ public class VirtualHostRegistry impleme
 
     private String _defaultVirtualHostName;
     private ApplicationRegistry _applicationRegistry;
+    private final Collection<RegistryChangeListener> _listeners =
+            Collections.synchronizedCollection(new ArrayList<RegistryChangeListener>());
 
     public VirtualHostRegistry(ApplicationRegistry applicationRegistry)
     {
@@ -50,11 +54,25 @@ public class VirtualHostRegistry impleme
             throw new Exception("Virtual Host with name " + host.getName() + " already registered.");
         }
         _registry.put(host.getName(),host);
+        synchronized (_listeners)
+        {
+            for(RegistryChangeListener listener : _listeners)
+            {
+                listener.virtualHostRegistered(host);
+            }
+        }
     }
     
     public synchronized void unregisterVirtualHost(VirtualHost host)
     {
         _registry.remove(host.getName());
+        synchronized (_listeners)
+        {
+            for(RegistryChangeListener listener : _listeners)
+            {
+                listener.virtualHostUnregistered(host);
+            }
+        }
     }
 
     public VirtualHost getVirtualHost(String name)
@@ -106,4 +124,17 @@ public class VirtualHostRegistry impleme
         }
 
     }
+
+    public static interface RegistryChangeListener
+    {
+        void virtualHostRegistered(VirtualHost virtualHost);
+        void virtualHostUnregistered(VirtualHost virtualHost);
+
+    }
+
+    public void addRegistryChangeListener(RegistryChangeListener listener)
+    {
+        _listeners.add(listener);
+    }
+
 }

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/SlowConsumerDetection.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/SlowConsumerDetection.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/SlowConsumerDetection.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/SlowConsumerDetection.java Fri Aug  3 12:13:32 2012
@@ -1,5 +1,6 @@
 /*
  *
+ * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
  * regarding copyright ownership.  The ASF licenses this file

Modified: qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/BrokerOptionsTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/BrokerOptionsTest.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/BrokerOptionsTest.java (original)
+++ qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/BrokerOptionsTest.java Fri Aug  3 12:13:32 2012
@@ -199,4 +199,31 @@ public class BrokerOptionsTest extends Q
         _options.setLogWatchFrequency(myFreq);
         assertEquals(myFreq, _options.getLogWatchFrequency());
     }
+
+    public void testDefaultIncludesPortFor0_10()
+    {
+        assertEquals(Collections.EMPTY_SET, _options.getIncludedPorts(ProtocolInclusion.v0_10));
+    }
+
+    public void testOverriddenIncludesPortFor0_10()
+    {
+        _options.addIncludedPort(ProtocolInclusion.v0_10, TEST_PORT1);
+        assertEquals(Collections.singleton(TEST_PORT1), _options.getIncludedPorts(ProtocolInclusion.v0_10));
+    }
+
+    public void testManyOverriddenIncludedPortFor0_10()
+    {
+        _options.addIncludedPort(ProtocolInclusion.v0_10, TEST_PORT1);
+        _options.addIncludedPort(ProtocolInclusion.v0_10, TEST_PORT2);
+        final Set<Integer> expectedPorts = new HashSet<Integer>(Arrays.asList(new Integer[] {TEST_PORT1, TEST_PORT2}));
+        assertEquals(expectedPorts, _options.getIncludedPorts(ProtocolInclusion.v0_10));
+    }
+
+    public void testDuplicatedOverriddenIncludedPortFor0_10AreSilentlyIgnored()
+    {
+        _options.addIncludedPort(ProtocolInclusion.v0_10, TEST_PORT1);
+        _options.addIncludedPort(ProtocolInclusion.v0_10, TEST_PORT2);
+        final Set<Integer> expectedPorts = new HashSet<Integer>(Arrays.asList(new Integer[] {TEST_PORT1, TEST_PORT2}));
+        assertEquals(expectedPorts, _options.getIncludedPorts(ProtocolInclusion.v0_10));
+    }
 }

Modified: qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/MainTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/MainTest.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/MainTest.java (original)
+++ qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/MainTest.java Fri Aug  3 12:13:32 2012
@@ -47,6 +47,11 @@ public class MainTest extends QpidTestCa
         {
             assertEquals(0, options.getExcludedPorts(pe).size());
         }
+
+        for(ProtocolInclusion pe : EnumSet.allOf(ProtocolInclusion.class))
+        {
+            assertEquals(0, options.getIncludedPorts(pe).size());
+        }
     }
 
     public void testPortOverriddenSingle()
@@ -162,6 +167,20 @@ public class MainTest extends QpidTestCa
         assertTrue("Parsed command line didnt pick up help option", main.getCommandLine().hasOption("h"));
     }
 
+    public void testInclude010()
+    {
+        BrokerOptions options = startDummyMain("-p 5678 --include-0-10 5678");
+
+        assertTrue(options.getPorts().contains(5678));
+        assertEquals(1, options.getPorts().size());
+        assertTrue(options.getIncludedPorts(ProtocolInclusion.v0_10).contains(5678));
+        assertEquals(1, options.getIncludedPorts(ProtocolInclusion.v0_10).size());
+        assertEquals(0, options.getIncludedPorts(ProtocolInclusion.v0_9_1).size());
+        assertEquals(0, options.getIncludedPorts(ProtocolInclusion.v0_9).size());
+        assertEquals(0, options.getIncludedPorts(ProtocolInclusion.v0_8).size());
+        assertEquals(0, options.getIncludedPorts(ProtocolInclusion.v1_0).size());
+    }
+
     private BrokerOptions startDummyMain(String commandLine)
     {
         return (new TestMain(commandLine.split("\\s"))).getOptions();

Modified: qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/configuration/MockConnectionConfig.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/configuration/MockConnectionConfig.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/configuration/MockConnectionConfig.java (original)
+++ qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/configuration/MockConnectionConfig.java Fri Aug  3 12:13:32 2012
@@ -25,14 +25,14 @@ import java.util.UUID;
 public class MockConnectionConfig implements ConnectionConfig
 {
 
-    public MockConnectionConfig(UUID _id, ConnectionConfigType _configType,
+    public MockConnectionConfig(UUID _qmfId, ConnectionConfigType _configType,
                     ConfiguredObject<ConnectionConfigType, ConnectionConfig> _parent, boolean _durable,
                     long _createTime, VirtualHostConfig _virtualHost, String _address, Boolean _incoming,
                     Boolean _systemConnection, Boolean _federationLink, String _authId, String _remoteProcessName,
                     Integer _remotePID, Integer _remoteParentPID, ConfigStore _configStore, Boolean _shadow)
     {
         super();
-        this._id = _id;
+        this._qmfId = _qmfId;
         this._configType = _configType;
         this._parent = _parent;
         this._durable = _durable;
@@ -50,7 +50,7 @@ public class MockConnectionConfig implem
         this._shadow = _shadow;
     }
 
-    private UUID _id;
+    private UUID _qmfId;
     private ConnectionConfigType _configType;
     private ConfiguredObject<ConnectionConfigType, ConnectionConfig> _parent;
     private boolean _durable;
@@ -68,9 +68,9 @@ public class MockConnectionConfig implem
     private Boolean _shadow;
 
     @Override
-    public UUID getId()
+    public UUID getQMFId()
     {
-        return _id;
+        return _qmfId;
     }
 
     @Override

Modified: qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java (original)
+++ qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java Fri Aug  3 12:13:32 2012
@@ -176,20 +176,29 @@ public class QueueConfigurationTest exte
         assertEquals(1, qConf.getMaximumMessageCount());
     }
 
-    public void testGetMinimumAlertRepeatGap() throws ConfigurationException
+    public void testGetMinimumAlertRepeatGap() throws Exception
     {
-        // Check default value
-        QueueConfiguration qConf = new QueueConfiguration("test", _emptyConf);
-        assertEquals(0, qConf.getMinimumAlertRepeatGap());
-
-        // Check explicit value
-        VirtualHostConfiguration vhostConfig = overrideConfiguration("minimumAlertRepeatGap", 2);
-        qConf = new QueueConfiguration("test", vhostConfig);
-        assertEquals(2, qConf.getMinimumAlertRepeatGap());
-
-        // Check inherited value
-        qConf = new QueueConfiguration("test", _fullHostConf);
-        assertEquals(1, qConf.getMinimumAlertRepeatGap());
+        try
+        {
+            ApplicationRegistry registry = new TestApplicationRegistry(new ServerConfiguration(_env));
+            ApplicationRegistry.initialise(registry);
+            // Check default value
+            QueueConfiguration qConf = new QueueConfiguration("test", _emptyConf);
+            assertEquals(ServerConfiguration.DEFAULT_MINIMUM_ALERT_REPEAT_GAP, qConf.getMinimumAlertRepeatGap());
+
+            // Check explicit value
+            VirtualHostConfiguration vhostConfig = overrideConfiguration("minimumAlertRepeatGap", 2);
+            qConf = new QueueConfiguration("test", vhostConfig);
+            assertEquals(2, qConf.getMinimumAlertRepeatGap());
+
+            // Check inherited value
+            qConf = new QueueConfiguration("test", _fullHostConf);
+            assertEquals(1, qConf.getMinimumAlertRepeatGap());
+        }
+        finally
+        {
+            ApplicationRegistry.remove();
+        }
     }
 
     public void testSortQueueConfiguration() throws ConfigurationException
@@ -204,6 +213,18 @@ public class QueueConfigurationTest exte
         assertEquals("test-sort-key", qConf.getQueueSortKey());
     }
 
+    public void testQueueDescription() throws ConfigurationException
+    {
+        //Check default value
+        QueueConfiguration qConf = new QueueConfiguration("test", _emptyConf);
+        assertNull(qConf.getDescription());
+
+        // Check explicit value
+        final VirtualHostConfiguration vhostConfig = overrideConfiguration("description", "mydescription");
+        qConf = new QueueConfiguration("test", vhostConfig);
+        assertEquals("mydescription", qConf.getDescription());
+    }
+
     private VirtualHostConfiguration overrideConfiguration(String property, Object value)
             throws ConfigurationException
     {

Modified: qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java (original)
+++ qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java Fri Aug  3 12:13:32 2012
@@ -25,6 +25,7 @@ import org.apache.commons.configuration.
 
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.protocol.AmqpProtocolVersion;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
 import org.apache.qpid.server.util.TestApplicationRegistry;
@@ -251,13 +252,13 @@ public class ServerConfigurationTest ext
     {
         // Check default
         _serverConfig.initialise();
-        assertEquals(true, _serverConfig.getManagementSSLEnabled());
+        assertEquals(false, _serverConfig.getManagementSSLEnabled());
 
         // Check value we set
-        _config.setProperty("management.ssl.enabled", false);
+        _config.setProperty("management.ssl.enabled", true);
         _serverConfig = new ServerConfiguration(_config);
         _serverConfig.initialise();
-        assertEquals(false, _serverConfig.getManagementSSLEnabled());
+        assertEquals(true, _serverConfig.getManagementSSLEnabled());
     }
 
     public void testGetManagementKeystorePassword() throws ConfigurationException
@@ -286,25 +287,17 @@ public class ServerConfigurationTest ext
         assertEquals(false, _serverConfig.getQueueAutoRegister());
     }
 
-    public void testGetManagementEnabled() throws ConfigurationException
+    public void testGetJMXManagementEnabled() throws ConfigurationException
     {
         // Check default
         _serverConfig.initialise();
-        assertEquals(true, _serverConfig.getManagementEnabled());
+        assertEquals(true, _serverConfig.getJMXManagementEnabled());
 
         // Check value we set
         _config.setProperty("management.enabled", false);
         _serverConfig = new ServerConfiguration(_config);
         _serverConfig.initialise();
-        assertEquals(false, _serverConfig.getManagementEnabled());
-    }
-
-    public void testSetManagementEnabled() throws ConfigurationException
-    {
-        // Check value we set
-        _serverConfig.initialise();
-        _serverConfig.setManagementEnabled(false);
-        assertEquals(false, _serverConfig.getManagementEnabled());
+        assertEquals(false, _serverConfig.getJMXManagementEnabled());
     }
 
     public void testGetManagementRightsInferAllAccess() throws Exception
@@ -401,7 +394,7 @@ public class ServerConfigurationTest ext
     {
         // Check default
         _serverConfig.initialise();
-        assertEquals(0, _serverConfig.getMinimumAlertRepeatGap());
+        assertEquals(30000l, _serverConfig.getMinimumAlertRepeatGap());
 
         // Check value we set
         _config.setProperty("minimumAlertRepeatGap", 10L);
@@ -1588,6 +1581,168 @@ public class ServerConfigurationTest ext
         assertEquals(false, _serverConfig.isAmqp08enabled());
     }
 
+    public void testPortInclude08() throws ConfigurationException
+    {
+        // Check default
+        _serverConfig.initialise();
+        assertEquals(true, _serverConfig.getPortInclude08().isEmpty());
+
+        // Check values we set
+        _config.addProperty(ServerConfiguration.CONNECTOR_INCLUDE_08, "1");
+        _config.addProperty(ServerConfiguration.CONNECTOR_INCLUDE_08, "2");
+        _serverConfig = new ServerConfiguration(_config);
+        _serverConfig.initialise();
+        assertEquals(2, _serverConfig.getPortInclude08().size());
+        assertTrue(_serverConfig.getPortInclude08().contains("1"));
+        assertTrue(_serverConfig.getPortInclude08().contains("2"));
+    }
+
+    public void testPortInclude09() throws ConfigurationException
+    {
+        // Check default
+        _serverConfig.initialise();
+        assertEquals(true, _serverConfig.getPortInclude09().isEmpty());
+
+        // Check values we set
+        _config.addProperty(ServerConfiguration.CONNECTOR_INCLUDE_09, "3");
+        _config.addProperty(ServerConfiguration.CONNECTOR_INCLUDE_09, "4");
+        _serverConfig = new ServerConfiguration(_config);
+        _serverConfig.initialise();
+        assertEquals(2, _serverConfig.getPortInclude09().size());
+        assertTrue(_serverConfig.getPortInclude09().contains("3"));
+        assertTrue(_serverConfig.getPortInclude09().contains("4"));
+    }
+
+    public void testPortInclude091() throws ConfigurationException
+    {
+        // Check default
+        _serverConfig.initialise();
+        assertEquals(true, _serverConfig.getPortInclude091().isEmpty());
+
+        // Check values we set
+        _config.addProperty(ServerConfiguration.CONNECTOR_INCLUDE_091, "5");
+        _config.addProperty(ServerConfiguration.CONNECTOR_INCLUDE_091, "6");
+        _serverConfig = new ServerConfiguration(_config);
+        _serverConfig.initialise();
+        assertEquals(2, _serverConfig.getPortInclude091().size());
+        assertTrue(_serverConfig.getPortInclude091().contains("5"));
+        assertTrue(_serverConfig.getPortInclude091().contains("6"));
+    }
+
+    public void testPortInclude010() throws ConfigurationException
+    {
+        // Check default
+        _serverConfig.initialise();
+        assertEquals(true, _serverConfig.getPortInclude010().isEmpty());
+
+        // Check values we set
+        _config.addProperty(ServerConfiguration.CONNECTOR_INCLUDE_010, "7");
+        _config.addProperty(ServerConfiguration.CONNECTOR_INCLUDE_010, "8");
+        _serverConfig = new ServerConfiguration(_config);
+        _serverConfig.initialise();
+        assertEquals(2, _serverConfig.getPortInclude010().size());
+        assertTrue(_serverConfig.getPortInclude010().contains("7"));
+        assertTrue(_serverConfig.getPortInclude010().contains("8"));
+    }
+
+    public void testPortInclude10() throws ConfigurationException
+    {
+        // Check default
+        _serverConfig.initialise();
+        assertEquals(true, _serverConfig.getPortInclude10().isEmpty());
+
+        // Check values we set
+        _config.addProperty(ServerConfiguration.CONNECTOR_INCLUDE_10, "9");
+        _config.addProperty(ServerConfiguration.CONNECTOR_INCLUDE_10, "10");
+        _serverConfig = new ServerConfiguration(_config);
+        _serverConfig.initialise();
+        assertEquals(2, _serverConfig.getPortInclude10().size());
+        assertTrue(_serverConfig.getPortInclude10().contains("9"));
+        assertTrue(_serverConfig.getPortInclude10().contains("10"));
+    }
+
+    public void testGetDefaultSupportedProtocolReply() throws Exception
+    {
+        // Check default
+        _serverConfig.initialise();
+        assertNull("unexpected default value", _serverConfig.getDefaultSupportedProtocolReply());
+
+        // Check values we set
+        _config.addProperty(ServerConfiguration.CONNECTOR_AMQP_SUPPORTED_REPLY, "v0_10");
+        _serverConfig = new ServerConfiguration(_config);
+        _serverConfig.initialise();
+        assertEquals(AmqpProtocolVersion.v0_10, _serverConfig.getDefaultSupportedProtocolReply());
+    }
+
+    public void testDefaultAuthenticationManager() throws Exception
+    {
+        // Check default
+        _serverConfig.initialise();
+        assertNull("unexpected default value", _serverConfig.getDefaultAuthenticationManager());
+
+        // Check values we set
+        String testAuthManager = "myauthmanager";
+        _config.addProperty("security.default-auth-manager", testAuthManager);
+        _serverConfig = new ServerConfiguration(_config);
+        _serverConfig.initialise();
+        assertEquals(testAuthManager, _serverConfig.getDefaultAuthenticationManager());
+    }
+
+    public void testPortAuthenticationMappingsDefault() throws Exception
+    {
+        _serverConfig.initialise();
+        assertEquals("unexpected default number of port/authmanager mappings", 0, _serverConfig.getPortAuthenticationMappings().size());
+    }
+
+    public void testPortAuthenticationMappingsWithSingleMapping() throws Exception
+    {
+        String testAuthManager = "myauthmanager";
+        _config.addProperty("security.port-mappings.port-mapping.port", 1234);
+        _config.addProperty("security.port-mappings.port-mapping.auth-manager", testAuthManager);
+
+        _serverConfig = new ServerConfiguration(_config);
+        _serverConfig.initialise();
+        assertEquals("unexpected number of port/authmanager mappings", 1, _serverConfig.getPortAuthenticationMappings().size());
+        assertEquals("unexpected mapping for port", testAuthManager, _serverConfig.getPortAuthenticationMappings().get(1234));
+    }
+
+    public void testPortAuthenticationMappingsWithManyMapping() throws Exception
+    {
+        String testAuthManager1 = "myauthmanager1";
+        String testAuthManager2 = "myauthmanager2";
+        _config.addProperty("security.port-mappings.port-mapping(-1).port", 1234);
+        _config.addProperty("security.port-mappings.port-mapping.auth-manager", testAuthManager1);
+
+        _config.addProperty("security.port-mappings.port-mapping(-1).port", 2345);
+        _config.addProperty("security.port-mappings.port-mapping.auth-manager", testAuthManager2);
+
+        _serverConfig = new ServerConfiguration(_config);
+        _serverConfig.initialise();
+
+        assertEquals("unexpected number of port/authmanager mappings", 2, _serverConfig.getPortAuthenticationMappings().size());
+        assertEquals("unexpected mapping for port", testAuthManager1, _serverConfig.getPortAuthenticationMappings().get(1234));
+        assertEquals("unexpected mapping for port", testAuthManager2, _serverConfig.getPortAuthenticationMappings().get(2345));
+    }
+
+    public void testPortAuthenticationMappingWithMissingAuthManager() throws Exception
+    {
+        _config.addProperty("security.port-mappings.port-mapping(-1).port", 1234);
+        // no auth manager defined for port
+        _serverConfig = new ServerConfiguration(_config);
+        try
+        {
+            _serverConfig.initialise();
+            fail("Exception not thrown");
+        }
+        catch(ConfigurationException ce)
+        {
+            // PASS
+            assertEquals("Incorrect error message",
+                    "Validation error: Each port-mapping must have exactly one port and exactly one auth-manager.",
+                    ce.getMessage());
+        }
+    }
+
     /**
      * Convenience method to output required security preamble for broker config
      */
@@ -1605,7 +1760,6 @@ public class ServerConfigurationTest ext
         out.write("\t\t\t\t\t</attribute>\n");
         out.write("\t\t\t\t</attributes>\n");
         out.write("\t\t\t</principal-database>\n");
-        out.write("\t\t\t<jmx-access>/dev/null</jmx-access>\n");
         out.write("\t\t</pd-auth-manager>\n");
         out.write("\t</security>\n");
     }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org