You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2013/02/28 17:14:57 UTC

svn commit: r1451244 [24/45] - in /qpid/branches/asyncstore: ./ bin/ cpp/ cpp/bindings/ cpp/bindings/qmf/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qmf2/python/ cpp/bindings/qmf2/rub...

Modified: qpid/branches/asyncstore/java/broker/etc/broker_example.acl
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/etc/broker_example.acl?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/etc/broker_example.acl (original)
+++ qpid/branches/asyncstore/java/broker/etc/broker_example.acl Thu Feb 28 16:14:30 2013
@@ -19,24 +19,20 @@
 
 ### EXAMPLE ACL V2 FILE
 ### NOTE: Rules are considered from top to bottom, and the first matching rule governs the decision.
-
-### DEFINE GROUPS ###
-
-#Define a 'messaging-users' group with users  'client' and  'server' in it
-GROUP messaging-users client server
-
-#Define a group for management web console users
-GROUP webadmins webadmin
+### Rules may refer to users or groups. Groups are currently defined in the etc/groups file.
 
 ### JMX MANAGEMENT ####
 
-# Allow everyone to perform read operations on the ServerInformation mbean
-# This is used for items such as querying the management API and broker release versions.
-ACL ALLOW ALL ACCESS METHOD component="ServerInformation"
-
-# Allow 'admin' all management operations. To reduce log file noise, only non-read-only operations are logged.
-ACL ALLOW admin ACCESS METHOD
-ACL ALLOW-LOG admin ALL METHOD
+# To use JMX management, first give the user/group ACCESS MANAGEMENT permission
+ACL ALLOW administrators ACCESS MANAGEMENT
+ACL ALLOW guest ACCESS MANAGEMENT
+
+# Allow guest to perform read operations on the ServerInformation mbean
+ACL ALLOW guest ACCESS METHOD component="ServerInformation"
+
+# Allow 'administrators' all management operations. To reduce log file noise, only non-read-only operations are logged.
+ACL ALLOW administrators ACCESS METHOD
+ACL ALLOW-LOG administrators ALL METHOD
 
 # Allow 'guest' to view logger levels, and use getter methods on LoggingManagement
 ACL ALLOW guest ACCESS METHOD component="LoggingManagement" name="viewEffectiveRuntimeLoggerLevels"
@@ -49,17 +45,61 @@ ACL DENY-LOG ALL ACCESS METHOD component
 ACL DENY-LOG ALL ACCESS METHOD component="ConfigurationManagement"
 ACL DENY-LOG ALL ACCESS METHOD component="LoggingManagement"
 
-# Allow everyone to perform all read operations (using ALLOW rather than ALLOW-LOG to reduce log file noise)
-# on the mbeans not listed in the DENY rules above
+# Allow everyone to perform all read operations on the mbeans not listed in the DENY rules above
 ACL ALLOW ALL ACCESS METHOD
 
+### WEB MANAGEMENT ####
+
+# To use web management, first give the user/group ACCESS MANAGEMENT permission
+ACL ALLOW webadmins ACCESS MANAGEMENT
+
+# ACL for web management console admins
+# All rules below are required for console admin users
+# to perform create/update/delete operations
+ACL ALLOW-LOG webadmins CREATE QUEUE
+ACL ALLOW-LOG webadmins DELETE QUEUE
+ACL ALLOW-LOG webadmins PURGE  QUEUE
+ACL ALLOW-LOG webadmins CREATE EXCHANGE
+ACL ALLOW-LOG webadmins DELETE EXCHANGE
+ACL ALLOW-LOG webadmins BIND   EXCHANGE
+ACL ALLOW-LOG webadmins UNBIND EXCHANGE
+ACL ALLOW-LOG webadmins CREATE GROUP
+ACL ALLOW-LOG webadmins DELETE GROUP
+ACL ALLOW-LOG webadmins UPDATE GROUP
+ACL ALLOW-LOG webadmins CREATE USER
+ACL ALLOW-LOG webadmins DELETE USER
+ACL ALLOW-LOG webadmins UPDATE USER
+
+ACL ALLOW-LOG webadmins UPDATE METHOD
+
+# at the moment only the following UPDATE METHOD rules are supported by web management console
+#ACL ALLOW-LOG webadmins UPDATE METHOD component="VirtualHost.Queue" name="moveMessages"
+#ACL ALLOW-LOG webadmins UPDATE METHOD component="VirtualHost.Queue" name="copyMessages"
+#ACL ALLOW-LOG webadmins UPDATE METHOD component="VirtualHost.Queue" name="deleteMessages"
+
 ### MESSAGING ###
+# The 'ACCESS VIRTUALHOST' rules below apply to messaging operations (as opposed to management operations)
+
+# Firewall examples
+
+# Deny access to all users from *.example.company1.com and *.example.company2.com
+ACL DENY-LOG all ACCESS VIRTUALHOST from_hostname=".*\.example\.company1.com,.*\.example\.company2.com"
 
-#Example permissions for request-response based messaging.
+# Deny access to all users in the IP ranges 192.168.1.0-192.168.1.255 and 192.168.2.0-192.168.2.255,
+# using the notation specified in RFC 4632, "Classless Inter-domain Routing (CIDR)"
+ACL DENY-LOG messaging-users ACCESS VIRTUALHOST from_network="192.168.1.0/24,192.168.2.0/24"
 
-#Allow 'messaging-users' group to connect to the virtualhost
+# Deny access to all users in the IP ranges 192.169.1.0-192.169.1.255 and 192.169.2.0-192.169.2.255,
+# using wildcard notation.
+ACL DENY-LOG messaging-users ACCESS VIRTUALHOST from_network="192.169.1.*,192.169.2.*"
+
+# Allow 'messaging-users' group to connect to all virtualhosts
 ACL ALLOW-LOG messaging-users ACCESS VIRTUALHOST
 
+# Deny messaging-users management
+ACL DENY-LOG messaging-users ACCESS MANAGEMENT
+
+
 # Client side
 # Allow the 'client' user to publish requests to the request queue and create, consume from, and delete temporary reply queues.
 ACL ALLOW-LOG client CREATE QUEUE temporary="true"
@@ -77,24 +117,8 @@ ACL ALLOW-LOG server CONSUME QUEUE name=
 ACL ALLOW-LOG server BIND EXCHANGE
 ACL ALLOW-LOG server PUBLISH EXCHANGE name="amq.direct" routingKey="TempQueue*"
 
-# ACL for web management console admins
-# All rules below are required for console admin users
-# to perform create/update/delete operations
-ACL ALLOW-LOG webadmins CREATE QUEUE
-ACL ALLOW-LOG webadmins DELETE QUEUE
-ACL ALLOW-LOG webadmins PURGE  QUEUE
-ACL ALLOW-LOG webadmins CREATE EXCHANGE
-ACL ALLOW-LOG webadmins DELETE EXCHANGE
-ACL ALLOW-LOG webadmins BIND   EXCHANGE
-ACL ALLOW-LOG webadmins UNBIND EXCHANGE
-ACL ALLOW-LOG webadmins UPDATE METHOD
-
-# at the moment only the following UPDATE METHOD rules are supported by web management console
-#ACL ALLOW-LOG webadmins UPDATE METHOD component="VirtualHost.Queue" name="moveMessages"
-#ACL ALLOW-LOG webadmins UPDATE METHOD component="VirtualHost.Queue" name="copyMessages"
-#ACL ALLOW-LOG webadmins UPDATE METHOD component="VirtualHost.Queue" name="deleteMessages"
 
 ### DEFAULT ###
 
-#Deny all users from performing all operations
+# Deny all users from performing all operations
 ACL DENY-LOG all all

Modified: qpid/branches/asyncstore/java/broker/etc/log4j.xml
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/etc/log4j.xml?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/etc/log4j.xml (original)
+++ qpid/branches/asyncstore/java/broker/etc/log4j.xml Thu Feb 28 16:14:30 2013
@@ -68,7 +68,7 @@
         <param name="backupFilesToPath" value="${QPID_WORK}/backup/log"/>
 
         <layout class="org.apache.log4j.PatternLayout">
-            <param name="ConversionPattern" value="%d %-5p [%t] (%F:%L) - %m%n"/>
+            <param name="ConversionPattern" value="%d %-5p [%t] (%c{2}) - %m%n"/>
         </layout>
     </appender>
 
@@ -77,20 +77,20 @@
         <param name="Append" value="false"/>
 
         <layout class="org.apache.log4j.PatternLayout">
-            <param name="ConversionPattern" value="%d %-5p [%t] (%F:%L) - %m%n"/>
+            <param name="ConversionPattern" value="%d %-5p [%t] (%c{2}) - %m%n"/>
         </layout>
     </appender>
 
     <appender class="org.apache.log4j.ConsoleAppender" name="STDOUT">
         <layout class="org.apache.log4j.PatternLayout">
-            <param name="ConversionPattern" value="%d %-5p [%t] (%F:%L) - %m%n"/>
+            <param name="ConversionPattern" value="%d %-5p [%t] (%c{2}) - %m%n"/>
         </layout>
     </appender>
 
     <!-- Provide warnings to standard output -->
-    <category additivity="true" name="org.apache.qpid">
-        <priority value="warn"/>
-    </category>
+    <logger additivity="true" name="org.apache.qpid">
+        <level value="warn"/>
+    </logger>
 
     <!-- Enable info messages for the status-logging hierarchy -->
     <logger additivity="true" name="qpid.message">
@@ -108,21 +108,14 @@
       <level value="info"/>
     </logger>
 
-    <!-- Examples of additional logging settings -->
-    <!-- Used to generate extra debug. See debug.log4j.xml -->
-    
-    <!--<category additivity="true" name="org.apache.qpid.server.store">
-        <priority value="debug"/>
-    </category-->
-
     <!-- Set the commons logging that the XML parser uses to WARN, it is very chatty at debug -->
     <logger name="org.apache.commons">
-        <level value="WARN"/>
+        <level value="warn"/>
     </logger>
 
     <!-- Log all info events to file -->
     <root>
-        <priority value="info"/>
+        <level value="info"/>
         <appender-ref ref="FileAppender"/>
         <!--appender-ref ref="ArchivingFileAppender"/-->
     </root>

Modified: qpid/branches/asyncstore/java/broker/src/main/java/broker.bnd
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/broker.bnd?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/broker.bnd (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/broker.bnd Thu Feb 28 16:14:30 2013
@@ -17,7 +17,7 @@
 # under the License.
 #
 
-ver: 0.19.0
+ver: 0.21.0
 
 Bundle-SymbolicName: qpid-broker
 Bundle-Version: ${ver}

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Thu Feb 28 16:14:30 2013
@@ -51,13 +51,10 @@ import org.apache.qpid.framing.MethodReg
 import org.apache.qpid.framing.abstraction.ContentChunk;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction;
 import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
 import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl;
-import org.apache.qpid.server.configuration.ConfigStore;
-import org.apache.qpid.server.configuration.ConfiguredObject;
-import org.apache.qpid.server.configuration.ConnectionConfig;
-import org.apache.qpid.server.configuration.SessionConfig;
-import org.apache.qpid.server.configuration.SessionConfigType;
+import org.apache.qpid.server.configuration.BrokerProperties;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.flow.FlowCreditManager;
 import org.apache.qpid.server.flow.Pre0_10CreditManager;
@@ -75,7 +72,6 @@ import org.apache.qpid.server.message.Me
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.output.ProtocolOutputConverter;
 import org.apache.qpid.server.protocol.AMQConnectionModel;
-import org.apache.qpid.server.protocol.AMQProtocolEngine;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.queue.AMQQueue;
@@ -83,7 +79,6 @@ import org.apache.qpid.server.queue.Base
 import org.apache.qpid.server.queue.InboundMessageAdapter;
 import org.apache.qpid.server.queue.IncomingMessage;
 import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoreFuture;
 import org.apache.qpid.server.store.StoredMessage;
@@ -93,19 +88,19 @@ import org.apache.qpid.server.subscripti
 import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
 import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
 import org.apache.qpid.server.txn.LocalTransaction;
+import org.apache.qpid.server.txn.LocalTransaction.ActivityTimeAccessor;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.transport.TransportException;
 
-public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoCommitTransaction.FutureRecorder
+public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.FutureRecorder
 {
     public static final int DEFAULT_PREFETCH = 4096;
 
     private static final Logger _logger = Logger.getLogger(AMQChannel.class);
 
-    private static final boolean MSG_AUTH =
-        ApplicationRegistry.getInstance().getConfiguration().getMsgAuth();
-
+    //TODO use Broker property to configure message authorization requirements
+    private boolean _messageAuthorizationRequired = Boolean.getBoolean(BrokerProperties.PROPERTY_MSG_AUTH);
 
     private final int _channelId;
 
@@ -118,7 +113,7 @@ public class AMQChannel implements Sessi
      */
     private long _deliveryTag = 0;
 
-    /** A channel has a default queue (the last declared) that is used when no queue name is explictily set */
+    /** A channel has a default queue (the last declared) that is used when no queue name is explicitly set */
     private AMQQueue _defaultQueue;
 
     /** This tag is unique per subscription to a queue. The server returns this in response to a basic.consume request. */
@@ -151,7 +146,6 @@ public class AMQChannel implements Sessi
     private final AtomicLong _txnCommits = new AtomicLong(0);
     private final AtomicLong _txnRejects = new AtomicLong(0);
     private final AtomicLong _txnCount = new AtomicLong(0);
-    private final AtomicLong _txnUpdateTime = new AtomicLong(0);
 
     private final AMQProtocolSession _session;
     private AtomicBoolean _closing = new AtomicBoolean(false);
@@ -169,12 +163,12 @@ public class AMQChannel implements Sessi
     private List<QueueEntry> _resendList = new ArrayList<QueueEntry>();
     private static final
     AMQShortString IMMEDIATE_DELIVERY_REPLY_TEXT = new AMQShortString("Immediate delivery is not possible.");
-    private final UUID _qmfId;
     private long _createTime = System.currentTimeMillis();
 
     private final ClientDeliveryMethod _clientDeliveryMethod;
 
     private final TransactionTimeoutHelper _transactionTimeoutHelper;
+    private final UUID _id = UUID.randomUUID();
 
     public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore)
             throws AMQException
@@ -184,30 +178,36 @@ public class AMQChannel implements Sessi
 
         _actor = new AMQPChannelActor(this, session.getLogActor().getRootMessageLogger());
         _logSubject = new ChannelLogSubject(this);
-        _qmfId = getConfigStore().createId();
         _actor.message(ChannelMessages.CREATE());
 
-        getConfigStore().addConfiguredObject(this);
-
         _messageStore = messageStore;
 
         // by default the session is non-transactional
         _transaction = new AsyncAutoCommitTransaction(_messageStore, this);
 
-         _clientDeliveryMethod = session.createDeliveryMethod(_channelId);
-
-         _transactionTimeoutHelper = new TransactionTimeoutHelper(_logSubject);
-    }
+        _clientDeliveryMethod = session.createDeliveryMethod(_channelId);
 
-    public ConfigStore getConfigStore()
-    {
-        return getVirtualHost().getConfigStore();
+        _transactionTimeoutHelper = new TransactionTimeoutHelper(_logSubject, new CloseAction()
+        {
+            @Override
+            public void doTimeoutAction(String reason) throws AMQException
+            {
+                closeConnection(reason);
+            }
+        });
     }
 
     /** Sets this channel to be part of a local transaction */
     public void setLocalTransactional()
     {
-        _transaction = new LocalTransaction(_messageStore);
+        _transaction = new LocalTransaction(_messageStore, new ActivityTimeAccessor()
+        {
+            @Override
+            public long getActivityTime()
+            {
+                return _session.getLastReceivedTime();
+            }
+        });
         _txnStarts.incrementAndGet();
     }
 
@@ -221,12 +221,6 @@ public class AMQChannel implements Sessi
         sync();
     }
 
-
-    public boolean inTransaction()
-    {
-        return isTransactional() && _txnUpdateTime.get() > 0 && _transaction.getTransactionStartTime() > 0;
-    }
-
     private void incrementOutstandingTxnsIfNecessary()
     {
         if(isTransactional())
@@ -247,11 +241,6 @@ public class AMQChannel implements Sessi
         }
     }
 
-    public Long getTxnStarts()
-    {
-        return _txnStarts.get();
-    }
-
     public Long getTxnCommits()
     {
         return _txnCommits.get();
@@ -369,9 +358,8 @@ public class AMQChannel implements Sessi
                             }
                         });
 
-                        _transaction.enqueue(destinationQueues, _currentMessage, new MessageDeliveryAction(_currentMessage, destinationQueues), getProtocolSession().getLastReceivedTime());
+                        _transaction.enqueue(destinationQueues, _currentMessage, new MessageDeliveryAction(_currentMessage, destinationQueues));
                         incrementOutstandingTxnsIfNecessary();
-                        updateTransactionalActivity();
                         _currentMessage.getStoredMessage().flushToStore();
                     }
                 }
@@ -396,7 +384,7 @@ public class AMQChannel implements Sessi
 
         if (_logger.isDebugEnabled())
         {
-            _logger.debug(debugIdentity() + "Content body received on channel " + _channelId);
+            _logger.debug(debugIdentity() + " content body received on channel " + _channelId);
         }
 
         try
@@ -556,9 +544,6 @@ public class AMQChannel implements Sessi
         {
             _logger.error("Caught TransportException whilst attempting to requeue:" + e);
         }
-
-        getConfigStore().removeConfiguredObject(this);
-
     }
 
     private void unsubscribeAllConsumers() throws AMQException
@@ -860,7 +845,6 @@ public class AMQChannel implements Sessi
     {
         Collection<QueueEntry> ackedMessages = getAckedMessages(deliveryTag, multiple);
         _transaction.dequeue(ackedMessages, new MessageAcknowledgeAction(ackedMessages));
-	    updateTransactionalActivity();
     }
 
     private Collection<QueueEntry> getAckedMessages(long deliveryTag, boolean multiple)
@@ -1054,19 +1038,6 @@ public class AMQChannel implements Sessi
             }
 
         }
-
-
-    }
-
-    /**
-     * Update last transaction activity timestamp
-     */
-    private void updateTransactionalActivity()
-    {
-        if (isTransactional())
-        {
-            _txnUpdateTime.set(getProtocolSession().getLastReceivedTime());
-        }
     }
 
     public String toString()
@@ -1149,10 +1120,16 @@ public class AMQChannel implements Sessi
                     ? ((BasicContentHeaderProperties) header.getProperties()).getUserId()
                     : null;
 
-        return (!MSG_AUTH || _session.getAuthorizedPrincipal().getName().equals(userID == null? "" : userID.toString()));
+        return (!_messageAuthorizationRequired || _session.getAuthorizedPrincipal().getName().equals(userID == null? "" : userID.toString()));
 
     }
 
+    @Override
+    public UUID getId()
+    {
+        return _id;
+    }
+
     public AMQConnectionModel getConnectionModel()
     {
         return _session;
@@ -1168,6 +1145,12 @@ public class AMQChannel implements Sessi
         return _logSubject;
     }
 
+    @Override
+    public int compareTo(AMQSessionModel o)
+    {
+        return getId().compareTo(o.getId());
+    }
+
     private class MessageDeliveryAction implements ServerTransaction.Action
     {
         private IncomingMessage _incommingMessage;
@@ -1221,11 +1204,6 @@ public class AMQChannel implements Sessi
                 // TODO
                 throw new RuntimeException(e);
             }
-
-
-
-
-
         }
 
         public void onRollback()
@@ -1375,7 +1353,6 @@ public class AMQChannel implements Sessi
 
         public void onRollback()
         {
-            //To change body of implemented methods use File | Settings | File Templates.
         }
     }
 
@@ -1469,97 +1446,24 @@ public class AMQChannel implements Sessi
         return getProtocolSession().getVirtualHost();
     }
 
-
-    public ConfiguredObject getParent()
-    {
-        return getVirtualHost();
-    }
-
-    public SessionConfigType getConfigType()
-    {
-        return SessionConfigType.getInstance();
-    }
-
     public int getChannel()
     {
         return getChannelId();
     }
 
-    public boolean isAttached()
-    {
-        return true;
-    }
-
-    public long getDetachedLifespan()
-    {
-        return 0;
-    }
-
-    public ConnectionConfig getConnectionConfig()
-    {
-        return (AMQProtocolEngine)getProtocolSession();
-    }
-
-    public Long getExpiryTime()
-    {
-        return null;
-    }
-
-    public Long getMaxClientRate()
-    {
-        return null;
-    }
-
     public boolean isDurable()
     {
         return false;
     }
 
-    @Override
-    public UUID getQMFId()
-    {
-        return _qmfId;
-    }
-
-    public String getSessionName()
-    {
-        return getConnectionConfig().getAddress() + "/" + getChannelId();
-    }
-
     public long getCreateTime()
     {
         return _createTime;
     }
 
-    public void mgmtClose() throws AMQException
-    {
-        _session.mgmtCloseChannel(_channelId);
-    }
-
     public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException
     {
-        if (inTransaction())
-        {
-            long currentTime = System.currentTimeMillis();
-            long openTime = currentTime - _transaction.getTransactionStartTime();
-            long idleTime = currentTime - _txnUpdateTime.get();
-
-            _transactionTimeoutHelper.logIfNecessary(idleTime, idleWarn, ChannelMessages.IDLE_TXN(idleTime),
-                                                     TransactionTimeoutHelper.IDLE_TRANSACTION_ALERT);
-            if (_transactionTimeoutHelper.isTimedOut(idleTime, idleClose))
-            {
-                closeConnection("Idle transaction timed out");
-                return;
-            }
-
-            _transactionTimeoutHelper.logIfNecessary(openTime, openWarn, ChannelMessages.OPEN_TXN(openTime),
-                                                     TransactionTimeoutHelper.OPEN_TRANSACTION_ALERT);
-            if (_transactionTimeoutHelper.isTimedOut(openTime, openClose))
-            {
-                closeConnection("Open transaction timed out");
-                return;
-            }
-        }
+        _transactionTimeoutHelper.checkIdleOrOpenTimes(_transaction, openWarn, openClose, idleWarn, idleClose);
     }
 
     /**
@@ -1637,6 +1541,11 @@ public class AMQChannel implements Sessi
 
     public void sync()
     {
+        if(_logger.isDebugEnabled())
+        {
+            _logger.debug("sync() called on channel " + debugIdentity());
+        }
+
         AsyncCommand cmd;
         while((cmd = _unfinishedCommandsQueue.poll()) != null)
         {
@@ -1674,16 +1583,6 @@ public class AMQChannel implements Sessi
             _action.postCommit();
             _action = null;
         }
-
-        boolean isReadyForCompletion()
-        {
-            return _future.isComplete();
-        }
-    }
-
-    public int compareTo(AMQSessionModel session)
-    {
-        return getQMFId().compareTo(session.getQMFId());
     }
 
     @Override

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/Broker.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/Broker.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/Broker.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/Broker.java Thu Feb 28 16:14:30 2013
@@ -23,37 +23,31 @@ package org.apache.qpid.server;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.*;
-import javax.net.ssl.SSLContext;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
 import org.apache.log4j.Logger;
 import org.apache.log4j.PropertyConfigurator;
-import org.apache.qpid.server.configuration.ServerConfiguration;
-import org.apache.qpid.server.configuration.ServerNetworkTransportConfiguration;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.configuration.BrokerProperties;
+import org.apache.qpid.server.configuration.ConfigurationEntryStore;
+import org.apache.qpid.server.configuration.BrokerConfigurationStoreCreator;
+import org.apache.qpid.server.configuration.store.ManagementModeStoreHandler;
 import org.apache.qpid.server.logging.SystemOutMessageLogger;
 import org.apache.qpid.server.logging.actors.BrokerActor;
 import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.actors.GenericActor;
-import org.apache.qpid.server.logging.log4j.LoggingFacade;
+import org.apache.qpid.server.logging.log4j.LoggingManagementFacade;
 import org.apache.qpid.server.logging.messages.BrokerMessages;
-import org.apache.qpid.server.protocol.AmqpProtocolVersion;
-import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory;
 import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
-import org.apache.qpid.server.transport.QpidAcceptor;
-import org.apache.qpid.ssl.SSLContextFactory;
-import org.apache.qpid.transport.NetworkTransportConfiguration;
-import org.apache.qpid.transport.network.IncomingNetworkTransport;
-import org.apache.qpid.transport.network.Transport;
-
-import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS;
+import org.apache.qpid.server.registry.IApplicationRegistry;
 
 public class Broker
 {
     private static final Logger LOGGER = Logger.getLogger(Broker.class);
 
     private volatile Thread _shutdownHookThread;
+    private volatile IApplicationRegistry _applicationRegistry;
 
     protected static class InitException extends RuntimeException
     {
@@ -73,7 +67,17 @@ public class Broker
         }
         finally
         {
-            ApplicationRegistry.remove();
+            try
+            {
+                if (_applicationRegistry != null)
+                {
+                    _applicationRegistry.close();
+                }
+            }
+            finally
+            {
+                clearAMQShortStringCache();
+            }
         }
     }
 
@@ -84,274 +88,76 @@ public class Broker
 
     public void startup(final BrokerOptions options) throws Exception
     {
+        CurrentActor.set(new BrokerActor(new SystemOutMessageLogger()));
         try
         {
-            CurrentActor.set(new BrokerActor(new SystemOutMessageLogger()));
             startupImpl(options);
             addShutdownHook();
         }
         finally
         {
-            CurrentActor.remove();
+            try
+            {
+                CurrentActor.remove();
+            }
+            finally
+            {
+                clearAMQShortStringCache();
+            }
         }
     }
 
     private void startupImpl(final BrokerOptions options) throws Exception
     {
-        final String qpidHome = options.getQpidHome();
-        final File configFile = getConfigFile(options.getConfigFile(),
-                                    BrokerOptions.DEFAULT_CONFIG_FILE, qpidHome, true);
-
-        CurrentActor.get().message(BrokerMessages.CONFIG(configFile.getAbsolutePath()));
-
-        File logConfigFile = getConfigFile(options.getLogConfigFile(),
-                                    BrokerOptions.DEFAULT_LOG_CONFIG_FILE, qpidHome, false);
-
-        configureLogging(logConfigFile, options.getLogWatchFrequency());
+        final String qpidHome = System.getProperty(BrokerProperties.PROPERTY_QPID_HOME);
+        String storeLocation = options.getConfigurationStoreLocation();
+        String storeType = options.getConfigurationStoreType();
 
-        ConfigurationFileApplicationRegistry config = new ConfigurationFileApplicationRegistry(configFile, options.getBundleContext());
-        ServerConfiguration serverConfig = config.getConfiguration();
-        if (options.getQpidWork() != null)
+        if (storeLocation == null)
         {
-            serverConfig.setQpidWork(options.getQpidWork());
-        }
-        if (options.getQpidHome() != null)
-        {
-            serverConfig.setQpidHome(options.getQpidHome());
-        }
-        updateManagementPorts(serverConfig, options.getJmxPortRegistryServer(), options.getJmxPortConnectorServer());
-
-        ApplicationRegistry.initialise(config);
-
-        // We have already loaded the BrokerMessages class by this point so we
-        // need to refresh the locale setting incase we had a different value in
-        // the configuration.
-        BrokerMessages.reload();
-
-        // AR.initialise() sets and removes its own actor so we now need to set the actor
-        // for the remainder of the startup, and the default actor if the stack is empty
-        CurrentActor.set(new BrokerActor(config.getCompositeStartupMessageLogger()));
-        CurrentActor.setDefault(new BrokerActor(config.getRootMessageLogger()));
-        GenericActor.setDefaultMessageLogger(config.getRootMessageLogger());
-
-        try
-        {
-            Set<Integer> ports = new HashSet<Integer>(options.getPorts());
-            if(ports.isEmpty())
-            {
-                parsePortList(ports, serverConfig.getPorts());
-            }
-
-            Set<Integer> sslPorts = new HashSet<Integer>(options.getSSLPorts());
-            if(sslPorts.isEmpty())
-            {
-                parsePortList(sslPorts, serverConfig.getSSLPorts());
-            }
-
-            //1-0 excludes and includes
-            Set<Integer> exclude_1_0 = new HashSet<Integer>(options.getExcludedPorts(ProtocolExclusion.v1_0));
-            if(exclude_1_0.isEmpty())
-            {
-                parsePortList(exclude_1_0, serverConfig.getPortExclude10());
-            }
-
-            Set<Integer> include_1_0 = new HashSet<Integer>(options.getIncludedPorts(ProtocolInclusion.v1_0));
-            if(include_1_0.isEmpty())
-            {
-                parsePortList(include_1_0, serverConfig.getPortInclude10());
-            }
-
-            //0-10 excludes and includes
-            Set<Integer> exclude_0_10 = new HashSet<Integer>(options.getExcludedPorts(ProtocolExclusion.v0_10));
-            if(exclude_0_10.isEmpty())
-            {
-                parsePortList(exclude_0_10, serverConfig.getPortExclude010());
-            }
-
-            Set<Integer> include_0_10 = new HashSet<Integer>(options.getIncludedPorts(ProtocolInclusion.v0_10));
-            if(include_0_10.isEmpty())
-            {
-                parsePortList(include_0_10, serverConfig.getPortInclude010());
-            }
-
-            //0-9-1 excludes and includes
-            Set<Integer> exclude_0_9_1 = new HashSet<Integer>(options.getExcludedPorts(ProtocolExclusion.v0_9_1));
-            if(exclude_0_9_1.isEmpty())
+            String qpidWork = System.getProperty(BrokerProperties.PROPERTY_QPID_WORK);
+            if (qpidWork == null)
             {
-                parsePortList(exclude_0_9_1, serverConfig.getPortExclude091());
+                qpidWork = new File(System.getProperty("user.dir"), "work").getAbsolutePath();
             }
-
-            Set<Integer> include_0_9_1 = new HashSet<Integer>(options.getIncludedPorts(ProtocolInclusion.v0_9_1));
-            if(include_0_9_1.isEmpty())
-            {
-                parsePortList(include_0_9_1, serverConfig.getPortInclude091());
-            }
-
-            //0-9 excludes and includes
-            Set<Integer> exclude_0_9 = new HashSet<Integer>(options.getExcludedPorts(ProtocolExclusion.v0_9));
-            if(exclude_0_9.isEmpty())
-            {
-                parsePortList(exclude_0_9, serverConfig.getPortExclude09());
-            }
-
-            Set<Integer> include_0_9 = new HashSet<Integer>(options.getIncludedPorts(ProtocolInclusion.v0_9));
-            if(include_0_9.isEmpty())
-            {
-                parsePortList(include_0_9, serverConfig.getPortInclude09());
-            }
-
-            //0-8 excludes and includes
-            Set<Integer> exclude_0_8 = new HashSet<Integer>(options.getExcludedPorts(ProtocolExclusion.v0_8));
-            if(exclude_0_8.isEmpty())
-            {
-                parsePortList(exclude_0_8, serverConfig.getPortExclude08());
-            }
-
-            Set<Integer> include_0_8 = new HashSet<Integer>(options.getIncludedPorts(ProtocolInclusion.v0_8));
-            if(include_0_8.isEmpty())
-            {
-                parsePortList(include_0_8, serverConfig.getPortInclude08());
-            }
-
-            String bindAddr = options.getBind();
-            if (bindAddr == null)
-            {
-                bindAddr = serverConfig.getBind();
-            }
-
-            InetAddress bindAddress;
-            if (bindAddr.equals(WILDCARD_ADDRESS))
-            {
-                bindAddress = null;
-            }
-            else
-            {
-                bindAddress = InetAddress.getByName(bindAddr);
-            }
-
-            final AmqpProtocolVersion defaultSupportedProtocolReply = serverConfig.getDefaultSupportedProtocolReply();
-
-            if (!serverConfig.getSSLOnly())
-            {
-                for(int port : ports)
-                {
-                    final InetSocketAddress inetSocketAddress = new InetSocketAddress(bindAddress, port);
-
-                    final Set<AmqpProtocolVersion> supported =
-                                    getSupportedVersions(port, exclude_1_0, exclude_0_10, exclude_0_9_1, exclude_0_9, exclude_0_8,
-                                                         include_1_0, include_0_10, include_0_9_1, include_0_9, include_0_8,serverConfig);
-
-                    final NetworkTransportConfiguration settings =
-                                    new ServerNetworkTransportConfiguration(serverConfig, inetSocketAddress, Transport.TCP);
-
-                    final IncomingNetworkTransport transport = Transport.getIncomingTransportInstance();
-                    final MultiVersionProtocolEngineFactory protocolEngineFactory =
-                                    new MultiVersionProtocolEngineFactory(supported, defaultSupportedProtocolReply);
-
-                    transport.accept(settings, protocolEngineFactory, null);
-
-                    ApplicationRegistry.getInstance().addAcceptor(inetSocketAddress,
-                                    new QpidAcceptor(transport,QpidAcceptor.Transport.TCP, supported));
-                    CurrentActor.get().message(BrokerMessages.LISTENING("TCP", port));
-                }
-            }
-
-            if (serverConfig.getEnableSSL())
-            {
-                final String keystorePath = serverConfig.getConnectorKeyStorePath();
-                final String keystorePassword = serverConfig.getConnectorKeyStorePassword();
-                final String keystoreType = serverConfig.getConnectorKeyStoreType();
-                final String keyManagerFactoryAlgorithm = serverConfig.getConnectorKeyManagerFactoryAlgorithm();
-                final SSLContext sslContext;
-                if(serverConfig.getConnectorTrustStorePath()!=null)
-                {
-                    sslContext = SSLContextFactory.buildClientContext(serverConfig.getConnectorTrustStorePath(),
-                                                                      serverConfig.getConnectorTrustStorePassword(),
-                                                                      serverConfig.getConnectorTrustStoreType(),
-                                                                      serverConfig.getConnectorTrustManagerFactoryAlgorithm(),
-                                                                      keystorePath,
-                                                                      keystorePassword, keystoreType, keyManagerFactoryAlgorithm,
-                                                                      serverConfig.getCertAlias());
-                }
-                else
-                {
-                    sslContext = SSLContextFactory.buildServerContext(keystorePath, keystorePassword, keystoreType, keyManagerFactoryAlgorithm);
-                }
-
-                for(int sslPort : sslPorts)
-                {
-                    final InetSocketAddress inetSocketAddress = new InetSocketAddress(bindAddress, sslPort);
-
-                    final Set<AmqpProtocolVersion> supported =
-                                    getSupportedVersions(sslPort, exclude_1_0, exclude_0_10, exclude_0_9_1, exclude_0_9, exclude_0_8,
-                                                         include_1_0, include_0_10, include_0_9_1, include_0_9, include_0_8, serverConfig);
-                    final NetworkTransportConfiguration settings =
-                        new ServerNetworkTransportConfiguration(serverConfig, inetSocketAddress, Transport.TCP);
-
-                    final IncomingNetworkTransport transport = Transport.getIncomingTransportInstance();
-                    final MultiVersionProtocolEngineFactory protocolEngineFactory =
-                                    new MultiVersionProtocolEngineFactory(supported, defaultSupportedProtocolReply);
-
-                    transport.accept(settings, protocolEngineFactory, sslContext);
-
-                    ApplicationRegistry.getInstance().addAcceptor(inetSocketAddress,
-                            new QpidAcceptor(transport,QpidAcceptor.Transport.SSL, supported));
-                    CurrentActor.get().message(BrokerMessages.LISTENING("TCP/SSL", sslPort));
-                }
-            }
-
-            CurrentActor.get().message(BrokerMessages.READY());
+            storeLocation = new File(qpidWork, BrokerOptions.DEFAULT_CONFIG_FILE + "." + storeType).getAbsolutePath();
         }
-        finally
-        {
-            // Startup is complete so remove the AR initialised Startup actor
-            CurrentActor.remove();
-        }
-    }
 
-    private static Set<AmqpProtocolVersion> getSupportedVersions(final int port,
-                                                                 final Set<Integer> exclude_1_0,
-                                                                 final Set<Integer> exclude_0_10,
-                                                                 final Set<Integer> exclude_0_9_1,
-                                                                 final Set<Integer> exclude_0_9,
-                                                                 final Set<Integer> exclude_0_8,
-                                                                 final Set<Integer> include_1_0,
-                                                                 final Set<Integer> include_0_10,
-                                                                 final Set<Integer> include_0_9_1,
-                                                                 final Set<Integer> include_0_9,
-                                                                 final Set<Integer> include_0_8,
-                                                                 final ServerConfiguration serverConfig)
-    {
-        final EnumSet<AmqpProtocolVersion> supported = EnumSet.allOf(AmqpProtocolVersion.class);
+        CurrentActor.get().message(BrokerMessages.CONFIG(storeLocation));
 
-        if((exclude_1_0.contains(port) || !serverConfig.isAmqp10enabled()) && !include_1_0.contains(port))
-        {
-            supported.remove(AmqpProtocolVersion.v1_0_0);
-        }
+        File logConfigFile = getConfigFile(options.getLogConfigFile(), BrokerOptions.DEFAULT_LOG_CONFIG_FILE, qpidHome, false);
+        configureLogging(logConfigFile, options.getLogWatchFrequency());
 
-        if((exclude_0_10.contains(port) || !serverConfig.isAmqp010enabled()) && !include_0_10.contains(port))
-        {
-            supported.remove(AmqpProtocolVersion.v0_10);
-        }
+        BrokerConfigurationStoreCreator storeCreator = new BrokerConfigurationStoreCreator();
+        ConfigurationEntryStore store = storeCreator.createStore(storeLocation, storeType,
+                options.getInitialConfigurationStoreLocation(), options.getInitialConfigurationStoreLocation());
 
-        if((exclude_0_9_1.contains(port) || !serverConfig.isAmqp091enabled()) && !include_0_9_1.contains(port))
+        if (options.isManagementMode())
         {
-            supported.remove(AmqpProtocolVersion.v0_9_1);
+            store = new ManagementModeStoreHandler(store, options);
         }
 
-        if((exclude_0_9.contains(port) || !serverConfig.isAmqp09enabled()) && !include_0_9.contains(port))
+        _applicationRegistry = new ApplicationRegistry(store);
+        try
         {
-            supported.remove(AmqpProtocolVersion.v0_9);
+            _applicationRegistry.initialise();
         }
-
-        if((exclude_0_8.contains(port) || !serverConfig.isAmqp08enabled()) && !include_0_8.contains(port))
+        catch(Exception e)
         {
-            supported.remove(AmqpProtocolVersion.v0_8);
+            try
+            {
+                _applicationRegistry.close();
+            }
+            catch(Exception ce)
+            {
+                LOGGER.debug("An error occured when closing the registry following initialization failure", ce);
+            }
+            throw e;
         }
 
-        return supported;
     }
 
+
     private File getConfigFile(final String fileName,
                                final String defaultFileName,
                                final String qpidHome, boolean throwOnFileNotFound) throws InitException
@@ -368,11 +174,11 @@ public class Broker
 
         if (!configFile.exists() && throwOnFileNotFound)
         {
-            String error = "File " + fileName + " could not be found. Check the file exists and is readable.";
+            String error = "File " + configFile + " could not be found. Check the file exists and is readable.";
 
             if (qpidHome == null)
             {
-                error = error + "\nNote: " + BrokerOptions.QPID_HOME + " is not set.";
+                error = error + "\nNote: " + BrokerProperties.PROPERTY_QPID_HOME + " is not set.";
             }
 
             throw new InitException(error, null);
@@ -399,37 +205,6 @@ public class Broker
         }
     }
 
-    /**
-     * Update the configuration data with the management port.
-     * @param configuration
-     * @param registryServerPort The string from the command line
-     */
-    private void updateManagementPorts(ServerConfiguration configuration, Integer registryServerPort, Integer connectorServerPort)
-    {
-        if (registryServerPort != null)
-        {
-            try
-            {
-                configuration.setJMXPortRegistryServer(registryServerPort);
-            }
-            catch (NumberFormatException e)
-            {
-                throw new InitException("Invalid management (registry server) port: " + registryServerPort, null);
-            }
-        }
-        if (connectorServerPort != null)
-        {
-            try
-            {
-                configuration.setJMXPortConnectorServer(connectorServerPort);
-            }
-            catch (NumberFormatException e)
-            {
-                throw new InitException("Invalid management (connector server) port: " + connectorServerPort, null);
-            }
-        }
-    }
-
     private void configureLogging(File logConfigFile, int logWatchTime) throws InitException, IOException
     {
         if (logConfigFile.exists() && logConfigFile.canRead())
@@ -443,7 +218,7 @@ public class Broker
                 // log4j expects the watch interval in milliseconds
                 try
                 {
-                    LoggingFacade.configureAndWatch(logConfigFile.getPath(), logWatchTime * 1000);
+                    LoggingManagementFacade.configureAndWatch(logConfigFile.getPath(), logWatchTime * 1000);
                 }
                 catch (Exception e)
                 {
@@ -454,7 +229,7 @@ public class Broker
             {
                 try
                 {
-                    LoggingFacade.configure(logConfigFile.getPath());
+                    LoggingManagementFacade.configure(logConfigFile.getPath());
                 }
                 catch (Exception e)
                 {
@@ -531,6 +306,24 @@ public class Broker
             LOGGER.debug("Skipping shutdown hook removal as there either isnt one, or we are it.");
         }
     }
+    /**
+     * Workaround that prevents AMQShortStrings cache from being left in the thread local. This is important
+     * when embedding the Broker in containers where the starting thread may not belong to Qpid.
+     * The long term solution here is to stop our use of AMQShortString outside the AMQP transport layer.
+     */
+    private void clearAMQShortStringCache()
+    {
+        AMQShortString.clearLocalCache();
+    }
+
+    public org.apache.qpid.server.model.Broker getBroker()
+    {
+        if (_applicationRegistry == null)
+        {
+            return null;
+        }
+        return _applicationRegistry.getBroker();
+    }
 
     private class ShutdownService implements Runnable
     {
@@ -540,4 +333,5 @@ public class Broker
             Broker.this.shutdown();
         }
     }
+
 }

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/BrokerOptions.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/BrokerOptions.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/BrokerOptions.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/BrokerOptions.java Thu Feb 28 16:14:30 2013
@@ -20,66 +20,25 @@
  */
 package org.apache.qpid.server;
 
-import org.osgi.framework.BundleContext;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
 public class BrokerOptions
 {
-    public static final String DEFAULT_CONFIG_FILE = "etc/config.xml";
+    public static final String DEFAULT_STORE_TYPE = "json";
+    public static final String DEFAULT_CONFIG_FILE = "config";
     public static final String DEFAULT_LOG_CONFIG_FILE = "etc/log4j.xml";
-    public static final String QPID_HOME = "QPID_HOME";
-    public static final String QPID_WORK = "QPID_WORK";
-
-    private final Set<Integer> _ports = new HashSet<Integer>();
-    private final Set<Integer> _sslPorts = new HashSet<Integer>();
-    private final Map<ProtocolExclusion,Set<Integer>> _exclusionMap = new HashMap<ProtocolExclusion, Set<Integer>>();
-    private final Map<ProtocolInclusion,Set<Integer>> _inclusionMap = new HashMap<ProtocolInclusion, Set<Integer>>();
 
-    private String _configFile;
     private String _logConfigFile;
-    private String _bind;
-    private Integer _jmxPortRegistryServer;
-    private Integer _jmxPortConnectorServer;
-    private BundleContext _bundleContext;
-
     private Integer _logWatchFrequency = 0;
-    private String _qpidWorkFolder;
-    private String _qpidHomeFolder;
 
-    public void addPort(final int port)
-    {
-        _ports.add(port);
-    }
+    private String _configurationStoreLocation;
+    private String _configurationStoreType = DEFAULT_STORE_TYPE;
 
-    public void addSSLPort(final int sslPort)
-    {
-        _sslPorts.add(sslPort);
-    }
+    private String _initialConfigurationStoreLocation;
+    private String _initialConfigurationStoreType = DEFAULT_STORE_TYPE;
 
-    public Set<Integer> getPorts()
-    {
-        return Collections.unmodifiableSet(_ports);
-    }
-
-    public Set<Integer> getSSLPorts()
-    {
-        return Collections.unmodifiableSet(_sslPorts);
-    }
-
-    public String getConfigFile()
-    {
-        return _configFile;
-    }
-
-    public void setConfigFile(final String configFile)
-    {
-        _configFile = configFile;
-    }
+    private boolean _managementMode;
+    private int _managementModeRmiPort;
+    private int _managementModeConnectorPort;
+    private int _managementModeHttpPort;
 
     public String getLogConfigFile()
     {
@@ -91,110 +50,97 @@ public class BrokerOptions
         _logConfigFile = logConfigFile;
     }
 
-    public Integer getJmxPortRegistryServer()
+    public int getLogWatchFrequency()
     {
-        return _jmxPortRegistryServer;
+        return _logWatchFrequency;
     }
 
-    public void setJmxPortRegistryServer(final int jmxPortRegistryServer)
+    /**
+     * Set the frequency with which the log config file will be checked for updates.
+     * @param logWatchFrequency frequency in seconds
+     */
+    public void setLogWatchFrequency(final int logWatchFrequency)
     {
-        _jmxPortRegistryServer = jmxPortRegistryServer;
+        _logWatchFrequency = logWatchFrequency;
     }
 
-    public Integer getJmxPortConnectorServer()
+    public String getConfigurationStoreLocation()
     {
-        return _jmxPortConnectorServer;
+        return _configurationStoreLocation;
     }
 
-    public void setJmxPortConnectorServer(final int jmxPortConnectorServer)
+    public void setConfigurationStoreLocation(String cofigurationStore)
     {
-        _jmxPortConnectorServer = jmxPortConnectorServer;
+        _configurationStoreLocation = cofigurationStore;
     }
-    public String getQpidHome()
+
+    public String getConfigurationStoreType()
     {
-        return _qpidHomeFolder == null? System.getProperty(QPID_HOME): _qpidHomeFolder;
+        return _configurationStoreType;
     }
 
-    public Set<Integer> getExcludedPorts(final ProtocolExclusion excludeProtocol)
+    public void setConfigurationStoreType(String cofigurationStoreType)
     {
-        final Set<Integer> excludedPorts = _exclusionMap.get(excludeProtocol);
-        return excludedPorts == null ? Collections.<Integer>emptySet() : excludedPorts;
+        _configurationStoreType = cofigurationStoreType;
     }
 
-    public void addExcludedPort(final ProtocolExclusion excludeProtocol, final int port)
+    public void setInitialConfigurationStoreLocation(String initialConfigurationStore)
     {
-        if (!_exclusionMap.containsKey(excludeProtocol))
-        {
-            _exclusionMap.put(excludeProtocol, new HashSet<Integer>());
-        }
-
-        Set<Integer> ports = _exclusionMap.get(excludeProtocol);
-        ports.add(port);
+        _initialConfigurationStoreLocation = initialConfigurationStore;
     }
 
-    public String getBind()
+    public void setInitialConfigurationStoreType(String initialConfigurationStoreType)
     {
-        return _bind;
+        _initialConfigurationStoreType = initialConfigurationStoreType;
     }
 
-    public void setBind(final String bind)
+    public String getInitialConfigurationStoreLocation()
     {
-        _bind = bind;
+        return _initialConfigurationStoreLocation;
     }
 
-    public int getLogWatchFrequency()
+    public String getInitialConfigurationStoreType()
     {
-        return _logWatchFrequency;
+        return _initialConfigurationStoreType;
     }
 
-    /**
-     * Set the frequency with which the log config file will be checked for updates.
-     * @param logWatchFrequency frequency in seconds
-     */
-    public void setLogWatchFrequency(final int logWatchFrequency)
+    public boolean isManagementMode()
     {
-        _logWatchFrequency = logWatchFrequency;
+        return _managementMode;
     }
 
-    public BundleContext getBundleContext()
+    public void setManagementMode(boolean managementMode)
     {
-        return _bundleContext ;
+        _managementMode = managementMode;
     }
 
-    public void setBundleContext(final BundleContext bundleContext)
+    public int getManagementModeRmiPort()
     {
-        _bundleContext = bundleContext;
+        return _managementModeRmiPort;
     }
 
-    public Set<Integer> getIncludedPorts(final ProtocolInclusion includeProtocol)
+    public void setManagementModeRmiPort(int managementModeRmiPort)
     {
-        final Set<Integer> includedPorts = _inclusionMap.get(includeProtocol);
-        return includedPorts == null ? Collections.<Integer>emptySet() : includedPorts;
+        _managementModeRmiPort = managementModeRmiPort;
     }
 
-    public void addIncludedPort(final ProtocolInclusion includeProtocol, final int port)
+    public int getManagementModeConnectorPort()
     {
-        if (!_inclusionMap.containsKey(includeProtocol))
-        {
-            _inclusionMap.put(includeProtocol, new HashSet<Integer>());
-        }
-
-        Set<Integer> ports = _inclusionMap.get(includeProtocol);
-        ports.add(port);
+        return _managementModeConnectorPort;
     }
 
-    public String getQpidWork()
+    public void setManagementModeConnectorPort(int managementModeConnectorPort)
     {
-        return _qpidWorkFolder;
+        _managementModeConnectorPort = managementModeConnectorPort;
     }
 
-    public void setQpidWork(String qpidWorkFolder)
+    public int getManagementModeHttpPort()
     {
-        _qpidWorkFolder = qpidWorkFolder;
+        return _managementModeHttpPort;
     }
 
-    public void setQpidHome(String qpidHomeFolder)
+    public void setManagementModeHttpPort(int managementModeHttpPort)
     {
-        _qpidHomeFolder = qpidHomeFolder;
+        _managementModeHttpPort = managementModeHttpPort;
     }
 }
\ No newline at end of file

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/Main.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/Main.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/Main.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/Main.java Thu Feb 28 16:14:30 2013
@@ -30,9 +30,6 @@ import org.apache.commons.cli.PosixParse
 import org.apache.log4j.Logger;
 import org.apache.qpid.common.QpidProperties;
 import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.server.Broker.InitException;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-
 
 /**
  * Main entry point for AMQPD.
@@ -45,86 +42,17 @@ public class Main
 
     private static final Option OPTION_VERSION = new Option("v", "version", false, "print the version information and exit");
 
-    private static final Option OPTION_CONFIG_FILE =
-            OptionBuilder.withArgName("file").hasArg().withDescription("use given configuration file").withLongOpt("config")
-                    .create("c");
-
-    private static final Option OPTION_PORT =
-            OptionBuilder.withArgName("port").hasArg()
-                    .withDescription("listen on the specified port. Overrides any value in the config file")
-                    .withLongOpt("port").create("p");
-
-    private static final Option OPTION_SSLPORT =
-            OptionBuilder.withArgName("port").hasArg()
-                    .withDescription("SSL port. Overrides any value in the config file")
-                    .withLongOpt("sslport").create("s");
-
-
-    private static final Option OPTION_EXCLUDE_1_0 =
-            OptionBuilder.withArgName("port").hasArg()
-                         .withDescription("when listening on the specified port do not accept AMQP1-0 connections. The specified port must be one specified on the command line")
-                         .withLongOpt("exclude-1-0").create();
-
-    private static final Option OPTION_EXCLUDE_0_10 =
-            OptionBuilder.withArgName("port").hasArg()
-                    .withDescription("when listening on the specified port do not accept AMQP0-10 connections. The specified port must be one specified on the command line")
-                    .withLongOpt("exclude-0-10").create();
-
-    private static final Option OPTION_EXCLUDE_0_9_1 =
-            OptionBuilder.withArgName("port").hasArg()
-                    .withDescription("when listening on the specified port do not accept AMQP0-9-1 connections. The specified port must be one specified on the command line")
-                    .withLongOpt("exclude-0-9-1").create();
-
-    private static final Option OPTION_EXCLUDE_0_9 =
-            OptionBuilder.withArgName("port").hasArg()
-                    .withDescription("when listening on the specified port do not accept AMQP0-9 connections. The specified port must be one specified on the command line")
-                    .withLongOpt("exclude-0-9").create();
-
-    private static final Option OPTION_EXCLUDE_0_8 =
-            OptionBuilder.withArgName("port").hasArg()
-                    .withDescription("when listening on the specified port do not accept AMQP0-8 connections. The specified port must be one specified on the command line")
-                    .withLongOpt("exclude-0-8").create();
-
-    private static final Option OPTION_INCLUDE_1_0 =
-        OptionBuilder.withArgName("port").hasArg()
-                .withDescription("accept AMQP1-0 connections on this port, overriding configuration to the contrary. The specified port must be one specified on the command line")
-                     .withLongOpt("include-1-0").create();
-
-private static final Option OPTION_INCLUDE_0_10 =
-        OptionBuilder.withArgName("port").hasArg()
-                .withDescription("accept AMQP0-10 connections on this port, overriding configuration to the contrary. The specified port must be one specified on the command line")
-                .withLongOpt("include-0-10").create();
-
-private static final Option OPTION_INCLUDE_0_9_1 =
-        OptionBuilder.withArgName("port").hasArg()
-                .withDescription("accept AMQP0-9-1 connections on this port, overriding configuration to the contrary. The specified port must be one specified on the command line")
-                .withLongOpt("include-0-9-1").create();
-
-private static final Option OPTION_INCLUDE_0_9 =
-        OptionBuilder.withArgName("port").hasArg()
-                .withDescription("accept AMQP0-9 connections on this port, overriding configuration to the contrary. The specified port must be one specified on the command line")
-                .withLongOpt("include-0-9").create();
-
-private static final Option OPTION_INCLUDE_0_8 =
-        OptionBuilder.withArgName("port").hasArg()
-                .withDescription("accept AMQP0-8 connections on this port, overriding configuration to the contrary. The specified port must be one specified on the command line")
-                .withLongOpt("include-0-8").create();
-
-
-    private static final Option OPTION_JMX_PORT_REGISTRY_SERVER =
-            OptionBuilder.withArgName("port").hasArg()
-                    .withDescription("listen on the specified management (registry server) port. Overrides any value in the config file")
-                    .withLongOpt("jmxregistryport").create("m");
-
-    private static final Option OPTION_JMX_PORT_CONNECTOR_SERVER =
-            OptionBuilder.withArgName("port").hasArg()
-                    .withDescription("listen on the specified management (connector server) port. Overrides any value in the config file")
-                    .withLongOpt("jmxconnectorport").create();
-
-    private static final Option OPTION_BIND =
-            OptionBuilder.withArgName("address").hasArg()
-                    .withDescription("bind to the specified address. Overrides any value in the config file")
-                    .withLongOpt("bind").create("b");
+    private static final Option OPTION_CONFIGURATION_STORE_PATH = OptionBuilder.withArgName("path").hasArg()
+            .withDescription("use given configuration store location").withLongOpt("store-path").create("sp");
+
+    private static final Option OPTION_CONFIGURATION_STORE_TYPE = OptionBuilder.withArgName("type").hasArg()
+            .withDescription("use given store type").withLongOpt("store-type").create("st");
+
+    private static final Option OPTION_INITIAL_CONFIGURATION_STORE_PATH = OptionBuilder.withArgName("path").hasArg()
+            .withDescription("pass the location of initial store to use to create a user store").withLongOpt("initial-store-path").create("isp");
+
+    private static final Option OPTION_INITIAL_CONFIGURATION_STORE_TYPE = OptionBuilder.withArgName("type").hasArg()
+            .withDescription("the type of initial store").withLongOpt("initial-store-type").create("ist");
 
     private static final Option OPTION_LOG_CONFIG_FILE =
             OptionBuilder.withArgName("file").hasArg()
@@ -137,31 +65,31 @@ private static final Option OPTION_INCLU
                     .withDescription("monitor the log file configuration file for changes. Units are seconds. "
                                      + "Zero means do not check for changes.").withLongOpt("logwatch").create("w");
 
+    private static final Option OPTION_MANAGEMENT_MODE = OptionBuilder.withDescription("start broker in a management mode")
+            .withLongOpt("management-mode").create("mm");
+    private static final Option OPTION_RMI_PORT = OptionBuilder.withArgName("port").hasArg()
+            .withDescription("override jmx rmi port in management mode").withLongOpt("jmxregistryport").create("rmi");
+    private static final Option OPTION_CONNECTOR_PORT = OptionBuilder.withArgName("port").hasArg()
+            .withDescription("override jmx connector port in management mode").withLongOpt("jmxconnectorport").create("jmxrmi");
+    private static final Option OPTION_HTTP_PORT = OptionBuilder.withArgName("port").hasArg()
+            .withDescription("override web management port in management mode").withLongOpt("httpport").create("http");
+
     private static final Options OPTIONS = new Options();
 
     static
     {
         OPTIONS.addOption(OPTION_HELP);
         OPTIONS.addOption(OPTION_VERSION);
-        OPTIONS.addOption(OPTION_CONFIG_FILE);
+        OPTIONS.addOption(OPTION_CONFIGURATION_STORE_PATH);
+        OPTIONS.addOption(OPTION_CONFIGURATION_STORE_TYPE);
         OPTIONS.addOption(OPTION_LOG_CONFIG_FILE);
         OPTIONS.addOption(OPTION_LOG_WATCH);
-        OPTIONS.addOption(OPTION_PORT);
-        OPTIONS.addOption(OPTION_SSLPORT);
-        OPTIONS.addOption(OPTION_EXCLUDE_1_0);
-        OPTIONS.addOption(OPTION_EXCLUDE_0_10);
-        OPTIONS.addOption(OPTION_EXCLUDE_0_9_1);
-        OPTIONS.addOption(OPTION_EXCLUDE_0_9);
-        OPTIONS.addOption(OPTION_EXCLUDE_0_8);
-        OPTIONS.addOption(OPTION_INCLUDE_1_0);
-        OPTIONS.addOption(OPTION_INCLUDE_0_10);
-        OPTIONS.addOption(OPTION_INCLUDE_0_9_1);
-        OPTIONS.addOption(OPTION_INCLUDE_0_9);
-        OPTIONS.addOption(OPTION_INCLUDE_0_8);
-        OPTIONS.addOption(OPTION_BIND);
-
-        OPTIONS.addOption(OPTION_JMX_PORT_REGISTRY_SERVER);
-        OPTIONS.addOption(OPTION_JMX_PORT_CONNECTOR_SERVER);
+        OPTIONS.addOption(OPTION_INITIAL_CONFIGURATION_STORE_PATH);
+        OPTIONS.addOption(OPTION_INITIAL_CONFIGURATION_STORE_TYPE);
+        OPTIONS.addOption(OPTION_MANAGEMENT_MODE);
+        OPTIONS.addOption(OPTION_RMI_PORT);
+        OPTIONS.addOption(OPTION_CONNECTOR_PORT);
+        OPTIONS.addOption(OPTION_HTTP_PORT);
     }
 
     protected CommandLine _commandLine;
@@ -243,10 +171,15 @@ private static final Option OPTION_INCLU
         else
         {
             BrokerOptions options = new BrokerOptions();
-            String configFile = _commandLine.getOptionValue(OPTION_CONFIG_FILE.getOpt());
-            if(configFile != null)
+            String configurationStore = _commandLine.getOptionValue(OPTION_CONFIGURATION_STORE_PATH.getOpt());
+            if (configurationStore != null)
+            {
+                options.setConfigurationStoreLocation(configurationStore);
+            }
+            String configurationStoreType = _commandLine.getOptionValue(OPTION_CONFIGURATION_STORE_TYPE.getOpt());
+            if (configurationStoreType != null)
             {
-                options.setConfigFile(configFile);
+                options.setConfigurationStoreType(configurationStoreType);
             }
 
             String logWatchConfig = _commandLine.getOptionValue(OPTION_LOG_WATCH.getOpt());
@@ -261,52 +194,37 @@ private static final Option OPTION_INCLU
                 options.setLogConfigFile(logConfig);
             }
 
-            String jmxPortRegistryServer = _commandLine.getOptionValue(OPTION_JMX_PORT_REGISTRY_SERVER.getOpt());
-            if(jmxPortRegistryServer != null)
+            String initialConfigurationStore = _commandLine.getOptionValue(OPTION_INITIAL_CONFIGURATION_STORE_PATH.getOpt());
+            if (initialConfigurationStore != null)
             {
-                options.setJmxPortRegistryServer(Integer.parseInt(jmxPortRegistryServer));
+                options.setInitialConfigurationStoreLocation(initialConfigurationStore);
             }
-
-            String jmxPortConnectorServer = _commandLine.getOptionValue(OPTION_JMX_PORT_CONNECTOR_SERVER.getLongOpt());
-            if(jmxPortConnectorServer != null)
+            String initailConfigurationStoreType = _commandLine.getOptionValue(OPTION_INITIAL_CONFIGURATION_STORE_TYPE.getOpt());
+            if (initailConfigurationStoreType != null)
             {
-                options.setJmxPortConnectorServer(Integer.parseInt(jmxPortConnectorServer));
+                options.setInitialConfigurationStoreType(initailConfigurationStoreType);
             }
 
-            String bindAddr = _commandLine.getOptionValue(OPTION_BIND.getOpt());
-            if (bindAddr != null)
+            boolean managmentMode = _commandLine.hasOption(OPTION_MANAGEMENT_MODE.getOpt());
+            if (managmentMode)
             {
-                options.setBind(bindAddr);
-            }
-
-            String[] portStr = _commandLine.getOptionValues(OPTION_PORT.getOpt());
-            if(portStr != null)
-            {
-                parsePortArray(options, portStr, false);
-                for(ProtocolExclusion pe : ProtocolExclusion.values())
+                options.setManagementMode(true);
+                String rmiPort = _commandLine.getOptionValue(OPTION_RMI_PORT.getOpt());
+                if (rmiPort != null)
                 {
-                    parsePortArray(options, _commandLine.getOptionValues(pe.getExcludeName()), pe);
+                    options.setManagementModeRmiPort(Integer.parseInt(rmiPort));
                 }
-                for(ProtocolInclusion pe : ProtocolInclusion.values())
+                String connectorPort = _commandLine.getOptionValue(OPTION_CONNECTOR_PORT.getOpt());
+                if (connectorPort != null)
                 {
-                    parseProtocolInclusions(options, _commandLine.getOptionValues(pe.getIncludeName()), pe);
+                    options.setManagementModeConnectorPort(Integer.parseInt(connectorPort));
                 }
-            }
-
-            String[] sslPortStr = _commandLine.getOptionValues(OPTION_SSLPORT.getOpt());
-            if(sslPortStr != null)
-            {
-                parsePortArray(options, sslPortStr, true);
-                for(ProtocolExclusion pe : ProtocolExclusion.values())
-                {
-                    parsePortArray(options, _commandLine.getOptionValues(pe.getExcludeName()), pe);
-                }
-                for(ProtocolInclusion pe : ProtocolInclusion.values())
+                String httpPort = _commandLine.getOptionValue(OPTION_HTTP_PORT.getOpt());
+                if (httpPort != null)
                 {
-                    parseProtocolInclusions(options, _commandLine.getOptionValues(pe.getIncludeName()), pe);
+                    options.setManagementModeHttpPort(Integer.parseInt(httpPort));
                 }
             }
-
             setExceptionHandler();
 
             startBroker(options);
@@ -389,72 +307,7 @@ private static final Option OPTION_INCLU
 
     protected void shutdown(final int status)
     {
-        ApplicationRegistry.remove();
         System.exit(status);
     }
 
-    private static void parsePortArray(final BrokerOptions options,final Object[] ports,
-                                       final boolean ssl) throws InitException
-    {
-        if(ports != null)
-        {
-            for(int i = 0; i < ports.length; i++)
-            {
-                try
-                {
-                    if(ssl)
-                    {
-                        options.addSSLPort(Integer.parseInt(String.valueOf(ports[i])));
-                    }
-                    else
-                    {
-                        options.addPort(Integer.parseInt(String.valueOf(ports[i])));
-                    }
-                }
-                catch (NumberFormatException e)
-                {
-                    throw new InitException("Invalid port: " + ports[i], e);
-                }
-            }
-        }
-    }
-
-    private static void parsePortArray(final BrokerOptions options, final Object[] ports,
-                                       final ProtocolExclusion excludedProtocol) throws InitException
-    {
-        if(ports != null)
-        {
-            for(int i = 0; i < ports.length; i++)
-            {
-                try
-                {
-                    options.addExcludedPort(excludedProtocol, 
-                            Integer.parseInt(String.valueOf(ports[i])));
-                }
-                catch (NumberFormatException e)
-                {
-                    throw new InitException("Invalid port for exclusion: " + ports[i], e);
-                }
-            }
-        }
-    }
-
-    private static void parseProtocolInclusions(final BrokerOptions options, final Object[] ports,
-                                       final ProtocolInclusion includedProtocol) throws InitException
-    {
-        if(ports != null)
-        {
-            for(int i = 0; i < ports.length; i++)
-            {
-                try
-                {
-                    options.addIncludedPort(includedProtocol, Integer.parseInt(String.valueOf(ports[i])));
-                }
-                catch (NumberFormatException e)
-                {
-                    throw new InitException("Invalid port for inclusion: " + ports[i], e);
-                }
-            }
-        }
-    }
 }

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/TransactionTimeoutHelper.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/TransactionTimeoutHelper.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/TransactionTimeoutHelper.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/TransactionTimeoutHelper.java Thu Feb 28 16:14:30 2013
@@ -18,46 +18,85 @@
  */
 package org.apache.qpid.server;
 
-import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
 import org.apache.qpid.server.logging.LogActor;
 import org.apache.qpid.server.logging.LogMessage;
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.logging.messages.ChannelMessages;
+import org.apache.qpid.server.txn.ServerTransaction;
 
 public class TransactionTimeoutHelper
 {
-    private static final Logger LOGGER = Logger.getLogger(TransactionTimeoutHelper.class);
-
-    public static final String IDLE_TRANSACTION_ALERT = "IDLE TRANSACTION ALERT";
-    public static final String OPEN_TRANSACTION_ALERT = "OPEN TRANSACTION ALERT";
+    private static final String OPEN_TRANSACTION_TIMEOUT_ERROR = "Open transaction timed out";
+    private static final String IDLE_TRANSACTION_TIMEOUT_ERROR = "Idle transaction timed out";
 
     private final LogSubject _logSubject;
 
-    public TransactionTimeoutHelper(final LogSubject logSubject)
+    private final CloseAction _closeAction;
+
+    public TransactionTimeoutHelper(final LogSubject logSubject, final CloseAction closeAction)
     {
         _logSubject = logSubject;
+        _closeAction = closeAction;
     }
 
-    public void logIfNecessary(final long timeSoFar, final long warnTimeout,
-                               final LogMessage message, final String alternateLogPrefix)
+    public void checkIdleOrOpenTimes(ServerTransaction transaction, long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException
     {
-        if (isTimedOut(timeSoFar, warnTimeout))
+        if (transaction.isTransactional())
         {
-            LogActor logActor = CurrentActor.get();
-            if(logActor.getRootMessageLogger().isMessageEnabled(logActor, _logSubject, message.getLogHierarchy()))
+            final long transactionUpdateTime = transaction.getTransactionUpdateTime();
+            if(transactionUpdateTime > 0)
             {
-                logActor.message(_logSubject, message);
+                long idleTime = System.currentTimeMillis() - transactionUpdateTime;
+                boolean closed = logAndCloseIfNecessary(idleTime, idleWarn, idleClose, ChannelMessages.IDLE_TXN(idleTime), IDLE_TRANSACTION_TIMEOUT_ERROR);
+                if (closed)
+                {
+                    return; // no point proceeding to check the open time
+                }
             }
-            else
+
+            final long transactionStartTime = transaction.getTransactionStartTime();
+            if(transactionStartTime > 0)
             {
-                LOGGER.warn(alternateLogPrefix + " " + _logSubject.toLogString() + " " + timeSoFar + " ms");
+                long openTime = System.currentTimeMillis() - transactionStartTime;
+                logAndCloseIfNecessary(openTime, openWarn, openClose, ChannelMessages.OPEN_TXN(openTime), OPEN_TRANSACTION_TIMEOUT_ERROR);
             }
         }
     }
 
-    public boolean isTimedOut(long timeSoFar, long timeout)
+    /**
+     * @return true iff closeTimeout was exceeded
+     */
+    private boolean logAndCloseIfNecessary(final long timeSoFar,
+            final long warnTimeout, final long closeTimeout,
+            final LogMessage warnMessage, final String closeMessage) throws AMQException
+    {
+        if (isTimedOut(timeSoFar, warnTimeout))
+        {
+            LogActor logActor = CurrentActor.get();
+            logActor.message(_logSubject, warnMessage);
+        }
+
+        if(isTimedOut(timeSoFar, closeTimeout))
+        {
+            _closeAction.doTimeoutAction(closeMessage);
+            return true;
+        }
+        else
+        {
+            return false;
+        }
+    }
+
+    private boolean isTimedOut(long timeSoFar, long timeout)
     {
         return timeout > 0L && timeSoFar > timeout;
     }
+
+    public interface CloseAction
+    {
+        void doTimeoutAction(String reason) throws AMQException;
+    }
+
 }

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java Thu Feb 28 16:14:30 2013
@@ -35,13 +35,15 @@ public class Binding
     private final Exchange _exchange;
     private final Map<String, Object> _arguments;
     private final UUID _id;
-    private final UUID _qmfId;
     private final AtomicLong _matches = new AtomicLong();
 
-    public Binding(UUID id, UUID qmfId, final String bindingKey, final AMQQueue queue, final Exchange exchange, final Map<String, Object> arguments)
+    public Binding(UUID id,
+                   final String bindingKey,
+                   final AMQQueue queue,
+                   final Exchange exchange,
+                   final Map<String, Object> arguments)
     {
         _id = id;
-        _qmfId = qmfId;
         _bindingKey = bindingKey;
         _queue = queue;
         _exchange = exchange;
@@ -53,11 +55,6 @@ public class Binding
         return _id;
     }
 
-    public UUID getQMFId()
-    {
-        return _qmfId;
-    }
-
     public String getBindingKey()
     {
         return _bindingKey;

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java Thu Feb 28 16:14:30 2013
@@ -24,10 +24,6 @@ import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQInternalException;
 import org.apache.qpid.AMQSecurityException;
 import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.configuration.BindingConfig;
-import org.apache.qpid.server.configuration.BindingConfigType;
-import org.apache.qpid.server.configuration.ConfigStore;
-import org.apache.qpid.server.configuration.ConfiguredObject;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.logging.messages.BindingMessages;
@@ -52,7 +48,7 @@ public class BindingFactory
         _virtualHost = vhost;
     }
 
-    private final class BindingImpl extends Binding implements AMQQueue.Task, Exchange.Task, BindingConfig
+    private final class BindingImpl extends Binding implements AMQQueue.Task, Exchange.Task
     {
         private final BindingLogSubject _logSubject;
         //TODO : persist creation time
@@ -60,7 +56,7 @@ public class BindingFactory
 
         private BindingImpl(UUID id, String bindingKey, final AMQQueue queue, final Exchange exchange, final Map<String, Object> arguments)
         {
-            super(id, queue.getVirtualHost().getConfigStore().createId(), bindingKey, queue, exchange, arguments);
+            super(id, bindingKey, queue, exchange, arguments);
             _logSubject = new BindingLogSubject(bindingKey,exchange,queue);
 
         }
@@ -96,16 +92,6 @@ public class BindingFactory
             return _createTime;
         }
 
-        public BindingConfigType getConfigType()
-        {
-            return BindingConfigType.getInstance();
-        }
-
-        public ConfiguredObject getParent()
-        {
-            return _virtualHost;
-        }
-
         public boolean isDurable()
         {
             return getQueue().isDurable() && getExchange().isDurable();
@@ -186,7 +172,6 @@ public class BindingFactory
             exchange.addCloseTask(b);
             queue.addBinding(b);
             exchange.addBinding(b);
-            getConfigStore().addConfiguredObject(b);
             b.logCreation();
 
             return true;
@@ -197,11 +182,6 @@ public class BindingFactory
         }
     }
 
-    private ConfigStore getConfigStore()
-    {
-        return _virtualHost.getConfigStore();
-    }
-
     public void restoreBinding(final UUID id, final String bindingKey, final AMQQueue queue, final Exchange exchange, final Map<String, Object> argumentMap) throws AMQSecurityException, AMQInternalException
     {
         makeBinding(id, bindingKey,queue,exchange,argumentMap,true, false);
@@ -257,7 +237,6 @@ public class BindingFactory
                 _virtualHost.getMessageStore().unbindQueue(b);
             }
             b.logDestruction();
-            getConfigStore().removeConfiguredObject(b);
         }
 
         return b;

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java Thu Feb 28 16:14:30 2013
@@ -24,11 +24,11 @@ import org.apache.commons.configuration.
 import org.apache.commons.configuration.ConfigurationException;
 
 import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
+import org.apache.qpid.server.configuration.plugins.AbstractConfiguration;
 
 import java.util.List;
 
-public class QueueConfiguration extends ConfigurationPlugin
+public class QueueConfiguration extends AbstractConfiguration
 {
     private String _name;
     private VirtualHostConfiguration _vHostConfig;
@@ -39,7 +39,7 @@ public class QueueConfiguration extends 
         _name = name;
 
         CompositeConfiguration mungedConf = new CompositeConfiguration();
-        mungedConf.addConfiguration(_vHostConfig.getConfig().subset("queues.queue." + name));
+        mungedConf.addConfiguration(_vHostConfig.getConfig().subset("queues.queue." + escapeTagName(name)));
         mungedConf.addConfiguration(_vHostConfig.getConfig().subset("queues"));
 
         setConfiguration("virtualhosts.virtualhost.queues.queue", mungedConf);
@@ -193,43 +193,4 @@ public class QueueConfiguration extends 
     {
         return getBooleanValue("deadLetterQueues", _vHostConfig.isDeadLetterQueueEnabled());
     }
-
-    public static class QueueConfig extends ConfigurationPlugin
-    {
-        @Override
-        public String[] getElementsProcessed()
-        {
-            return new String[]{"name"};
-        }
-
-        public String getName()
-        {
-            return getStringValue("name");
-        }
-
-
-        public void validateConfiguration() throws ConfigurationException
-        {
-            if (getConfig().isEmpty())
-            {
-                throw new ConfigurationException("Queue section cannot be empty.");
-            }
-
-            if (getName() == null)
-            {
-                throw new ConfigurationException("Queue section must have a 'name' element.");
-            }
-
-        }
-
-
-        @Override
-        public String formatToString()
-        {
-            return "Name:"+getName();
-        }
-          
-
-    }
-
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org