You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2012/08/03 14:14:12 UTC

svn commit: r1368910 [13/27] - in /qpid/branches/asyncstore: ./ bin/ cpp/bindings/qmf/ruby/ cpp/bindings/qmf2/ruby/ cpp/bindings/qpid/python/ cpp/bindings/qpid/ruby/ cpp/bindings/qpid/ruby/features/ cpp/bindings/qpid/ruby/features/step_definitions/ cpp...

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java Fri Aug  3 12:13:32 2012
@@ -70,6 +70,7 @@ public class ManagementExchange implemen
 
     private final Set<Binding> _bindingSet = new CopyOnWriteArraySet<Binding>();
     private UUID _id;
+    private UUID _qmfId;
     private static final String AGENT_BANK = "0";
 
     private int _bindingCountHigh;
@@ -84,7 +85,7 @@ public class ManagementExchange implemen
 
     private class ManagementQueue implements BaseQueue
     {
-        private final UUID QUEUE_ID =  UUIDGenerator.generateUUID();
+        private final UUID QUEUE_ID =  UUIDGenerator.generateRandomUUID();
         private final String NAME_AS_STRING = "##__mgmt_pseudo_queue__##" + QUEUE_ID.toString();
         private final AMQShortString NAME_AS_SHORT_STRING = new AMQShortString(NAME_AS_STRING);
 
@@ -196,6 +197,7 @@ public class ManagementExchange implemen
         _virtualHost = host;
         _id = id;
         _virtualHost.scheduleHouseKeepingTask(_virtualHost.getBroker().getManagementPublishInterval(), new UpdateTask(_virtualHost));
+        _qmfId = getConfigStore().createId();
         getConfigStore().addConfiguredObject(this);
         getQMFService().addListener(this);
     }
@@ -205,6 +207,12 @@ public class ManagementExchange implemen
         return _id;
     }
 
+    @Override
+    public UUID getQMFId()
+    {
+        return _qmfId;
+    }
+
     public ExchangeConfigType getConfigType()
     {
         return ExchangeConfigType.getInstance();
@@ -540,6 +548,11 @@ public class ManagementExchange implemen
         return getMsgReceives();
     }
 
+    public long getMsgDrops()
+    {
+        return 0l;
+    }
+
     public long getByteReceives()
     {
         return _bytesReceived.get();
@@ -550,6 +563,11 @@ public class ManagementExchange implemen
         return getByteReceives();
     }
 
+    public long getByteDrops()
+    {
+        return 0l;
+    }
+
     public long getCreateTime()
     {
         return _createTime;

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/qmf/QMFMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/qmf/QMFMessage.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/qmf/QMFMessage.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/qmf/QMFMessage.java Fri Aug  3 12:13:32 2012
@@ -21,6 +21,8 @@
 
 package org.apache.qpid.qmf;
 
+import java.util.Collection;
+import java.util.Collections;
 import org.apache.commons.lang.NotImplementedException;
 
 import org.apache.qpid.framing.AMQShortString;
@@ -111,6 +113,16 @@ public class QMFMessage implements Serve
         return 0;
     }
 
+    public String getUserId()
+    {
+        return null;
+    }
+
+    public String getAppId()
+    {
+        return null;
+    }
+
     public String getMessageId()
     {
         return null;
@@ -166,6 +178,12 @@ public class QMFMessage implements Serve
         return false;
     }
 
+    @Override
+    public Collection<String> getHeaderNames()
+    {
+        return Collections.EMPTY_SET;
+    }
+
     public boolean containsHeader(String name)
     {
         return false;

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/qmf/QMFObject.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/qmf/QMFObject.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/qmf/QMFObject.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/qmf/QMFObject.java Fri Aug  3 12:13:32 2012
@@ -28,7 +28,7 @@ public abstract class QMFObject<C extend
 
     public interface Delegate
     {
-        UUID getId();
+        UUID getQMFId();
         long getCreateTime();
     }
 
@@ -49,7 +49,7 @@ public abstract class QMFObject<C extend
 
     public final UUID getId()
     {
-        return _delegate.getId();
+        return _delegate.getQMFId();
     }
 
     public final long getCreateTime()

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java Fri Aug  3 12:13:32 2012
@@ -436,7 +436,7 @@ public class QMFService implements Confi
             QMFObject qmfObject = classObjects.remove(object);
             if(qmfObject != null)
             {
-                _managedObjectsById.get(qmfClass).remove(object.getId());
+                _managedObjectsById.get(qmfClass).remove(object.getQMFId());
                 objectRemoved(qmfObject);
             }
         }
@@ -468,7 +468,7 @@ public class QMFService implements Confi
             }
         }
 
-        classObjectsById.put(object.getId(),qmfObject);
+        classObjectsById.put(object.getQMFId(),qmfObject);
 
         if(classObjects.putIfAbsent(object, qmfObject) == null)
         {
@@ -570,7 +570,7 @@ public class QMFService implements Confi
 
         public UUID getSystemId()
         {
-            return _obj.getId();
+            return _obj.getQMFId();
         }
 
         public String getOsName()
@@ -598,9 +598,9 @@ public class QMFService implements Confi
             return _obj.getOSArchitecture();
         }
 
-        public UUID getId()
+        public UUID getQMFId()
         {
-            return _obj.getId();
+            return _obj.getQMFId();
         }
 
         public long getCreateTime()
@@ -964,9 +964,9 @@ public class QMFService implements Confi
             return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED);
         }
 
-        public UUID getId()
+        public UUID getQMFId()
         {
-            return _obj.getId();
+            return _obj.getQMFId();
         }
 
         public long getCreateTime()
@@ -1004,9 +1004,9 @@ public class QMFService implements Confi
             return _obj.getFederationTag();
         }
 
-        public UUID getId()
+        public UUID getQMFId()
         {
-            return _obj.getId();
+            return _obj.getQMFId();
         }
 
         public long getCreateTime()
@@ -1135,9 +1135,9 @@ public class QMFService implements Confi
             return _obj.getByteRoutes();
         }
 
-        public UUID getId()
+        public UUID getQMFId()
         {
-            return _obj.getId();
+            return _obj.getQMFId();
         }
 
         public long getCreateTime()
@@ -1470,9 +1470,9 @@ public class QMFService implements Confi
             return _obj.getArguments();
         }
 
-        public UUID getId()
+        public UUID getQMFId()
         {
-            return _obj.getId();
+            return _obj.getQMFId();
         }
 
         public long getCreateTime()
@@ -1526,9 +1526,9 @@ public class QMFService implements Confi
             return _obj.getMatches();
         }
 
-        public UUID getId()
+        public UUID getQMFId()
         {
-            return _obj.getId();
+            return _obj.getQMFId();
         }
 
         public long getCreateTime()
@@ -1647,9 +1647,9 @@ public class QMFService implements Confi
             return factory.createResponseCommand();
         }
 
-        public UUID getId()
+        public UUID getQMFId()
         {
-            return _obj.getId();
+            return _obj.getQMFId();
         }
 
         public long getCreateTime()
@@ -1741,6 +1741,12 @@ public class QMFService implements Confi
             return 0l;
         }
 
+        public Long getUnackedMessages()
+        {
+            // TODO
+            return 0l;
+        }
+
         public Long getTxnStarts()
         {
             return _obj.getTxnStarts();
@@ -1799,9 +1805,9 @@ public class QMFService implements Confi
             return factory.createResponseCommand();
         }
 
-        public UUID getId()
+        public UUID getQMFId()
         {
-            return _obj.getId();
+            return _obj.getQMFId();
         }
 
         public long getCreateTime()
@@ -1870,9 +1876,9 @@ public class QMFService implements Confi
             return _obj.getDelivered();
         }
 
-        public UUID getId()
+        public UUID getQMFId()
         {
-            return _obj.getId();
+            return _obj.getQMFId();
         }
 
         public long getCreateTime()
@@ -1955,14 +1961,20 @@ public class QMFService implements Confi
             return _obj.getAckBatching();
         }
 
+        /* support TBD */
+        public String getName()
+        {
+            return null;
+        }
+
         public BrokerSchema.BridgeClass.CloseMethodResponseCommand close(final BrokerSchema.BridgeClass.CloseMethodResponseCommandFactory factory)
         {
             return null;
         }
 
-        public UUID getId()
+        public UUID getQMFId()
         {
-            return _obj.getId();
+            return _obj.getQMFId();
         }
 
         public long getCreateTime()
@@ -2020,6 +2032,18 @@ public class QMFService implements Confi
             return _obj.getLastError();
         }
 
+        /* support TBD */
+        public String getName()
+        {
+            return null;
+        }
+
+        /* support TBD */
+        public BrokerSchema.ConnectionObject getConnectionRef()
+        {
+            return (BrokerSchema.ConnectionObject) null;
+        }
+
         public BrokerSchema.LinkClass.CloseMethodResponseCommand close(final BrokerSchema.LinkClass.CloseMethodResponseCommandFactory factory)
         {
             _obj.close();
@@ -2042,9 +2066,9 @@ public class QMFService implements Confi
             return factory.createResponseCommand();
         }
 
-        public UUID getId()
+        public UUID getQMFId()
         {
-            return _obj.getId();
+            return _obj.getQMFId();
         }
 
         public long getCreateTime()

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Fri Aug  3 12:13:32 2012
@@ -23,7 +23,9 @@ package org.apache.qpid.server;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -32,9 +34,9 @@ import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.UUID;
-import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
 
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
@@ -89,7 +91,6 @@ import org.apache.qpid.server.subscripti
 import org.apache.qpid.server.subscription.RecordDeliveryMethod;
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
-import org.apache.qpid.server.subscription.SubscriptionImpl;
 import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
 import org.apache.qpid.server.txn.LocalTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
@@ -137,11 +138,9 @@ public class AMQChannel implements Sessi
 
     private final LinkedList<AsyncCommand> _unfinishedCommandsQueue = new LinkedList<AsyncCommand>();
 
-    private static final int UNFINISHED_COMMAND_QUEUE_THRESHOLD = 500;
-
     private UnacknowledgedMessageMap _unacknowledgedMessageMap = new UnacknowledgedMessageMapImpl(DEFAULT_PREFETCH);
 
-    // Set of messages being acknoweledged in the current transaction
+    // Set of messages being acknowledged in the current transaction
     private SortedSet<QueueEntry> _acknowledgedMessages = new TreeSet<QueueEntry>();
 
     private final AtomicBoolean _suspended = new AtomicBoolean(false);
@@ -157,7 +156,7 @@ public class AMQChannel implements Sessi
     private final AMQProtocolSession _session;
     private AtomicBoolean _closing = new AtomicBoolean(false);
 
-    private final Set<AMQQueue> _blockingQueues = new ConcurrentSkipListSet<AMQQueue>();
+    private final Set<Object> _blockingEntities = Collections.synchronizedSet(new HashSet<Object>());
 
     private final AtomicBoolean _blocking = new AtomicBoolean(false);
 
@@ -170,11 +169,13 @@ public class AMQChannel implements Sessi
     private List<QueueEntry> _resendList = new ArrayList<QueueEntry>();
     private static final
     AMQShortString IMMEDIATE_DELIVERY_REPLY_TEXT = new AMQShortString("Immediate delivery is not possible.");
-    private final UUID _id;
+    private final UUID _qmfId;
     private long _createTime = System.currentTimeMillis();
 
     private final ClientDeliveryMethod _clientDeliveryMethod;
 
+    private final TransactionTimeoutHelper _transactionTimeoutHelper;
+
     public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore)
             throws AMQException
     {
@@ -183,7 +184,7 @@ public class AMQChannel implements Sessi
 
         _actor = new AMQPChannelActor(this, session.getLogActor().getRootMessageLogger());
         _logSubject = new ChannelLogSubject(this);
-        _id = getConfigStore().createId();
+        _qmfId = getConfigStore().createId();
         _actor.message(ChannelMessages.CREATE());
 
         getConfigStore().addConfiguredObject(this);
@@ -194,6 +195,8 @@ public class AMQChannel implements Sessi
         _transaction = new AsyncAutoCommitTransaction(_messageStore, this);
 
          _clientDeliveryMethod = session.createDeliveryMethod(_channelId);
+
+         _transactionTimeoutHelper = new TransactionTimeoutHelper(_logSubject);
     }
 
     public ConfigStore getConfigStore()
@@ -264,6 +267,11 @@ public class AMQChannel implements Sessi
         return _txnCount.get();
     }
 
+    public Long getTxnStart()
+    {
+        return _txnStarts.get();
+    }
+
     public int getChannelId()
     {
         return _channelId;
@@ -441,7 +449,7 @@ public class AMQChannel implements Sessi
      * @param acks      Are acks enabled for this subscriber
      * @param filters   Filters to apply to this subscriber
      *
-     * @param noLocal   Flag stopping own messages being receivied.
+     * @param noLocal   Flag stopping own messages being received.
      * @param exclusive Flag requesting exclusive access to the queue
      * @return the consumer tag. This is returned to the subscriber and used in subsequent unsubscribe requests
      *
@@ -948,9 +956,11 @@ public class AMQChannel implements Sessi
 
     public void commit() throws AMQException
     {
-        commit(null);
+        commit(null, false);
     }
-    public void commit(Runnable immediateAction) throws AMQException
+
+
+    public void commit(final Runnable immediateAction, boolean async) throws AMQException
     {
 
         if (!isTransactional())
@@ -958,11 +968,29 @@ public class AMQChannel implements Sessi
             throw new AMQException("Fatal error: commit called on non-transactional channel");
         }
 
-        _transaction.commit(immediateAction);
+        if(async && _transaction instanceof LocalTransaction)
+        {
+
+            ((LocalTransaction)_transaction).commitAsync(new Runnable()
+            {
+                @Override
+                public void run()
+                {
+                    immediateAction.run();
+                    _txnCommits.incrementAndGet();
+                    _txnStarts.incrementAndGet();
+                    decrementOutstandingTxnsIfNecessary();
+                }
+            });
+        }
+        else
+        {
+            _transaction.commit(immediateAction);
 
-        _txnCommits.incrementAndGet();
-        _txnStarts.incrementAndGet();
-        decrementOutstandingTxnsIfNecessary();
+            _txnCommits.incrementAndGet();
+            _txnStarts.incrementAndGet();
+            decrementOutstandingTxnsIfNecessary();
+        }
     }
 
     public void rollback() throws AMQException
@@ -1357,9 +1385,34 @@ public class AMQChannel implements Sessi
         return _actor;
     }
 
-    public void block(AMQQueue queue)
+    public synchronized void block()
     {
-        if(_blockingQueues.add(queue))
+        if(_blockingEntities.add(this))
+        {
+            if(_blocking.compareAndSet(false,true))
+            {
+                _actor.message(_logSubject, ChannelMessages.FLOW_ENFORCED("** All Queues **"));
+                flow(false);
+            }
+        }
+    }
+
+    public synchronized void unblock()
+    {
+        if(_blockingEntities.remove(this))
+        {
+            if(_blockingEntities.isEmpty() && _blocking.compareAndSet(true,false))
+            {
+                _actor.message(_logSubject, ChannelMessages.FLOW_REMOVED());
+
+                flow(true);
+            }
+        }
+    }
+
+    public synchronized void block(AMQQueue queue)
+    {
+        if(_blockingEntities.add(queue))
         {
 
             if(_blocking.compareAndSet(false,true))
@@ -1370,11 +1423,11 @@ public class AMQChannel implements Sessi
         }
     }
 
-    public void unblock(AMQQueue queue)
+    public synchronized void unblock(AMQQueue queue)
     {
-        if(_blockingQueues.remove(queue))
+        if(_blockingEntities.remove(queue))
         {
-            if(_blocking.compareAndSet(true,false) && !isClosing())
+            if(_blockingEntities.isEmpty() && _blocking.compareAndSet(true,false) && !isClosing())
             {
                 _actor.message(_logSubject, ChannelMessages.FLOW_REMOVED());
 
@@ -1393,6 +1446,11 @@ public class AMQChannel implements Sessi
         return false;
     }
 
+    public int getUnacknowledgedMessageCount()
+    {
+        return getUnacknowledgedMessageMap().size();
+    }
+
     private void flow(boolean flow)
     {
         MethodRegistry methodRegistry = _session.getMethodRegistry();
@@ -1400,6 +1458,7 @@ public class AMQChannel implements Sessi
         _session.writeFrame(responseBody.generateFrame(_channelId));
     }
 
+    @Override
     public boolean getBlocking()
     {
         return _blocking.get();
@@ -1456,9 +1515,10 @@ public class AMQChannel implements Sessi
         return false;
     }
 
-    public UUID getId()
+    @Override
+    public UUID getQMFId()
     {
-        return _id;
+        return _qmfId;
     }
 
     public String getSessionName()
@@ -1484,30 +1544,42 @@ public class AMQChannel implements Sessi
             long openTime = currentTime - _transaction.getTransactionStartTime();
             long idleTime = currentTime - _txnUpdateTime.get();
 
-            // Log a warning on idle or open transactions
-            if (idleWarn > 0L && idleTime > idleWarn)
-            {
-                CurrentActor.get().message(_logSubject, ChannelMessages.IDLE_TXN(idleTime));
-                _logger.warn("IDLE TRANSACTION ALERT " + _logSubject.toString() + " " + idleTime + " ms");
-            }
-            else if (openWarn > 0L && openTime > openWarn)
+            _transactionTimeoutHelper.logIfNecessary(idleTime, idleWarn, ChannelMessages.IDLE_TXN(idleTime),
+                                                     TransactionTimeoutHelper.IDLE_TRANSACTION_ALERT);
+            if (_transactionTimeoutHelper.isTimedOut(idleTime, idleClose))
             {
-                CurrentActor.get().message(_logSubject, ChannelMessages.OPEN_TXN(openTime));
-                _logger.warn("OPEN TRANSACTION ALERT " + _logSubject.toString() + " " + openTime + " ms");
+                closeConnection("Idle transaction timed out");
+                return;
             }
 
-            // Close connection for idle or open transactions that have timed out
-            if (idleClose > 0L && idleTime > idleClose)
+            _transactionTimeoutHelper.logIfNecessary(openTime, openWarn, ChannelMessages.OPEN_TXN(openTime),
+                                                     TransactionTimeoutHelper.OPEN_TRANSACTION_ALERT);
+            if (_transactionTimeoutHelper.isTimedOut(openTime, openClose))
             {
-                getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Idle transaction timed out");
-            }
-            else if (openClose > 0L && openTime > openClose)
-            {
-                getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Open transaction timed out");
+                closeConnection("Open transaction timed out");
+                return;
             }
         }
     }
 
+    /**
+     * Typically called from the HouseKeepingThread instead of the main receiver thread,
+     * therefore uses a lock to close the connection in a thread-safe manner.
+     */
+    private void closeConnection(String reason) throws AMQException
+    {
+        Lock receivedLock = _session.getReceivedLock();
+        receivedLock.lock();
+        try
+        {
+            _session.close(AMQConstant.RESOURCE_ERROR, reason);
+        }
+        finally
+        {
+            receivedLock.unlock();
+        }
+    }
+
     public void deadLetter(long deliveryTag) throws AMQException
     {
         final UnacknowledgedMessageMap unackedMap = getUnacknowledgedMessageMap();
@@ -1563,23 +1635,6 @@ public class AMQChannel implements Sessi
         _unfinishedCommandsQueue.add(new AsyncCommand(future, action));
     }
 
-    public void completeAsyncCommands()
-    {
-        AsyncCommand cmd;
-        while((cmd = _unfinishedCommandsQueue.peek()) != null && cmd.isReadyForCompletion())
-        {
-            cmd.complete();
-            _unfinishedCommandsQueue.poll();
-        }
-        while(_unfinishedCommandsQueue.size() > UNFINISHED_COMMAND_QUEUE_THRESHOLD)
-        {
-            cmd = _unfinishedCommandsQueue.poll();
-            cmd.awaitReadyForCompletion();
-            cmd.complete();
-        }
-    }
-
-
     public void sync()
     {
         AsyncCommand cmd;
@@ -1588,6 +1643,10 @@ public class AMQChannel implements Sessi
             cmd.awaitReadyForCompletion();
             cmd.complete();
         }
+        if(_transaction instanceof LocalTransaction)
+        {
+            ((LocalTransaction)_transaction).sync();
+        }
     }
 
     private static class AsyncCommand
@@ -1624,6 +1683,12 @@ public class AMQChannel implements Sessi
 
     public int compareTo(AMQSessionModel session)
     {
-        return getId().compareTo(session.getId());
+        return getQMFId().compareTo(session.getQMFId());
+    }
+
+    @Override
+    public int getConsumerCount()
+    {
+        return _tag2SubscriptionMap.size();
     }
 }

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/Broker.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/Broker.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/Broker.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/Broker.java Fri Aug  3 12:13:32 2012
@@ -20,19 +20,22 @@
  */
 package org.apache.qpid.server;
 
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.*;
+import javax.net.ssl.SSLContext;
 import org.apache.log4j.Logger;
 import org.apache.log4j.PropertyConfigurator;
-import org.apache.log4j.xml.QpidLog4JConfigurator;
-
 import org.apache.qpid.server.configuration.ServerConfiguration;
 import org.apache.qpid.server.configuration.ServerNetworkTransportConfiguration;
-import org.apache.qpid.server.configuration.management.ConfigurationManagementMBean;
-import org.apache.qpid.server.information.management.ServerInformationMBean;
 import org.apache.qpid.server.logging.SystemOutMessageLogger;
 import org.apache.qpid.server.logging.actors.BrokerActor;
 import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.logging.actors.GenericActor;
-import org.apache.qpid.server.logging.management.LoggingManagementMBean;
+import org.apache.qpid.server.logging.log4j.LoggingFacade;
 import org.apache.qpid.server.logging.messages.BrokerMessages;
 import org.apache.qpid.server.protocol.AmqpProtocolVersion;
 import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory;
@@ -46,30 +49,10 @@ import org.apache.qpid.transport.network
 
 import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS;
 
-import javax.net.ssl.SSLContext;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.EnumSet;
-import java.util.Formatter;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Properties;
-import java.util.Set;
-import java.util.logging.ConsoleHandler;
-import java.util.logging.FileHandler;
-import java.util.logging.Handler;
-import java.util.logging.Level;
-import java.util.logging.LogRecord;
-
 public class Broker
 {
     private static final Logger LOGGER = Logger.getLogger(Broker.class);
 
-    private static final int IPV4_ADDRESS_LENGTH = 4;
-    private static final char IPV4_LITERAL_SEPARATOR = '.';
     private volatile Thread _shutdownHookThread;
 
     protected static class InitException extends RuntimeException
@@ -128,6 +111,14 @@ public class Broker
 
         ConfigurationFileApplicationRegistry config = new ConfigurationFileApplicationRegistry(configFile, options.getBundleContext());
         ServerConfiguration serverConfig = config.getConfiguration();
+        if (options.getQpidWork() != null)
+        {
+            serverConfig.setQpidWork(options.getQpidWork());
+        }
+        if (options.getQpidHome() != null)
+        {
+            serverConfig.setQpidHome(options.getQpidHome());
+        }
         updateManagementPorts(serverConfig, options.getJmxPortRegistryServer(), options.getJmxPortConnectorServer());
 
         ApplicationRegistry.initialise(config);
@@ -145,14 +136,6 @@ public class Broker
 
         try
         {
-            configureLoggingManagementMBean(logConfigFile, options.getLogWatchFrequency());
-
-            ConfigurationManagementMBean configMBean = new ConfigurationManagementMBean();
-            configMBean.register();
-
-            ServerInformationMBean sysInfoMBean = new ServerInformationMBean(config);
-            sysInfoMBean.register();
-
             Set<Integer> ports = new HashSet<Integer>(options.getPorts());
             if(ports.isEmpty())
             {
@@ -165,36 +148,71 @@ public class Broker
                 parsePortList(sslPorts, serverConfig.getSSLPorts());
             }
 
+            //1-0 excludes and includes
             Set<Integer> exclude_1_0 = new HashSet<Integer>(options.getExcludedPorts(ProtocolExclusion.v1_0));
             if(exclude_1_0.isEmpty())
             {
                 parsePortList(exclude_1_0, serverConfig.getPortExclude10());
             }
 
+            Set<Integer> include_1_0 = new HashSet<Integer>(options.getIncludedPorts(ProtocolInclusion.v1_0));
+            if(include_1_0.isEmpty())
+            {
+                parsePortList(include_1_0, serverConfig.getPortInclude10());
+            }
+
+            //0-10 excludes and includes
             Set<Integer> exclude_0_10 = new HashSet<Integer>(options.getExcludedPorts(ProtocolExclusion.v0_10));
             if(exclude_0_10.isEmpty())
             {
                 parsePortList(exclude_0_10, serverConfig.getPortExclude010());
             }
 
+            Set<Integer> include_0_10 = new HashSet<Integer>(options.getIncludedPorts(ProtocolInclusion.v0_10));
+            if(include_0_10.isEmpty())
+            {
+                parsePortList(include_0_10, serverConfig.getPortInclude010());
+            }
+
+            //0-9-1 excludes and includes
             Set<Integer> exclude_0_9_1 = new HashSet<Integer>(options.getExcludedPorts(ProtocolExclusion.v0_9_1));
             if(exclude_0_9_1.isEmpty())
             {
                 parsePortList(exclude_0_9_1, serverConfig.getPortExclude091());
             }
 
+            Set<Integer> include_0_9_1 = new HashSet<Integer>(options.getIncludedPorts(ProtocolInclusion.v0_9_1));
+            if(include_0_9_1.isEmpty())
+            {
+                parsePortList(include_0_9_1, serverConfig.getPortInclude091());
+            }
+
+            //0-9 excludes and includes
             Set<Integer> exclude_0_9 = new HashSet<Integer>(options.getExcludedPorts(ProtocolExclusion.v0_9));
             if(exclude_0_9.isEmpty())
             {
                 parsePortList(exclude_0_9, serverConfig.getPortExclude09());
             }
 
+            Set<Integer> include_0_9 = new HashSet<Integer>(options.getIncludedPorts(ProtocolInclusion.v0_9));
+            if(include_0_9.isEmpty())
+            {
+                parsePortList(include_0_9, serverConfig.getPortInclude09());
+            }
+
+            //0-8 excludes and includes
             Set<Integer> exclude_0_8 = new HashSet<Integer>(options.getExcludedPorts(ProtocolExclusion.v0_8));
             if(exclude_0_8.isEmpty())
             {
                 parsePortList(exclude_0_8, serverConfig.getPortExclude08());
             }
 
+            Set<Integer> include_0_8 = new HashSet<Integer>(options.getIncludedPorts(ProtocolInclusion.v0_8));
+            if(include_0_8.isEmpty())
+            {
+                parsePortList(include_0_8, serverConfig.getPortInclude08());
+            }
+
             String bindAddr = options.getBind();
             if (bindAddr == null)
             {
@@ -220,8 +238,8 @@ public class Broker
                     final InetSocketAddress inetSocketAddress = new InetSocketAddress(bindAddress, port);
 
                     final Set<AmqpProtocolVersion> supported =
-                                    getSupportedVersions(port, exclude_1_0, exclude_0_10, exclude_0_9_1, exclude_0_9,
-                                                         exclude_0_8, serverConfig);
+                                    getSupportedVersions(port, exclude_1_0, exclude_0_10, exclude_0_9_1, exclude_0_9, exclude_0_8,
+                                                         include_1_0, include_0_10, include_0_9_1, include_0_9, include_0_8,serverConfig);
 
                     final NetworkTransportConfiguration settings =
                                     new ServerNetworkTransportConfiguration(serverConfig, inetSocketAddress, Transport.TCP);
@@ -233,7 +251,7 @@ public class Broker
                     transport.accept(settings, protocolEngineFactory, null);
 
                     ApplicationRegistry.getInstance().addAcceptor(inetSocketAddress,
-                                    new QpidAcceptor(transport,"TCP"));
+                                    new QpidAcceptor(transport,QpidAcceptor.Transport.TCP, supported));
                     CurrentActor.get().message(BrokerMessages.LISTENING("TCP", port));
                 }
             }
@@ -242,16 +260,31 @@ public class Broker
             {
                 final String keystorePath = serverConfig.getConnectorKeyStorePath();
                 final String keystorePassword = serverConfig.getConnectorKeyStorePassword();
+                final String keystoreType = serverConfig.getConnectorKeyStoreType();
                 final String keyManagerFactoryAlgorithm = serverConfig.getConnectorKeyManagerFactoryAlgorithm();
-                final SSLContext sslContext = SSLContextFactory.buildServerContext(keystorePath, keystorePassword, keyManagerFactoryAlgorithm);
+                final SSLContext sslContext;
+                if(serverConfig.getConnectorTrustStorePath()!=null)
+                {
+                    sslContext = SSLContextFactory.buildClientContext(serverConfig.getConnectorTrustStorePath(),
+                                                                      serverConfig.getConnectorTrustStorePassword(),
+                                                                      serverConfig.getConnectorTrustStoreType(),
+                                                                      serverConfig.getConnectorTrustManagerFactoryAlgorithm(),
+                                                                      keystorePath,
+                                                                      keystorePassword, keystoreType, keyManagerFactoryAlgorithm,
+                                                                      serverConfig.getCertAlias());
+                }
+                else
+                {
+                    sslContext = SSLContextFactory.buildServerContext(keystorePath, keystorePassword, keystoreType, keyManagerFactoryAlgorithm);
+                }
 
                 for(int sslPort : sslPorts)
                 {
                     final InetSocketAddress inetSocketAddress = new InetSocketAddress(bindAddress, sslPort);
 
                     final Set<AmqpProtocolVersion> supported =
-                                    getSupportedVersions(sslPort, exclude_1_0, exclude_0_10, exclude_0_9_1,
-                                                         exclude_0_9, exclude_0_8, serverConfig);
+                                    getSupportedVersions(sslPort, exclude_1_0, exclude_0_10, exclude_0_9_1, exclude_0_9, exclude_0_8,
+                                                         include_1_0, include_0_10, include_0_9_1, include_0_9, include_0_8, serverConfig);
                     final NetworkTransportConfiguration settings =
                         new ServerNetworkTransportConfiguration(serverConfig, inetSocketAddress, Transport.TCP);
 
@@ -262,7 +295,7 @@ public class Broker
                     transport.accept(settings, protocolEngineFactory, sslContext);
 
                     ApplicationRegistry.getInstance().addAcceptor(inetSocketAddress,
-                            new QpidAcceptor(transport,"TCP"));
+                            new QpidAcceptor(transport,QpidAcceptor.Transport.SSL, supported));
                     CurrentActor.get().message(BrokerMessages.LISTENING("TCP/SSL", sslPort));
                 }
             }
@@ -282,27 +315,36 @@ public class Broker
                                                                  final Set<Integer> exclude_0_9_1,
                                                                  final Set<Integer> exclude_0_9,
                                                                  final Set<Integer> exclude_0_8,
+                                                                 final Set<Integer> include_1_0,
+                                                                 final Set<Integer> include_0_10,
+                                                                 final Set<Integer> include_0_9_1,
+                                                                 final Set<Integer> include_0_9,
+                                                                 final Set<Integer> include_0_8,
                                                                  final ServerConfiguration serverConfig)
     {
         final EnumSet<AmqpProtocolVersion> supported = EnumSet.allOf(AmqpProtocolVersion.class);
 
-        if(exclude_1_0.contains(port) || !serverConfig.isAmqp10enabled())
+        if((exclude_1_0.contains(port) || !serverConfig.isAmqp10enabled()) && !include_1_0.contains(port))
         {
             supported.remove(AmqpProtocolVersion.v1_0_0);
         }
-        if(exclude_0_10.contains(port) || !serverConfig.isAmqp010enabled())
+
+        if((exclude_0_10.contains(port) || !serverConfig.isAmqp010enabled()) && !include_0_10.contains(port))
         {
             supported.remove(AmqpProtocolVersion.v0_10);
         }
-        if(exclude_0_9_1.contains(port) || !serverConfig.isAmqp091enabled())
+
+        if((exclude_0_9_1.contains(port) || !serverConfig.isAmqp091enabled()) && !include_0_9_1.contains(port))
         {
             supported.remove(AmqpProtocolVersion.v0_9_1);
         }
-        if(exclude_0_9.contains(port) || !serverConfig.isAmqp09enabled())
+
+        if((exclude_0_9.contains(port) || !serverConfig.isAmqp09enabled()) && !include_0_9.contains(port))
         {
             supported.remove(AmqpProtocolVersion.v0_9);
         }
-        if(exclude_0_8.contains(port) || !serverConfig.isAmqp08enabled())
+
+        if((exclude_0_8.contains(port) || !serverConfig.isAmqp08enabled()) && !include_0_8.contains(port))
         {
             supported.remove(AmqpProtocolVersion.v0_8);
         }
@@ -388,7 +430,7 @@ public class Broker
         }
     }
 
-    private void configureLogging(File logConfigFile, long logWatchTime) throws InitException, IOException
+    private void configureLogging(File logConfigFile, int logWatchTime) throws InitException, IOException
     {
         if (logConfigFile.exists() && logConfigFile.canRead())
         {
@@ -401,7 +443,7 @@ public class Broker
                 // log4j expects the watch interval in milliseconds
                 try
                 {
-                    QpidLog4JConfigurator.configureAndWatch(logConfigFile.getPath(), logWatchTime * 1000);
+                    LoggingFacade.configureAndWatch(logConfigFile.getPath(), logWatchTime * 1000);
                 }
                 catch (Exception e)
                 {
@@ -412,7 +454,7 @@ public class Broker
             {
                 try
                 {
-                    QpidLog4JConfigurator.configure(logConfigFile.getPath());
+                    LoggingFacade.configure(logConfigFile.getPath());
                 }
                 catch (Exception e)
                 {
@@ -446,12 +488,6 @@ public class Broker
         }
     }
 
-    private void configureLoggingManagementMBean(File logConfigFile, int logWatchTime) throws Exception
-    {
-        LoggingManagementMBean blm = new LoggingManagementMBean(logConfigFile.getPath(),logWatchTime);
-
-        blm.register();
-    }
 
     private void addShutdownHook()
     {

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/BrokerOptions.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/BrokerOptions.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/BrokerOptions.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/BrokerOptions.java Fri Aug  3 12:13:32 2012
@@ -33,10 +33,12 @@ public class BrokerOptions
     public static final String DEFAULT_CONFIG_FILE = "etc/config.xml";
     public static final String DEFAULT_LOG_CONFIG_FILE = "etc/log4j.xml";
     public static final String QPID_HOME = "QPID_HOME";
+    public static final String QPID_WORK = "QPID_WORK";
 
     private final Set<Integer> _ports = new HashSet<Integer>();
     private final Set<Integer> _sslPorts = new HashSet<Integer>();
     private final Map<ProtocolExclusion,Set<Integer>> _exclusionMap = new HashMap<ProtocolExclusion, Set<Integer>>();
+    private final Map<ProtocolInclusion,Set<Integer>> _inclusionMap = new HashMap<ProtocolInclusion, Set<Integer>>();
 
     private String _configFile;
     private String _logConfigFile;
@@ -46,6 +48,8 @@ public class BrokerOptions
     private BundleContext _bundleContext;
 
     private Integer _logWatchFrequency = 0;
+    private String _qpidWorkFolder;
+    private String _qpidHomeFolder;
 
     public void addPort(final int port)
     {
@@ -108,7 +112,7 @@ public class BrokerOptions
     }
     public String getQpidHome()
     {
-        return System.getProperty(QPID_HOME);
+        return _qpidHomeFolder == null? System.getProperty(QPID_HOME): _qpidHomeFolder;
     }
 
     public Set<Integer> getExcludedPorts(final ProtocolExclusion excludeProtocol)
@@ -161,4 +165,36 @@ public class BrokerOptions
     {
         _bundleContext = bundleContext;
     }
+
+    public Set<Integer> getIncludedPorts(final ProtocolInclusion includeProtocol)
+    {
+        final Set<Integer> registered = _inclusionMap.get(includeProtocol);
+        return registered != null ? registered : Collections.<Integer>emptySet(); // never null
+    }
+
+    public void addIncludedPort(final ProtocolInclusion includeProtocol, final int port)
+    {
+        Set<Integer> includedPorts = _inclusionMap.get(includeProtocol);
+        if (includedPorts == null) // first port for this protocol: create its set
+        {
+            includedPorts = new HashSet<Integer>();
+            _inclusionMap.put(includeProtocol, includedPorts);
+        }
+        includedPorts.add(port);
+    }
+
+    public String getQpidWork()
+    {
+        return _qpidWorkFolder;
+    }
+
+    public void setQpidWork(String qpidWorkFolder)
+    {
+        _qpidWorkFolder = qpidWorkFolder;
+    }
+
+    public void setQpidHome(String qpidHomeFolder)
+    {
+        _qpidHomeFolder = qpidHomeFolder;
+    }
 }
\ No newline at end of file

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/Main.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/Main.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/Main.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/Main.java Fri Aug  3 12:13:32 2012
@@ -85,6 +85,32 @@ public class Main
                     .withDescription("when listening on the specified port do not accept AMQP0-8 connections. The specified port must be one specified on the command line")
                     .withLongOpt("exclude-0-8").create();
 
+    // Per-protocol "include" options: each takes a port on which that AMQP version is accepted despite configuration to the contrary.
+    private static final Option OPTION_INCLUDE_1_0 =
+            OptionBuilder.withArgName("port").hasArg()
+                    .withDescription("accept AMQP1-0 connections on this port, overriding configuration to the contrary. The specified port must be one specified on the command line")
+                    .withLongOpt("include-1-0").create();
+
+    private static final Option OPTION_INCLUDE_0_10 =
+            OptionBuilder.withArgName("port").hasArg()
+                    .withDescription("accept AMQP0-10 connections on this port, overriding configuration to the contrary. The specified port must be one specified on the command line")
+                    .withLongOpt("include-0-10").create();
+
+    private static final Option OPTION_INCLUDE_0_9_1 =
+            OptionBuilder.withArgName("port").hasArg()
+                    .withDescription("accept AMQP0-9-1 connections on this port, overriding configuration to the contrary. The specified port must be one specified on the command line")
+                    .withLongOpt("include-0-9-1").create();
+
+    private static final Option OPTION_INCLUDE_0_9 =
+            OptionBuilder.withArgName("port").hasArg()
+                    .withDescription("accept AMQP0-9 connections on this port, overriding configuration to the contrary. The specified port must be one specified on the command line")
+                    .withLongOpt("include-0-9").create();
+
+    private static final Option OPTION_INCLUDE_0_8 =
+            OptionBuilder.withArgName("port").hasArg()
+                    .withDescription("accept AMQP0-8 connections on this port, overriding configuration to the contrary. The specified port must be one specified on the command line")
+                    .withLongOpt("include-0-8").create();
+
     private static final Option OPTION_JMX_PORT_REGISTRY_SERVER =
             OptionBuilder.withArgName("port").hasArg()
                     .withDescription("listen on the specified management (registry server) port. Overrides any value in the config file")
@@ -127,6 +153,11 @@ public class Main
         OPTIONS.addOption(OPTION_EXCLUDE_0_9_1);
         OPTIONS.addOption(OPTION_EXCLUDE_0_9);
         OPTIONS.addOption(OPTION_EXCLUDE_0_8);
+        OPTIONS.addOption(OPTION_INCLUDE_1_0);
+        OPTIONS.addOption(OPTION_INCLUDE_0_10);
+        OPTIONS.addOption(OPTION_INCLUDE_0_9_1);
+        OPTIONS.addOption(OPTION_INCLUDE_0_9);
+        OPTIONS.addOption(OPTION_INCLUDE_0_8);
         OPTIONS.addOption(OPTION_BIND);
 
         OPTIONS.addOption(OPTION_JMX_PORT_REGISTRY_SERVER);
@@ -256,6 +287,10 @@ public class Main
                 {
                     parsePortArray(options, _commandLine.getOptionValues(pe.getExcludeName()), pe);
                 }
+                for(ProtocolInclusion pe : ProtocolInclusion.values())
+                {
+                    parseProtocolInclusions(options, _commandLine.getOptionValues(pe.getIncludeName()), pe);
+                }
             }
 
             String[] sslPortStr = _commandLine.getOptionValues(OPTION_SSLPORT.getOpt());
@@ -266,6 +301,10 @@ public class Main
                 {
                     parsePortArray(options, _commandLine.getOptionValues(pe.getExcludeName()), pe);
                 }
+                for(ProtocolInclusion pe : ProtocolInclusion.values())
+                {
+                    parseProtocolInclusions(options, _commandLine.getOptionValues(pe.getIncludeName()), pe);
+                }
             }
 
             setExceptionHandler();
@@ -399,4 +438,23 @@ public class Main
             }
         }
     }
+
+    private static void parseProtocolInclusions(final BrokerOptions options, final Object[] ports, final ProtocolInclusion includedProtocol) throws InitException
+    {
+        if(ports == null) // no values to register for this protocol
+        {
+            return;
+        }
+        for(final Object port : ports)
+        {
+            try
+            {
+                options.addIncludedPort(includedProtocol, Integer.parseInt(String.valueOf(port)));
+            }
+            catch (NumberFormatException e)
+            {
+                throw new InitException("Invalid port for inclusion: " + port, e);
+            }
+        }
+    }
 }

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java Fri Aug  3 12:13:32 2012
@@ -35,11 +35,13 @@ public class Binding
     private final Exchange _exchange;
     private final Map<String, Object> _arguments;
     private final UUID _id;
+    private final UUID _qmfId;
     private final AtomicLong _matches = new AtomicLong();
 
-    public Binding(UUID id, final String bindingKey, final AMQQueue queue, final Exchange exchange, final Map<String, Object> arguments)
+    public Binding(UUID id, UUID qmfId, final String bindingKey, final AMQQueue queue, final Exchange exchange, final Map<String, Object> arguments)
     {
         _id = id;
+        _qmfId = qmfId;
         _bindingKey = bindingKey;
         _queue = queue;
         _exchange = exchange;
@@ -51,6 +53,11 @@ public class Binding
         return _id;
     }
 
+    public UUID getQMFId()
+    {
+        return _qmfId;
+    }
+
     public String getBindingKey()
     {
         return _bindingKey;

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java Fri Aug  3 12:13:32 2012
@@ -60,7 +60,7 @@ public class BindingFactory
 
         private BindingImpl(UUID id, String bindingKey, final AMQQueue queue, final Exchange exchange, final Map<String, Object> arguments)
         {
-            super(id, bindingKey, queue, exchange, arguments);
+            super(id, queue.getVirtualHost().getConfigStore().createId(), bindingKey, queue, exchange, arguments);
             _logSubject = new BindingLogSubject(bindingKey,exchange,queue);
 
         }
@@ -166,7 +166,7 @@ public class BindingFactory
 
         if (id == null)
         {
-            id = UUIDGenerator.generateUUID();
+            id = UUIDGenerator.generateBindingUUID(exchange.getName(), queue.getName(), bindingKey, _virtualHost.getName());
         }
         BindingImpl b = new BindingImpl(id, bindingKey, queue, exchange, arguments);
         BindingImpl existingMapping = _bindings.putIfAbsent(b, b);

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/ConfigStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/ConfigStore.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/ConfigStore.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/ConfigStore.java Fri Aug  3 12:13:32 2012
@@ -101,7 +101,7 @@ public class ConfigStore
 
         }
 
-        typeMap.put(object.getId(), object);
+        typeMap.put(object.getQMFId(), object);
         sendEvent(Event.CREATED, object);
     }
 
@@ -111,7 +111,7 @@ public class ConfigStore
         ConcurrentHashMap typeMap = _typeMap.get(object.getConfigType());
         if(typeMap != null)
         {
-            typeMap.remove(object.getId());
+            typeMap.remove(object.getQMFId());
             sendEvent(Event.DELETED, object);
         }
     }

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/ConfiguredObject.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/ConfiguredObject.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/ConfiguredObject.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/ConfiguredObject.java Fri Aug  3 12:13:32 2012
@@ -25,7 +25,7 @@ import java.util.UUID;
 
 public interface ConfiguredObject<T extends ConfigObjectType<T,C>, C extends ConfiguredObject<T, C>>
 {
-    public UUID getId();
+    public UUID getQMFId();
 
     public T getConfigType();
 

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/ExchangeConfig.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/ExchangeConfig.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/ExchangeConfig.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/ExchangeConfig.java Fri Aug  3 12:13:32 2012
@@ -49,7 +49,12 @@ public interface ExchangeConfig extends 
 
     long getMsgRoutes();
 
+    long getMsgDrops();
+
     long getByteReceives();
 
     long getByteRoutes();
+
+    long getByteDrops();
+
 }
\ No newline at end of file

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/LinkConfigType.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/LinkConfigType.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/LinkConfigType.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/LinkConfigType.java Fri Aug  3 12:13:32 2012
@@ -76,7 +76,7 @@ public final class LinkConfigType extend
         }
     };
 
-    public static final LinkReadOnlyProperty<Integer> PORT_PROPERTY = new LinkReadOnlyProperty<Integer>("host")
+    public static final LinkReadOnlyProperty<Integer> PORT_PROPERTY = new LinkReadOnlyProperty<Integer>("port")
     {
         public Integer getValue(LinkConfig object)
         {
@@ -134,4 +134,4 @@ public final class LinkConfigType extend
 
 
 
-}
\ No newline at end of file
+}

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java Fri Aug  3 12:13:32 2012
@@ -126,6 +126,11 @@ public class QueueConfiguration extends 
         return _name;
     }
 
+    public String getDescription()
+    {
+        return getStringValue("description");
+    }
+
     public int getMaximumMessageAge()
     {
         return getIntValue("maximumMessageAge", _vHostConfig.getMaximumMessageAge());
@@ -226,4 +231,5 @@ public class QueueConfiguration extends 
           
 
     }
+
 }

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java Fri Aug  3 12:13:32 2012
@@ -20,6 +20,16 @@
 
 package org.apache.qpid.server.configuration;
 
+import java.io.File;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Map.Entry;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.TrustManagerFactory;
 import org.apache.commons.configuration.CompositeConfiguration;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.ConfigurationException;
@@ -28,7 +38,6 @@ import org.apache.commons.configuration.
 import org.apache.commons.configuration.SystemConfiguration;
 import org.apache.commons.configuration.XMLConfiguration;
 import org.apache.log4j.Logger;
-
 import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
 import org.apache.qpid.server.exchange.DefaultExchangeFactory;
 import org.apache.qpid.server.protocol.AmqpProtocolVersion;
@@ -40,17 +49,6 @@ import org.apache.qpid.server.virtualhos
 
 import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS;
 
-import java.io.File;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import javax.net.ssl.KeyManagerFactory;
-
 public class ServerConfiguration extends ConfigurationPlugin
 {
     protected static final Logger _logger = Logger.getLogger(ServerConfiguration.class);
@@ -66,6 +64,9 @@ public class ServerConfiguration extends
     public static final long DEFAULT_HOUSEKEEPING_PERIOD = 30000L;
     public static final int DEFAULT_JMXPORT_REGISTRYSERVER = 8999;
     public static final int JMXPORT_CONNECTORSERVER_OFFSET = 100;
+    public static final int DEFAULT_HTTP_MANAGEMENT_PORT = 8080;
+    public static final int DEFAULT_HTTPS_MANAGEMENT_PORT = 8443;
+    public static final long DEFAULT_MINIMUM_ALERT_REPEAT_GAP = 30000L;
 
     public static final String QPID_HOME = "QPID_HOME";
     public static final String QPID_WORK = "QPID_WORK";
@@ -77,6 +78,8 @@ public class ServerConfiguration extends
 
     private File _configFile;
     private File _vhostsFile;
+    private String _qpidWork;
+    private String _qpidHome;
 
     // Map of environment variables to config items
     private static final Map<String, String> envVarMap = new HashMap<String, String>();
@@ -86,6 +89,9 @@ public class ServerConfiguration extends
     public static final String MGMT_CUSTOM_REGISTRY_SOCKET = "management.custom-registry-socket";
     public static final String MGMT_JMXPORT_REGISTRYSERVER = "management.jmxport.registryServer";
     public static final String MGMT_JMXPORT_CONNECTORSERVER = "management.jmxport.connectorServer";
+    public static final String SECURITY_DEFAULT_AUTH_MANAGER = "security.default-auth-manager";
+    public static final String SECURITY_PORT_MAPPINGS_PORT_MAPPING_AUTH_MANAGER = "security.port-mappings.port-mapping.auth-manager";
+    public static final String SECURITY_PORT_MAPPINGS_PORT_MAPPING_PORT = "security.port-mappings.port-mapping.port";
     public static final String STATUS_UPDATES = "status-updates";
     public static final String ADVANCED_LOCALE = "advanced.locale";
     public static final String CONNECTOR_AMQP10ENABLED = "connector.amqp10enabled";
@@ -94,6 +100,11 @@ public class ServerConfiguration extends
     public static final String CONNECTOR_AMQP09ENABLED = "connector.amqp09enabled";
     public static final String CONNECTOR_AMQP08ENABLED = "connector.amqp08enabled";
     public static final String CONNECTOR_AMQP_SUPPORTED_REPLY = "connector.amqpDefaultSupportedProtocolReply";
+    public static final String CONNECTOR_INCLUDE_10 = "connector.include10";
+    public static final String CONNECTOR_INCLUDE_010 = "connector.include010";
+    public static final String CONNECTOR_INCLUDE_091 = "connector.include091";
+    public static final String CONNECTOR_INCLUDE_09 = "connector.include09";
+    public static final String CONNECTOR_INCLUDE_08 = "connector.include08";
 
     {
         envVarMap.put("QPID_PORT", "connector.port");
@@ -104,6 +115,8 @@ public class ServerConfiguration extends
         envVarMap.put("QPID_MSGAUTH", "security.msg-auth");
         envVarMap.put("QPID_AUTOREGISTER", "auto_register");
         envVarMap.put("QPID_MANAGEMENTENABLED", "management.enabled");
+        envVarMap.put("QPID_HTTPMANAGEMENTENABLED", "management.http.enabled");
+        envVarMap.put("QPID_HTTPMANAGEMENTPORT", "management.http.port");
         envVarMap.put("QPID_HEARTBEATDELAY", "heartbeat.delay");
         envVarMap.put("QPID_HEARTBEATTIMEOUTFACTOR", "heartbeat.timeoutFactor");
         envVarMap.put("QPID_MAXIMUMMESSAGEAGE", "maximumMessageAge");
@@ -177,7 +190,7 @@ public class ServerConfiguration extends
      * This has been made a two step process to allow the Plugin Manager and
      * Configuration Manager to be initialised in the Application Registry.
      * <p>
-     * If using this ServerConfiguration via an ApplicationRegistry there is no 
+     * If using this ServerConfiguration via an ApplicationRegistry there is no
      * need to explicitly call {@link #initialise()} as this is done via the
      * {@link ApplicationRegistry#initialise()} method.
      *
@@ -199,12 +212,12 @@ public class ServerConfiguration extends
      * Called by {@link ApplicationRegistry#initialise()}.
      * <p>
      * NOTE: A DEFAULT ApplicationRegistry must exist when using this method
-     * or a new ApplicationRegistry will be created. 
+     * or a new ApplicationRegistry will be created.
      *
      * @throws ConfigurationException
      */
     public void initialise() throws ConfigurationException
-    {	
+    {
         setConfiguration("", getConfig());
         setupVirtualHosts(getConfig());
     }
@@ -219,10 +232,10 @@ public class ServerConfiguration extends
     {
         // Support for security.jmx.access was removed when JMX access rights were incorporated into the main ACL.
         // This ensure that users remove the element from their configuration file.
-        
+
         if (getListValue("security.jmx.access").size() > 0)
         {
-            String message = "Validation error : security/jmx/access is no longer a supported element within the configuration xml." 
+            String message = "Validation error : security/jmx/access is no longer a supported element within the configuration xml."
                     + (_configFile == null ? "" : " Configuration file : " + _configFile);
             throw new ConfigurationException(message);
         }
@@ -236,7 +249,7 @@ public class ServerConfiguration extends
 
         if (getListValue("security.principal-databases.principal-database(0).class").size() > 0)
         {
-            String message = "Validation error : security/principal-databases is no longer supported within the configuration xml." 
+            String message = "Validation error : security/principal-databases is no longer supported within the configuration xml."
                     + (_configFile == null ? "" : " Configuration file : " + _configFile);
             throw new ConfigurationException(message);
         }
@@ -249,6 +262,13 @@ public class ServerConfiguration extends
             throw new ConfigurationException(message);
         }
 
+        String[] ports = getConfig().getStringArray(SECURITY_PORT_MAPPINGS_PORT_MAPPING_PORT);
+        String[] authManagers = getConfig().getStringArray(SECURITY_PORT_MAPPINGS_PORT_MAPPING_AUTH_MANAGER);
+        if (ports.length != authManagers.length)
+        {
+            throw new ConfigurationException("Validation error: Each port-mapping must have exactly one port and exactly one auth-manager.");
+        }
+
         // QPID-3517: Inconsistency in capitalisation in the SSL configuration keys used within the connector and management configuration
         // sections. For the moment, continue to understand both but generate a deprecated warning if the less preferred keystore is used.
         for (String key : new String[] {"management.ssl.keystorePath",
@@ -280,7 +300,7 @@ public class ServerConfiguration extends
     @SuppressWarnings("unchecked")
     protected void setupVirtualHosts(Configuration conf) throws ConfigurationException
     {
-        List<String> vhostFiles = conf.getList("virtualhosts");
+        List<String> vhostFiles = (List) conf.getList("virtualhosts");
         Configuration vhostConfig = conf.subset("virtualhosts");
 
         // Only one configuration mechanism allowed
@@ -470,7 +490,7 @@ public class ServerConfiguration extends
             Configuration newConfig = parseConfig(_configFile);
             setConfiguration("", newConfig);
             ApplicationRegistry.getInstance().getSecurityManager().configureHostPlugins(this);
-			
+
             // Reload virtualhosts from correct location
             Configuration newVhosts;
             if (_vhostsFile == null)
@@ -495,15 +515,29 @@ public class ServerConfiguration extends
             _logger.warn(SECURITY_CONFIG_RELOADED);
         }
     }
-    
+
     public String getQpidWork()
     {
-        return System.getProperty(QPID_WORK, System.getProperty("java.io.tmpdir"));
+        if ( _qpidWork == null )
+        {
+            return System.getProperty(QPID_WORK, System.getProperty("java.io.tmpdir"));
+        }
+        else
+        {
+            return _qpidWork;
+        }
     }
-    
+
     public String getQpidHome()
     {
-        return System.getProperty(QPID_HOME);
+        if ( _qpidHome == null )
+        {
+            return System.getProperty(QPID_HOME);
+        }
+        else
+        {
+            return _qpidHome;
+        }
     }
 
     public void setJMXPortRegistryServer(int registryServerPort)
@@ -541,16 +575,36 @@ public class ServerConfiguration extends
         return getBooleanValue("management.platform-mbeanserver", true);
     }
 
+    public boolean getHTTPManagementEnabled()
+    {
+        return getBooleanValue("management.http.enabled", true);
+    }
+
+    public int getHTTPManagementPort()
+    {
+        return getIntValue("management.http.port", DEFAULT_HTTP_MANAGEMENT_PORT);
+    }
+
+    public boolean getHTTPSManagementEnabled()
+    {
+        return getBooleanValue("management.https.enabled", false);
+    }
+
+    public int getHTTPSManagementPort()
+    {
+        return getIntValue("management.https.port", DEFAULT_HTTPS_MANAGEMENT_PORT);
+    }
+
     public String[] getVirtualHosts()
     {
         return _virtualHosts.keySet().toArray(new String[_virtualHosts.size()]);
     }
-    
+
     public String getPluginDirectory()
     {
         return getStringValue("plugin-directory");
     }
-    
+
     public String getCacheDirectory()
     {
         return getStringValue("cache-directory");
@@ -581,6 +635,26 @@ public class ServerConfiguration extends
         return getBooleanValue("security.msg-auth");
     }
 
+    public String getDefaultAuthenticationManager()
+    {
+        return getStringValue(SECURITY_DEFAULT_AUTH_MANAGER);
+    }
+
+    public Map<Integer, String> getPortAuthenticationMappings()
+    {
+        String[] ports = getConfig().getStringArray(SECURITY_PORT_MAPPINGS_PORT_MAPPING_PORT);
+        String[] authManagers = getConfig().getStringArray(SECURITY_PORT_MAPPINGS_PORT_MAPPING_AUTH_MANAGER);
+
+        Map<Integer,String> portMappings = new HashMap<Integer, String>();
+        for(int i = 0; i < ports.length; i++)
+        {
+            portMappings.put(Integer.valueOf(ports[i]), authManagers[i]);
+        }
+
+        return portMappings;
+    }
+
+
     public String getManagementKeyStorePath()
     {
         final String fallback = getStringValue("management.ssl.keystorePath");
@@ -589,7 +663,7 @@ public class ServerConfiguration extends
 
     public boolean getManagementSSLEnabled()
     {
-        return getBooleanValue("management.ssl.enabled", true);
+        return getBooleanValue("management.ssl.enabled", false);
     }
 
     public String getManagementKeyStorePassword()
@@ -603,16 +677,11 @@ public class ServerConfiguration extends
         return getBooleanValue("queue.auto_register", true);
     }
 
-    public boolean getManagementEnabled()
+    public boolean getJMXManagementEnabled()
     {
         return getBooleanValue("management.enabled", true);
     }
 
-    public void setManagementEnabled(boolean enabled)
-    {
-        getConfig().setProperty("management.enabled", enabled);
-    }
-
     public int getHeartBeatDelay()
     {
         return getIntValue("heartbeat.delay", 5);
@@ -645,7 +714,7 @@ public class ServerConfiguration extends
 
     public long getMinimumAlertRepeatGap()
     {
-        return getLongValue("minimumAlertRepeatGap");
+        return getLongValue("minimumAlertRepeatGap", DEFAULT_MINIMUM_ALERT_REPEAT_GAP);
     }
 
     public long getCapacity()
@@ -693,6 +762,31 @@ public class ServerConfiguration extends
         return getListValue("connector.non08port");
     }
 
+    public List getPortInclude08()
+    {
+        return getListValue(CONNECTOR_INCLUDE_08);
+    }
+
+    public List getPortInclude09()
+    {
+        return getListValue(CONNECTOR_INCLUDE_09);
+    }
+
+    public List getPortInclude091()
+    {
+        return getListValue(CONNECTOR_INCLUDE_091);
+    }
+
+    public List getPortInclude010()
+    {
+        return getListValue(CONNECTOR_INCLUDE_010);
+    }
+
+    public List getPortInclude10()
+    {
+        return getListValue(CONNECTOR_INCLUDE_10);
+    }
+
     public String getBind()
     {
         return getStringValue("connector.bind", WILDCARD_ADDRESS);
@@ -740,6 +834,11 @@ public class ServerConfiguration extends
         return getStringValue("connector.ssl.keyStorePassword", fallback);
     }
 
+    public String getConnectorKeyStoreType()
+    {
+        return getStringValue("connector.ssl.keyStoreType", "JKS");
+    }
+
     public String getConnectorKeyManagerFactoryAlgorithm()
     {
         final String systemFallback = KeyManagerFactory.getDefaultAlgorithm();
@@ -748,6 +847,41 @@ public class ServerConfiguration extends
         return getStringValue("connector.ssl.keyManagerFactoryAlgorithm", fallback);
     }
 
+    public String getConnectorTrustStorePath()
+    {
+        return getStringValue("connector.ssl.trustStorePath", null);
+    }
+
+    public String getConnectorTrustStorePassword()
+    {
+        return getStringValue("connector.ssl.trustStorePassword", null);
+    }
+
+    public String getConnectorTrustStoreType()
+    {
+        return getStringValue("connector.ssl.trustStoreType", "JKS");
+    }
+
+    public String getConnectorTrustManagerFactoryAlgorithm()
+    {
+        return getStringValue("connector.ssl.trustManagerFactoryAlgorithm", TrustManagerFactory.getDefaultAlgorithm());
+    }
+
+    public String getCertAlias()
+    {
+        return getStringValue("connector.ssl.certAlias", null);
+    }
+
+    public boolean needClientAuth()
+    {
+        return getConfig().getBoolean("connector.ssl.needClientAuth", false);
+    }
+
+    public boolean wantClientAuth()
+    {
+        return getConfig().getBoolean("connector.ssl.wantClientAuth", false);
+    }
+
     public String getDefaultVirtualHost()
     {
         return getStringValue("virtualhosts.default");
@@ -756,7 +890,7 @@ public class ServerConfiguration extends
     public void setDefaultVirtualHost(String vhost)
     {
          getConfig().setProperty("virtualhosts.default", vhost);
-    }    
+    }
 
     public void setHousekeepingCheckPeriod(long value)
     {
@@ -883,4 +1017,15 @@ public class ServerConfiguration extends
 
         return reply == null ? null : AmqpProtocolVersion.valueOf(reply);
     }
+
+    public void setQpidWork(String path)
+    {
+        _qpidWork = path;
+    }
+
+    public void setQpidHome(String path)
+    {
+        _qpidHome = path;
+    }
+
 }

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerNetworkTransportConfiguration.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerNetworkTransportConfiguration.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerNetworkTransportConfiguration.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerNetworkTransportConfiguration.java Fri Aug  3 12:13:32 2012
@@ -28,7 +28,7 @@ public class ServerNetworkTransportConfi
     private final String _transport;
     private InetSocketAddress _address;
 
-    public ServerNetworkTransportConfiguration(final ServerConfiguration serverConfig, 
+    public ServerNetworkTransportConfiguration(final ServerConfiguration serverConfig,
                                                final InetSocketAddress address,
                                                final String transport)
     {
@@ -76,4 +76,15 @@ public class ServerNetworkTransportConfi
     {
         return _address;
     }
+
+    public boolean needClientAuth()
+    {
+        return _serverConfig.needClientAuth();
+    }
+
+    @Override
+    public boolean wantClientAuth()
+    {
+        return _serverConfig.wantClientAuth();
+    }
 }

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/SystemConfigImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/SystemConfigImpl.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/SystemConfigImpl.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/SystemConfigImpl.java Fri Aug  3 12:13:32 2012
@@ -33,7 +33,7 @@ public class SystemConfigImpl implements
     private static final String OS_ARCH = System.getProperty("os.arch");
     private static final String OS_VERSION = System.getProperty("os.version");
 
-    private final UUID _id;
+    private final UUID _qmfId;
     private String _name;
 
     private final String _host;
@@ -48,9 +48,9 @@ public class SystemConfigImpl implements
         this(store.createId(), store);
     }
 
-    public SystemConfigImpl(UUID id, ConfigStore store)
+    public SystemConfigImpl(UUID qmfId, ConfigStore store)
     {
-        _id = id;
+        _qmfId = qmfId;
         _store = store;
         String host;
         try
@@ -95,9 +95,10 @@ public class SystemConfigImpl implements
         return OS_ARCH;
     }
 
-    public UUID getId()
+    @Override
+    public UUID getQMFId()
     {
-        return _id;
+        return _qmfId;
     }
 
     public SystemConfigType getConfigType()
@@ -119,12 +120,12 @@ public class SystemConfigImpl implements
     {
         broker.setSystem(this);
         _store.addConfiguredObject(broker);
-        _brokers.put(broker.getId(), broker);
+        _brokers.put(broker.getQMFId(), broker);
     }
 
     public void removeBroker(final BrokerConfig broker)
     {
-        _brokers.remove(broker.getId());
+        _brokers.remove(broker.getQMFId());
         _store.removeConfiguredObject(broker);
     }
 

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/SystemConfigType.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/SystemConfigType.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/SystemConfigType.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/SystemConfigType.java Fri Aug  3 12:13:32 2012
@@ -65,7 +65,7 @@ public final class SystemConfigType exte
     {
         public UUID getValue(SystemConfig object)
         {
-            return object.getId();
+            return object.getQMFId();
         }
     };
 

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/TopicConfig.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/TopicConfig.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/TopicConfig.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/TopicConfig.java Fri Aug  3 12:13:32 2012
@@ -61,7 +61,6 @@ public class TopicConfig extends Configu
             throw new ConfigurationException("Topic section must have a 'name' or 'subscriptionName' element.");
         }
 
-        System.err.println("********* Created TC:"+this);
     }
 
 
@@ -75,5 +74,5 @@ public class TopicConfig extends Configu
         }
 
         return response;
-    }    
+    }
 }
\ No newline at end of file

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java Fri Aug  3 12:13:32 2012
@@ -32,7 +32,6 @@ import org.apache.qpid.server.configurat
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.store.MemoryMessageStore;
-import org.apache.qpid.server.store.MemoryMessageStoreFactory;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -103,14 +102,14 @@ public class VirtualHostConfiguration ex
         return getConfig().subset("store");
     }
 
-    public String getMessageStoreFactoryClass()
+    public String getMessageStoreClass()
     {
-        return getStringValue("store.factoryclass", MemoryMessageStoreFactory.class.getName());
+        return getStringValue("store.class", MemoryMessageStore.class.getName());
     }
 
-    public void setMessageStoreFactoryClass(String storeFactoryClass)
+    public void setMessageStoreClass(String storeFactoryClass)
     {
-        getConfig().setProperty("store.factoryclass", storeFactoryClass);
+        getConfig().setProperty("store.class", storeFactoryClass);
     }
 
     public List getExchanges()
@@ -271,7 +270,7 @@ public class VirtualHostConfiguration ex
 
     public Long getMinimumAlertRepeatGap()
     {
-        return getLongValue("queues.minimumAlertRepeatGap");
+        return getLongValue("queues.minimumAlertRepeatGap", ApplicationRegistry.getInstance().getConfiguration().getMinimumAlertRepeatGap());
     }
 
     public long getCapacity()

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java Fri Aug  3 12:13:32 2012
@@ -22,13 +22,12 @@ package org.apache.qpid.server.connectio
 
 import org.apache.log4j.Logger;
 
-import org.apache.qpid.AMQException;
 import org.apache.qpid.common.Closeable;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.protocol.AMQConnectionModel;
-import org.apache.qpid.transport.TransportException;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
 
@@ -37,6 +36,8 @@ public class ConnectionRegistry implemen
     private List<AMQConnectionModel> _registry = new CopyOnWriteArrayList<AMQConnectionModel>();
 
     private Logger _logger = Logger.getLogger(ConnectionRegistry.class);
+    private final Collection<RegistryChangeListener> _listeners =
+            new ArrayList<RegistryChangeListener>();
 
     public void initialise()
     {
@@ -62,34 +63,64 @@ public class ConnectionRegistry implemen
         }
     }
 
-    public void closeConnection(AMQConnectionModel connection, AMQConstant cause, String message)
+    private void closeConnection(AMQConnectionModel connection, AMQConstant cause, String message)
     {
         try
         {
             connection.close(cause, message);
         }
-        catch (TransportException e)
+        catch (Exception e)
         {
-            _logger.warn("Error closing connection:" + e.getMessage());
-        }
-        catch (AMQException e)
-        {
-            _logger.warn("Error closing connection:" + e.getMessage());
+            _logger.warn("Exception closing connection", e);
         }
     }
 
     public void registerConnection(AMQConnectionModel connnection)
     {
-        _registry.add(connnection);
+        synchronized (this)
+        {
+            _registry.add(connnection);
+            // Notify each listener exactly once, while still holding the registry lock.
+            synchronized (_listeners)
+            {
+                for(RegistryChangeListener listener : _listeners)
+                {
+                    listener.connectionRegistered(connnection);
+                }
+            }
+        }
     }
 
     public void deregisterConnection(AMQConnectionModel connnection)
     {
-        _registry.remove(connnection);
+        synchronized (this)
+        {
+            _registry.remove(connnection);
+
+            // Notify each listener exactly once, while still holding the registry lock.
+            synchronized (_listeners)
+            {
+                for(RegistryChangeListener listener : _listeners)
+                {
+                    listener.connectionUnregistered(connnection);
+                }
+            }
+        }
+    }
+
+    public void addRegistryChangeListener(RegistryChangeListener listener)
+    {
+        synchronized (_listeners)
+        {
+            _listeners.add(listener);
+        }
     }
 
     public List<AMQConnectionModel> getConnections()
     {
-        return new ArrayList<AMQConnectionModel>(_registry);
+        synchronized (this)
+        {
+            return new ArrayList<AMQConnectionModel>(_registry);
+        }
     }
 }

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java Fri Aug  3 12:13:32 2012
@@ -37,11 +37,18 @@ public interface IConnectionRegistry
 
     public void close(String replyText) throws AMQException;
 
-    public void closeConnection(AMQConnectionModel connection, AMQConstant cause, String message);
-
     public List<AMQConnectionModel> getConnections();
 
     public void registerConnection(AMQConnectionModel connnection);
 
     public void deregisterConnection(AMQConnectionModel connnection);
+
+    void addRegistryChangeListener(RegistryChangeListener listener);
+
+    interface RegistryChangeListener
+    {
+        void connectionRegistered(AMQConnectionModel connection);
+        void connectionUnregistered(AMQConnectionModel connection);
+
+    }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org