You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2013/02/28 17:14:57 UTC

svn commit: r1451244 [30/45] - in /qpid/branches/asyncstore: ./ bin/ cpp/ cpp/bindings/ cpp/bindings/qmf/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qmf2/python/ cpp/bindings/qmf2/rub...

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/auth/rmi/RMIPasswordAuthenticator.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/auth/rmi/RMIPasswordAuthenticator.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/auth/rmi/RMIPasswordAuthenticator.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/auth/rmi/RMIPasswordAuthenticator.java Thu Feb 28 16:14:30 2013
@@ -22,13 +22,13 @@ package org.apache.qpid.server.security.
 
 import java.net.SocketAddress;
 
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.security.auth.AuthenticationResult;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.security.SubjectCreator;
 import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus;
-import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
+import org.apache.qpid.server.security.auth.SubjectAuthenticationResult;
 
 import javax.management.remote.JMXAuthenticator;
-import javax.management.remote.JMXPrincipal;
 import javax.security.auth.Subject;
 
 public class RMIPasswordAuthenticator implements JMXAuthenticator
@@ -38,23 +38,33 @@ public class RMIPasswordAuthenticator im
     static final String SHOULD_HAVE_2_ELEMENTS = "User details should have 2 elements, username, password";
     static final String SHOULD_BE_NON_NULL = "Supplied username and password should be non-null";
     static final String INVALID_CREDENTIALS = "Invalid user details supplied";
+    static final String USER_NOT_AUTHORISED_FOR_MANAGEMENT = "User not authorised for management";
     static final String CREDENTIALS_REQUIRED = "User details are required. " +
-    		            "Please ensure you are using an up to date management console to connect.";
+                        "Please ensure you are using an up to date management console to connect.";
 
-    private AuthenticationManager _authenticationManager = null;
-    private SocketAddress _socketAddress;
+    private final Broker _broker;
+    private final SocketAddress _address;
 
-    public RMIPasswordAuthenticator(SocketAddress socketAddress)
+    public RMIPasswordAuthenticator(Broker broker, SocketAddress address)
     {
-        _socketAddress = socketAddress;
+        _broker = broker;
+        _address = address;
     }
 
-    public void setAuthenticationManager(final AuthenticationManager authenticationManager)
+    public Subject authenticate(Object credentials) throws SecurityException
     {
-        _authenticationManager = authenticationManager;
+        validateCredentials(credentials);
+
+        final String[] userCredentials = (String[]) credentials;
+        final String username = (String) userCredentials[0];
+        final String password = (String) userCredentials[1];
+
+        final Subject authenticatedSubject = doAuthentication(username, password);
+        doManagementAuthorisation(authenticatedSubject);
+        return authenticatedSubject;
     }
 
-    public Subject authenticate(Object credentials) throws SecurityException
+    private void validateCredentials(Object credentials)
     {
         // Verify that credential's are of type String[].
         if (!(credentials instanceof String[]))
@@ -70,41 +80,27 @@ public class RMIPasswordAuthenticator im
         }
 
         // Verify that required number of credentials.
-        final String[] userCredentials = (String[]) credentials;
-        if (userCredentials.length != 2)
+        if (((String[])credentials).length != 2)
         {
             throw new SecurityException(SHOULD_HAVE_2_ELEMENTS);
         }
+    }
 
-        final String username = (String) userCredentials[0];
-        final String password = (String) userCredentials[1];
-
+    private Subject doAuthentication(final String username, final String password)
+    {
         // Verify that all required credentials are actually present.
         if (username == null || password == null)
         {
             throw new SecurityException(SHOULD_BE_NON_NULL);
         }
 
-        // Verify that an AuthenticationManager has been set.
-        if (_authenticationManager == null)
+        SubjectCreator subjectCreator = _broker.getSubjectCreator(_address);
+        if (subjectCreator == null)
         {
-            try
-            {
-                if(ApplicationRegistry.getInstance().getAuthenticationManager(_socketAddress) != null)
-                {
-                    _authenticationManager = ApplicationRegistry.getInstance().getAuthenticationManager(_socketAddress);
-                }
-                else
-                {
-                    throw new SecurityException(UNABLE_TO_LOOKUP);
-                }
-            }
-            catch(IllegalStateException e)
-            {
-                throw new SecurityException(UNABLE_TO_LOOKUP);
-            }
+            throw new SecurityException("Can't get subject creator for " + _address);
         }
-        final AuthenticationResult result = _authenticationManager.authenticate(username, password);
+
+        final SubjectAuthenticationResult result = subjectCreator.authenticate(username, password);
 
         if (AuthenticationStatus.ERROR.equals(result.getStatus()))
         {
@@ -112,10 +108,7 @@ public class RMIPasswordAuthenticator im
         }
         else if (AuthenticationStatus.SUCCESS.equals(result.getStatus()))
         {
-            final Subject subject = result.getSubject();
-            subject.getPrincipals().add(new JMXPrincipal(username));
-            subject.setReadOnly();
-            return subject;
+            return result.getSubject();
         }
         else
         {
@@ -123,4 +116,21 @@ public class RMIPasswordAuthenticator im
         }
     }
 
+    private void doManagementAuthorisation(Subject authenticatedSubject)
+    {
+        SecurityManager.setThreadSubject(authenticatedSubject);
+        try
+        {
+            if (!_broker.getSecurityManager().accessManagement())
+            {
+                throw new SecurityException(USER_NOT_AUTHORISED_FOR_MANAGEMENT);
+            }
+        }
+        finally
+        {
+            SecurityManager.setThreadSubject(null);
+        }
+    }
+
+
 }
\ No newline at end of file

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/UsernamePasswordInitialiser.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/UsernamePasswordInitialiser.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/UsernamePasswordInitialiser.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/UsernamePasswordInitialiser.java Thu Feb 28 16:14:30 2013
@@ -23,6 +23,7 @@ package org.apache.qpid.server.security.
 import org.apache.commons.configuration.Configuration;
 import org.apache.log4j.Logger;
 
+import org.apache.qpid.server.security.auth.UsernamePrincipal;
 import org.apache.qpid.server.security.auth.database.PrincipalDatabase;
 
 import javax.security.auth.callback.Callback;

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServer.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServer.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServer.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServer.java Thu Feb 28 16:14:30 2013
@@ -23,6 +23,8 @@ package org.apache.qpid.server.security.
 import javax.security.sasl.SaslException;
 import javax.security.sasl.SaslServer;
 
+import org.apache.qpid.server.security.auth.manager.AnonymousAuthenticationManager;
+
 
 public class AnonymousSaslServer implements SaslServer
 {
@@ -52,7 +54,7 @@ public class AnonymousSaslServer impleme
 
     public String getAuthorizationID()
     {
-        return null;
+        return AnonymousAuthenticationManager.ANONYMOUS_PRINCIPAL.getName();
     }
 
     public byte[] unwrap(byte[] incoming, int offset, int len) throws SaslException

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/crammd5/CRAMMD5HexInitialiser.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/crammd5/CRAMMD5HexInitialiser.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/crammd5/CRAMMD5HexInitialiser.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/crammd5/CRAMMD5HexInitialiser.java Thu Feb 28 16:14:30 2013
@@ -139,6 +139,12 @@ public class CRAMMD5HexInitialiser exten
         {
             _realPricipalDatabase.reload();
         }
+
+        @Override
+        public void setPasswordFile(String passwordFile) throws IOException
+        {
+            throw new UnsupportedOperationException();
+        }
     }
 
 }

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java Thu Feb 28 16:14:30 2013
@@ -31,10 +31,10 @@ import org.apache.qpid.framing.MethodDis
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.protocol.AMQMethodListener;
+import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.registry.IApplicationRegistry;
 import org.apache.qpid.server.security.SecurityManager;
-import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
+import org.apache.qpid.server.security.SubjectCreator;
 import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
 
 import java.util.concurrent.CopyOnWriteArraySet;
@@ -47,32 +47,29 @@ public class AMQStateManager implements 
 {
     private static final Logger _logger = Logger.getLogger(AMQStateManager.class);
 
-    private final VirtualHostRegistry _virtualHostRegistry;
+    private final Broker _broker;
     private final AMQProtocolSession _protocolSession;
     /** The current state */
     private AMQState _currentState;
 
     private CopyOnWriteArraySet<StateListener> _stateListeners = new CopyOnWriteArraySet<StateListener>();
 
-    public AMQStateManager(VirtualHostRegistry virtualHostRegistry, AMQProtocolSession protocolSession)
+    public AMQStateManager(Broker broker, AMQProtocolSession protocolSession)
     {
-
-        _virtualHostRegistry = virtualHostRegistry;
+        _broker = broker;
         _protocolSession = protocolSession;
         _currentState = AMQState.CONNECTION_NOT_STARTED;
 
     }
 
     /**
-     * Get the ApplicationRegistry associated with this AMQStateManager
-     *
-     * returns the application registry associated with the VirtualHostRegistry of the AMQStateManager
+     * Get the Broker instance
      *
-     * @return the ApplicationRegistry
+     * @return the Broker
      */
-    public IApplicationRegistry getApplicationRegistry()
+    public Broker getBroker()
     {
-        return _virtualHostRegistry.getApplicationRegistry();
+        return _broker;
     }
 
     public AMQState getCurrentState()
@@ -148,7 +145,7 @@ public class AMQStateManager implements 
 
     public VirtualHostRegistry getVirtualHostRegistry()
     {
-        return _virtualHostRegistry;
+        return _broker.getVirtualHostRegistry();
     }
 
     public AMQProtocolSession getProtocolSession()
@@ -157,13 +154,9 @@ public class AMQStateManager implements 
         return _protocolSession;
     }
 
-    /**
-     * Get the AuthenticationManager associated with the ProtocolSession of the AMQStateManager
-     *
-     * @return the AuthenticationManager
-     */
-    public AuthenticationManager getAuthenticationManager()
+    
+    public SubjectCreator getSubjectCreator()
     {
-        return getApplicationRegistry().getAuthenticationManager(getProtocolSession().getLocalAddress());
+        return _broker.getSubjectCreator(getProtocolSession().getLocalAddress());
     }
 }

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java Thu Feb 28 16:14:30 2013
@@ -46,19 +46,7 @@ public interface ConfigurationRecoveryHa
     public static interface BindingRecoveryHandler
     {
         void binding(UUID bindingId, UUID exchangeId, UUID queueId, String bindingName, ByteBuffer buf);
-        BrokerLinkRecoveryHandler completeBindingRecovery();
-    }
-    
-    public static interface BrokerLinkRecoveryHandler
-    {
-        BridgeRecoveryHandler brokerLink(UUID id, long createTime, Map<String,String> arguments);
-        void completeBrokerLinkRecovery();
-    }
-    
-    public static interface BridgeRecoveryHandler
-    {
-        void bridge(UUID id, long createTime, Map<String,String> arguments);
-        void completeBridgeRecoveryForLink();
+        void completeBindingRecovery();
     }
 
 }

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java Thu Feb 28 16:14:30 2013
@@ -26,8 +26,6 @@ import org.apache.qpid.AMQStoreException
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.binding.Binding;
 import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.federation.Bridge;
-import org.apache.qpid.server.federation.BrokerLink;
 import org.apache.qpid.server.queue.AMQQueue;
 
 public interface DurableConfigurationStore
@@ -122,12 +120,5 @@ public interface DurableConfigurationSto
      * @throws AMQStoreException If the operation fails for any reason.
      */
     void updateQueue(AMQQueue queue) throws AMQStoreException;
-    
-    void createBrokerLink(BrokerLink link) throws AMQStoreException;
-    
-    void deleteBrokerLink(BrokerLink link) throws AMQStoreException;
-    
-    void createBridge(Bridge bridge) throws AMQStoreException;
-    
-    void deleteBridge(Bridge bridge) throws AMQStoreException;
+
 }

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java Thu Feb 28 16:14:30 2013
@@ -30,6 +30,7 @@ import java.util.concurrent.atomic.Atomi
 /** A simple message store that stores the messages in a thread-safe structure in memory. */
 public class MemoryMessageStore extends NullMessageStore
 {
+    public static final String TYPE = "Memory";
     private final AtomicLong _messageId = new AtomicLong(1);
     private final AtomicBoolean _closed = new AtomicBoolean(false);
 
@@ -138,6 +139,6 @@ public class MemoryMessageStore extends 
     @Override
     public String getStoreType()
     {
-        return "Memory";
+        return TYPE;
     }
 }

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java Thu Feb 28 16:14:30 2013
@@ -24,8 +24,6 @@ import org.apache.qpid.AMQStoreException
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.binding.Binding;
 import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.federation.Bridge;
-import org.apache.qpid.server.federation.BrokerLink;
 import org.apache.qpid.server.queue.AMQQueue;
 
 public abstract class NullMessageStore implements MessageStore
@@ -78,26 +76,6 @@ public abstract class NullMessageStore i
     }
 
     @Override
-    public void createBrokerLink(final BrokerLink link) throws AMQStoreException
-    {
-    }
-
-    @Override
-    public void deleteBrokerLink(final BrokerLink link) throws AMQStoreException
-    {
-    }
-
-    @Override
-    public void createBridge(final Bridge bridge) throws AMQStoreException
-    {
-    }
-
-    @Override
-    public void deleteBridge(final Bridge bridge) throws AMQStoreException
-    {
-    }
-
-    @Override
     public void configureMessageStore(String name,
             MessageStoreRecoveryHandler recoveryHandler,
             TransactionLogRecoveryHandler tlogRecoveryHandler, Configuration config) throws Exception

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/State.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/State.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/State.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/State.java Thu Feb 28 16:14:30 2013
@@ -20,8 +20,6 @@
  */
 package org.apache.qpid.server.store;
 
-import org.apache.qpid.server.configuration.ConfiguredObject;
-
 public enum State
 {
     /** The initial state of the store.  In practice, the store immediately transitions to the subsequent states. */
@@ -30,7 +28,7 @@ public enum State
     INITIALISING,
     /**
      * The initial set-up of the store has completed.
-     * If the store is persistent, it has not yet loaded configuration for {@link ConfiguredObject}'s from disk.
+     * If the store is persistent, it has not yet loaded configuration from disk.
      *
      * From the point of view of the user, the store is essentially stopped.
      */

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java Thu Feb 28 16:14:30 2013
@@ -23,7 +23,6 @@ package org.apache.qpid.server.store.der
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
@@ -41,7 +40,6 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.sql.Types;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -55,12 +53,9 @@ import org.apache.qpid.framing.AMQShortS
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.binding.Binding;
 import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.federation.Bridge;
-import org.apache.qpid.server.federation.BrokerLink;
 import org.apache.qpid.server.message.EnqueableMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler;
 import org.apache.qpid.server.store.ConfiguredObjectHelper;
 import org.apache.qpid.server.store.ConfiguredObjectRecord;
 import org.apache.qpid.server.store.Event;
@@ -236,7 +231,7 @@ public class DerbyMessageStore implement
 
     private static final String DERBY_SINGLE_DB_SHUTDOWN_CODE = "08006";
 
-    private static final String DERBY_STORE_TYPE = "DERBY";
+    public static final String TYPE = "DERBY";
 
     private final StateManager _stateManager;
 
@@ -572,8 +567,7 @@ public class DerbyMessageStore implement
             BindingRecoveryHandler brh = qrh.completeQueueRecovery();
             _configuredObjectHelper.recoverBindings(brh, configuredObjects);
 
-            BrokerLinkRecoveryHandler lrh = brh.completeBindingRecovery();
-            recoverBrokerLinks(lrh);
+            brh.completeBindingRecovery();
         }
         catch (SQLException e)
         {
@@ -581,144 +575,6 @@ public class DerbyMessageStore implement
         }
     }
 
-    private void recoverBrokerLinks(final ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler lrh)
-            throws SQLException
-    {
-        _logger.info("Recovering broker links...");
-
-        Connection conn = null;
-        try
-        {
-            conn = newAutoCommitConnection();
-
-            PreparedStatement stmt = conn.prepareStatement(SELECT_ALL_FROM_LINKS);
-
-            try
-            {
-                ResultSet rs = stmt.executeQuery();
-
-                try
-                {
-
-                    while(rs.next())
-                    {
-                        UUID id  = new UUID(rs.getLong(2), rs.getLong(1));
-                        long createTime = rs.getLong(3);
-                        Blob argumentsAsBlob = rs.getBlob(4);
-
-                        byte[] dataAsBytes = argumentsAsBlob.getBytes(1,(int) argumentsAsBlob.length());
-
-                        DataInputStream dis = new DataInputStream(new ByteArrayInputStream(dataAsBytes));
-                        int size = dis.readInt();
-
-                        Map<String,String> arguments = new HashMap<String, String>();
-
-                        for(int i = 0; i < size; i++)
-                        {
-                            arguments.put(dis.readUTF(), dis.readUTF());
-                        }
-
-                        ConfigurationRecoveryHandler.BridgeRecoveryHandler brh = lrh.brokerLink(id, createTime, arguments);
-
-                        recoverBridges(brh, id);
-
-                    }
-                }
-                catch (IOException e)
-                {
-                    throw new SQLException(e.getMessage(), e);
-                }
-                finally
-                {
-                    rs.close();
-                }
-            }
-            finally
-            {
-                stmt.close();
-            }
-
-        }
-        finally
-        {
-            if(conn != null)
-            {
-                conn.close();
-            }
-        }
-
-    }
-
-    private void recoverBridges(final ConfigurationRecoveryHandler.BridgeRecoveryHandler brh, final UUID linkId)
-            throws SQLException
-    {
-        _logger.info("Recovering bridges for link " + linkId + "...");
-
-        Connection conn = null;
-        try
-        {
-            conn = newAutoCommitConnection();
-
-            PreparedStatement stmt = conn.prepareStatement(SELECT_ALL_FROM_BRIDGES);
-
-            try
-            {
-                stmt.setLong(1, linkId.getLeastSignificantBits());
-                stmt.setLong(2, linkId.getMostSignificantBits());
-
-                ResultSet rs = stmt.executeQuery();
-
-                try
-                {
-
-                    while(rs.next())
-                    {
-                        UUID id  = new UUID(rs.getLong(2), rs.getLong(1));
-                        long createTime = rs.getLong(3);
-                        Blob argumentsAsBlob = rs.getBlob(6);
-
-                        byte[] dataAsBytes = argumentsAsBlob.getBytes(1,(int) argumentsAsBlob.length());
-
-                        DataInputStream dis = new DataInputStream(new ByteArrayInputStream(dataAsBytes));
-                        int size = dis.readInt();
-
-                        Map<String,String> arguments = new HashMap<String, String>();
-
-                        for(int i = 0; i < size; i++)
-                        {
-                            arguments.put(dis.readUTF(), dis.readUTF());
-                        }
-
-                        brh.bridge(id, createTime, arguments);
-
-                    }
-                    brh.completeBridgeRecoveryForLink();
-                }
-                catch (IOException e)
-                {
-                    throw new SQLException(e.getMessage(), e);
-                }
-                finally
-                {
-                    rs.close();
-                }
-            }
-            finally
-            {
-                stmt.close();
-            }
-
-        }
-        finally
-        {
-            if(conn != null)
-            {
-                conn.close();
-            }
-        }
-
-    }
-
     @Override
     public void close() throws Exception
     {
@@ -975,71 +831,6 @@ public class DerbyMessageStore implement
         }
     }
 
-    @Override
-    public void createBrokerLink(final BrokerLink link) throws AMQStoreException
-    {
-        _logger.debug("public void createBrokerLink(BrokerLink = " + link + "): called");
-
-        if (_stateManager.isInState(State.ACTIVE))
-        {
-            try
-            {
-                Connection conn = newAutoCommitConnection();
-
-                PreparedStatement stmt = conn.prepareStatement(FIND_LINK);
-                try
-                {
-
-                    stmt.setLong(1, link.getQMFId().getLeastSignificantBits());
-                    stmt.setLong(2, link.getQMFId().getMostSignificantBits());
-                    ResultSet rs = stmt.executeQuery();
-                    try
-                    {
-
-                        // If we don't have any data in the result set then we can add this queue
-                        if (!rs.next())
-                        {
-                            PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_LINKS);
-
-                            try
-                            {
-
-                                insertStmt.setLong(1, link.getQMFId().getLeastSignificantBits());
-                                insertStmt.setLong(2, link.getQMFId().getMostSignificantBits());
-                                insertStmt.setLong(3, link.getCreateTime());
-
-                                byte[] argumentBytes = convertStringMapToBytes(link.getArguments());
-                                ByteArrayInputStream bis = new ByteArrayInputStream(argumentBytes);
-
-                                insertStmt.setBinaryStream(4,bis,argumentBytes.length);
-
-                                insertStmt.execute();
-                            }
-                            finally
-                            {
-                                insertStmt.close();
-                            }
-                        }
-                    }
-                    finally
-                    {
-                        rs.close();
-                    }
-                }
-                finally
-                {
-                    stmt.close();
-                }
-                conn.close();
-
-            }
-            catch (SQLException e)
-            {
-                throw new AMQStoreException("Error writing " + link + " to database: " + e.getMessage(), e);
-            }
-        }
-    }
-
     private byte[] convertStringMapToBytes(final Map<String, String> arguments) throws AMQStoreException
     {
         byte[] argumentBytes;
@@ -1072,139 +863,7 @@ public class DerbyMessageStore implement
         return argumentBytes;
     }
 
-    @Override
-    public void deleteBrokerLink(final BrokerLink link) throws AMQStoreException
-    {
-        _logger.debug("public void deleteBrokerLink( " + link + "): called");
-        Connection conn = null;
-        PreparedStatement stmt = null;
-        try
-        {
-            conn = newAutoCommitConnection();
-            stmt = conn.prepareStatement(DELETE_FROM_LINKS);
-            stmt.setLong(1, link.getQMFId().getLeastSignificantBits());
-            stmt.setLong(2, link.getQMFId().getMostSignificantBits());
-            int results = stmt.executeUpdate();
-
-            if (results == 0)
-            {
-                throw new AMQStoreException("Link " + link + " not found");
-            }
-        }
-        catch (SQLException e)
-        {
-            throw new AMQStoreException("Error deleting Link " + link + " from database: " + e.getMessage(), e);
-        }
-        finally
-        {
-            closePreparedStatement(stmt);
-            closeConnection(conn);
-        }
-
-
-    }
-
-    @Override
-    public void createBridge(final Bridge bridge) throws AMQStoreException
-    {
-        _logger.debug("public void createBridge(BrokerLink = " + bridge + "): called");
 
-        if (_stateManager.isInState(State.ACTIVE))
-        {
-            try
-            {
-                Connection conn = newAutoCommitConnection();
-
-                PreparedStatement stmt = conn.prepareStatement(FIND_BRIDGE);
-                try
-                {
-
-                    UUID id = bridge.getQMFId();
-                    stmt.setLong(1, id.getLeastSignificantBits());
-                    stmt.setLong(2, id.getMostSignificantBits());
-                    ResultSet rs = stmt.executeQuery();
-                    try
-                    {
-
-                        // If we don't have any data in the result set then we can add this queue
-                        if (!rs.next())
-                        {
-                            PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_BRIDGES);
-
-                            try
-                            {
-
-                                insertStmt.setLong(1, id.getLeastSignificantBits());
-                                insertStmt.setLong(2, id.getMostSignificantBits());
-
-                                insertStmt.setLong(3, bridge.getCreateTime());
-
-                                UUID linkId = bridge.getLink().getQMFId();
-                                insertStmt.setLong(4, linkId.getLeastSignificantBits());
-                                insertStmt.setLong(5, linkId.getMostSignificantBits());
-
-                                byte[] argumentBytes = convertStringMapToBytes(bridge.getArguments());
-                                ByteArrayInputStream bis = new ByteArrayInputStream(argumentBytes);
-
-                                insertStmt.setBinaryStream(6,bis,argumentBytes.length);
-
-                                insertStmt.execute();
-                            }
-                            finally
-                            {
-                                insertStmt.close();
-                            }
-                        }
-                    }
-                    finally
-                    {
-                        rs.close();
-                    }
-                }
-                finally
-                {
-                    stmt.close();
-                }
-                conn.close();
-
-            }
-            catch (SQLException e)
-            {
-                throw new AMQStoreException("Error writing " + bridge + " to database: " + e.getMessage(), e);
-            }
-        }
-    }
-
-    @Override
-    public void deleteBridge(final Bridge bridge) throws AMQStoreException
-    {
-        _logger.debug("public void deleteBridge( " + bridge + "): called");
-        Connection conn = null;
-        PreparedStatement stmt = null;
-        try
-        {
-            conn = newAutoCommitConnection();
-            stmt = conn.prepareStatement(DELETE_FROM_BRIDGES);
-            stmt.setLong(1, bridge.getQMFId().getLeastSignificantBits());
-            stmt.setLong(2, bridge.getQMFId().getMostSignificantBits());
-            int results = stmt.executeUpdate();
-
-            if (results == 0)
-            {
-                throw new AMQStoreException("Bridge " + bridge + " not found");
-            }
-        }
-        catch (SQLException e)
-        {
-            throw new AMQStoreException("Error deleting bridge " + bridge + " from database: " + e.getMessage(), e);
-        }
-        finally
-        {
-            closePreparedStatement(stmt);
-            closeConnection(conn);
-        }
-
-    }
 
     @Override
     public Transaction newTransaction()
@@ -2134,8 +1793,9 @@ public class DerbyMessageStore implement
         public ByteBuffer getContent(int offsetInMessage, int size)
         {
             ByteBuffer buf = ByteBuffer.allocate(size);
-            getContent(offsetInMessage, buf);
+            int length = getContent(offsetInMessage, buf);
             buf.position(0);
+            buf.limit(length);
             return  buf;
         }
 
@@ -2673,7 +2333,7 @@ public class DerbyMessageStore implement
     @Override
     public String getStoreType()
     {
-        return DERBY_STORE_TYPE;
+        return TYPE;
     }
 
 }
\ No newline at end of file

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java Thu Feb 28 16:14:30 2013
@@ -27,11 +27,6 @@ import org.apache.qpid.common.AMQPFilter
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.configuration.ConfigStore;
-import org.apache.qpid.server.configuration.ConfiguredObject;
-import org.apache.qpid.server.configuration.SessionConfig;
-import org.apache.qpid.server.configuration.SubscriptionConfig;
-import org.apache.qpid.server.configuration.SubscriptionConfigType;
 import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.filter.FilterManagerFactory;
 import org.apache.qpid.server.flow.FlowCreditManager;
@@ -61,8 +56,7 @@ import java.util.concurrent.locks.Reentr
  * Encapsulation of a supscription to a queue. <p/> Ties together the protocol session of a subscriber, the consumer tag
  * that was given out by the broker and the channel id. <p/>
  */
-public abstract class SubscriptionImpl implements Subscription, FlowCreditManager.FlowCreditManagerListener,
-                                                  SubscriptionConfig
+public abstract class SubscriptionImpl implements Subscription, FlowCreditManager.FlowCreditManagerListener
 {
 
     private StateListener _stateListener = new StateListener()
@@ -91,7 +85,6 @@ public abstract class SubscriptionImpl i
     private final long _subscriptionID;
     private LogSubject _logSubject;
     private LogActor _logActor;
-    private UUID _qmfId;
     private final AtomicLong _deliveredCount = new AtomicLong(0);
     private final AtomicLong _deliveredBytes = new AtomicLong(0);
 
@@ -373,11 +366,6 @@ public abstract class SubscriptionImpl i
         return _channel;
     }
 
-    public ConfigStore getConfigStore()
-    {
-        return getQueue().getConfigStore();
-    }
-
     public Long getDelivered()
     {
         return _deliveredCount.get();
@@ -391,9 +379,6 @@ public abstract class SubscriptionImpl i
         }
         _queue = queue;
 
-        _qmfId = getConfigStore().createId();
-        getConfigStore().addConfiguredObject(this);
-
         _logSubject = new SubscriptionLogSubject(this);
         _logActor = new SubscriptionActor(CurrentActor.get().getRootMessageLogger(), this);
 
@@ -547,8 +532,6 @@ public abstract class SubscriptionImpl i
         {
             _stateChangeLock.unlock();
         }
-        getConfigStore().removeConfiguredObject(this);
-
         //Log Subscription closed
         CurrentActor.get().message(_logSubject, SubscriptionMessages.CLOSE());
     }
@@ -752,11 +735,6 @@ public abstract class SubscriptionImpl i
         return "WINDOW";
     }
 
-    public SessionConfig getSessionConfig()
-    {
-        return getChannel();
-    }
-
     public boolean isBrowsing()
     {
         return isBrowser();
@@ -767,32 +745,16 @@ public abstract class SubscriptionImpl i
         return true;
     }
 
-    @Override
-    public UUID getQMFId()
-    {
-        return _qmfId;
-    }
-
     public boolean isDurable()
     {
         return false;
     }
 
-    public SubscriptionConfigType getConfigType()
-    {
-        return SubscriptionConfigType.getInstance();
-    }
-
     public boolean isExclusive()
     {
         return getQueue().hasExclusiveSubscriber();
     }
 
-    public ConfiguredObject getParent()
-    {
-        return getSessionConfig();
-    }
-
     public String getName()
     {
         return String.valueOf(_consumerTag);

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java Thu Feb 28 16:14:30 2013
@@ -24,11 +24,6 @@ import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
 import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.configuration.ConfigStore;
-import org.apache.qpid.server.configuration.ConfiguredObject;
-import org.apache.qpid.server.configuration.SessionConfig;
-import org.apache.qpid.server.configuration.SubscriptionConfig;
-import org.apache.qpid.server.configuration.SubscriptionConfigType;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.flow.CreditCreditManager;
@@ -86,7 +81,7 @@ import java.util.concurrent.atomic.Atomi
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
-public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCreditManagerListener, SubscriptionConfig, LogSubject
+public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCreditManagerListener, LogSubject
 {
     private final long _subscriptionID;
 
@@ -125,7 +120,6 @@ public class Subscription_0_10 implement
 
     private LogActor _logActor;
     private final Map<String, Object> _properties = new ConcurrentHashMap<String, Object>();
-    private UUID _qmfId;
     private String _traceExclude;
     private String _trace;
     private final long _createTime = System.currentTimeMillis();
@@ -192,8 +186,6 @@ public class Subscription_0_10 implement
         Map<String, Object> arguments = queue.getArguments();
         _traceExclude = (String) arguments.get("qpid.trace.exclude");
         _trace = (String) arguments.get("qpid.trace.id");
-        _qmfId = getConfigStore().createId();
-        getConfigStore().addConfiguredObject(this);
         String filterLogString = null;
 
         _logActor = GenericActor.getInstance(this);
@@ -283,7 +275,6 @@ public class Subscription_0_10 implement
                 }
             }
             _creditManager.removeListener(this);
-            getConfigStore().removeConfiguredObject(this);
             CurrentActor.get().message(getLogSubject(), SubscriptionMessages.CLOSE());
         }
         finally
@@ -295,11 +286,6 @@ public class Subscription_0_10 implement
 
     }
 
-    public ConfigStore getConfigStore()
-    {
-        return getQueue().getConfigStore();
-    }
-
     public Long getDelivered()
     {
         return _deliveredCount.get();
@@ -970,12 +956,6 @@ public class Subscription_0_10 implement
         return _session;
     }
 
-
-    public SessionConfig getSessionConfig()
-    {
-        return getSessionModel();
-    }
-
     public boolean isBrowsing()
     {
         return _acquireMode == MessageAcquireMode.NOT_ACQUIRED;
@@ -986,20 +966,11 @@ public class Subscription_0_10 implement
         return getQueue().hasExclusiveSubscriber();
     }
 
-    public ConfiguredObject getParent()
-    {
-        return getSessionConfig();
-    }
-
     public boolean isDurable()
     {
         return false;
     }
 
-    public SubscriptionConfigType getConfigType()
-    {
-        return SubscriptionConfigType.getInstance();
-    }
 
     public boolean isExplicitAcknowledge()
     {
@@ -1011,12 +982,6 @@ public class Subscription_0_10 implement
         return _flowMode.toString();
     }
 
-    @Override
-    public UUID getQMFId()
-    {
-        return _qmfId;
-    }
-
     public String getName()
     {
         return _destination;

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java Thu Feb 28 16:14:30 2013
@@ -20,17 +20,16 @@
  */
 package org.apache.qpid.server.transport;
 
+import java.net.SocketAddress;
 import java.security.Principal;
 import java.text.MessageFormat;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import javax.security.auth.Subject;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.configuration.ConnectionConfig;
 import org.apache.qpid.server.logging.LogActor;
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.logging.actors.CurrentActor;
@@ -39,6 +38,7 @@ import org.apache.qpid.server.logging.me
 import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.security.AuthorizationHolder;
+import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
 import org.apache.qpid.server.stats.StatisticsCounter;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.transport.Connection;
@@ -48,6 +48,7 @@ import org.apache.qpid.transport.Executi
 import org.apache.qpid.transport.Method;
 import org.apache.qpid.transport.ProtocolEvent;
 import org.apache.qpid.transport.Session;
+import org.apache.qpid.transport.network.NetworkConnection;
 
 import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTION_FORMAT;
 import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SOCKET_FORMAT;
@@ -55,7 +56,6 @@ import static org.apache.qpid.server.log
 
 public class ServerConnection extends Connection implements AMQConnectionModel, LogSubject, AuthorizationHolder
 {
-    private ConnectionConfig _config;
     private Runnable _onOpenTask;
     private AtomicBoolean _logClosed = new AtomicBoolean(false);
     private LogActor _actor = GenericActor.getInstance(this);
@@ -69,6 +69,7 @@ public class ServerConnection extends Co
     private AtomicLong _lastIoTime = new AtomicLong();
     private boolean _blocking;
     private Principal _peerPrincipal;
+    private NetworkConnection _networkConnection;
 
     public ServerConnection(final long connectionId)
     {
@@ -147,16 +148,6 @@ public class ServerConnection extends Co
         initialiseStatistics();
     }
 
-    public void setConnectionConfig(final ConnectionConfig config)
-    {
-        _config = config;
-    }
-
-    public ConnectionConfig getConfig()
-    {
-        return _config;
-    }
-
     public void onOpen(final Runnable task)
     {
         _onOpenTask = task;
@@ -228,7 +219,7 @@ public class ServerConnection extends Co
                     MessageFormat.format(CONNECTION_FORMAT,
                                          getConnectionId(),
                                          getClientId(),
-                                         getConfig().getAddress(),
+                                         getRemoteAddressString(),
                                          getVirtualHost().getName())
                  + "] ";
         }
@@ -238,7 +229,7 @@ public class ServerConnection extends Co
                     MessageFormat.format(USER_FORMAT,
                                          getConnectionId(),
                                          getClientId(),
-                                         getConfig().getAddress())
+                                         getRemoteAddressString())
                  + "] ";
 
         }
@@ -247,7 +238,7 @@ public class ServerConnection extends Co
             return "[" +
                     MessageFormat.format(SOCKET_FORMAT,
                                          getConnectionId(),
-                                         getConfig().getAddress())
+                                         getRemoteAddressString())
                  + "] ";
         }
     }
@@ -396,7 +387,7 @@ public class ServerConnection extends Co
         else
         {
             _authorizedSubject = authorizedSubject;
-            _authorizedPrincipal = authorizedSubject.getPrincipals().iterator().next();
+            _authorizedPrincipal = AuthenticatedPrincipal.getAuthenticatedPrincipalFromSubject(authorizedSubject);
         }
     }
 
@@ -417,7 +408,7 @@ public class ServerConnection extends Co
 
     public String getRemoteAddressString()
     {
-        return getConfig().getAddress();
+        return String.valueOf(getRemoteAddress());
     }
 
     public String getUserName()
@@ -489,4 +480,32 @@ public class ServerConnection extends Co
     {
         _peerPrincipal = peerPrincipal;
     }
+
+    @Override
+    public void setRemoteAddress(SocketAddress remoteAddress)
+    {
+        super.setRemoteAddress(remoteAddress);
+    }
+
+    @Override
+    public void setLocalAddress(SocketAddress localAddress)
+    {
+        super.setLocalAddress(localAddress);
+    }
+
+    public void setNetworkConnection(NetworkConnection network)
+    {
+        _networkConnection = network;
+    }
+
+    public NetworkConnection getNetworkConnection()
+    {
+        return _networkConnection;
+    }
+
+    public void doHeartbeat()
+    {
+        super.doHeartBeat();
+
+    }
 }

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java Thu Feb 28 16:14:30 2013
@@ -32,18 +32,19 @@ import javax.security.sasl.SaslException
 import javax.security.sasl.SaslServer;
 import org.apache.qpid.common.ServerPropertyNames;
 import org.apache.qpid.properties.ConnectionStartProperties;
-import org.apache.qpid.protocol.ProtocolEngine;
-import org.apache.qpid.server.configuration.BrokerConfig;
+import org.apache.qpid.server.configuration.BrokerProperties;
+import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.protocol.AMQConnectionModel;
-import org.apache.qpid.server.registry.IApplicationRegistry;
 import org.apache.qpid.server.security.SecurityManager;
-import org.apache.qpid.server.security.auth.AuthenticationResult;
+import org.apache.qpid.server.security.SubjectCreator;
 import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus;
-import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
+import org.apache.qpid.server.security.auth.SubjectAuthenticationResult;
 import org.apache.qpid.server.subscription.Subscription_0_10;
 import org.apache.qpid.server.virtualhost.State;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.transport.*;
+import org.apache.qpid.transport.network.NetworkConnection;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,36 +54,49 @@ public class ServerConnectionDelegate ex
 {
     private static final Logger LOGGER = LoggerFactory.getLogger(ServerConnectionDelegate.class);
 
+    private final Broker _broker;
     private final String _localFQDN;
-    private final IApplicationRegistry _appRegistry;
     private int _maxNoOfChannels;
     private Map<String,Object> _clientProperties;
-    private final AuthenticationManager _authManager;
+    private final SubjectCreator _subjectCreator;
 
-    public ServerConnectionDelegate(IApplicationRegistry appRegistry, String localFQDN, AuthenticationManager authManager)
+    public ServerConnectionDelegate(Broker broker, String localFQDN, SubjectCreator subjectCreator)
     {
-        this(createConnectionProperties(appRegistry.getBrokerConfig()), Collections.singletonList((Object)"en_US"), appRegistry, localFQDN, authManager);
+        this(createConnectionProperties(broker), Collections.singletonList((Object)"en_US"), broker, localFQDN, subjectCreator);
     }
 
     private ServerConnectionDelegate(Map<String, Object> properties,
                                     List<Object> locales,
-                                    IApplicationRegistry appRegistry,
+                                    Broker broker,
                                     String localFQDN,
-                                    AuthenticationManager authManager)
+                                    SubjectCreator subjectCreator)
     {
-        super(properties, parseToList(authManager.getMechanisms()), locales);
+        super(properties, parseToList(subjectCreator.getMechanisms()), locales);
 
-        _appRegistry = appRegistry;
+        _broker = broker;
         _localFQDN = localFQDN;
-        _maxNoOfChannels = appRegistry.getConfiguration().getMaxChannelCount();
-        _authManager = authManager;
+        _maxNoOfChannels = (Integer)broker.getAttribute(Broker.SESSION_COUNT_LIMIT);
+        _subjectCreator = subjectCreator;
+    }
+
+    private static List<String> getFeatures(Broker broker)
+    {
+        String brokerDisabledFeatures = System.getProperty(BrokerProperties.PROPERTY_DISABLED_FEATURES);
+        final List<String> features = new ArrayList<String>();
+        if (brokerDisabledFeatures == null || !brokerDisabledFeatures.contains(ServerPropertyNames.FEATURE_QPID_JMS_SELECTOR))
+        {
+            features.add(ServerPropertyNames.FEATURE_QPID_JMS_SELECTOR);
+        }
+
+        return Collections.unmodifiableList(features);
     }
 
-    private static Map<String, Object> createConnectionProperties(final BrokerConfig brokerConfig)
+    private static Map<String, Object> createConnectionProperties(final Broker broker)
     {
         final Map<String,Object> map = new HashMap<String,Object>(2);
-        map.put(ServerPropertyNames.FEDERATION_TAG, brokerConfig.getFederationTag());
-        final List<String> features = brokerConfig.getFeatures();
+        // Federation tag is used by the client to identify the broker instance
+        map.put(ServerPropertyNames.FEDERATION_TAG, broker.getId().toString());
+        final List<String> features = getFeatures(broker);
         if (features != null && features.size() > 0)
         {
             map.put(ServerPropertyNames.QPID_FEATURES, features);
@@ -112,14 +126,14 @@ public class ServerConnectionDelegate ex
 
     protected SaslServer createSaslServer(Connection conn, String mechanism) throws SaslException
     {
-        return _authManager.createSaslServer(mechanism, _localFQDN, ((ServerConnection) conn).getPeerPrincipal());
+        return _subjectCreator.createSaslServer(mechanism, _localFQDN, ((ServerConnection) conn).getPeerPrincipal());
 
     }
 
     protected void secure(final SaslServer ss, final Connection conn, final byte[] response)
     {
         final ServerConnection sconn = (ServerConnection) conn;
-        final AuthenticationResult authResult = _authManager.authenticate(ss, response);
+        final SubjectAuthenticationResult authResult = _subjectCreator.authenticate(ss, response);
 
         if (AuthenticationStatus.SUCCESS.equals(authResult.getStatus()))
         {
@@ -166,7 +180,7 @@ public class ServerConnectionDelegate ex
         {
             vhostName = "";
         }
-        vhost = _appRegistry.getVirtualHostRegistry().getVirtualHost(vhostName);
+        vhost = _broker.getVirtualHostRegistry().getVirtualHost(vhostName);
 
         SecurityManager.setThreadSubject(sconn.getAuthorizedSubject());
 
@@ -174,7 +188,7 @@ public class ServerConnectionDelegate ex
         {
             sconn.setVirtualHost(vhost);
 
-            if (!vhost.getSecurityManager().accessVirtualhost(vhostName, ((ProtocolEngine) sconn.getConfig()).getRemoteAddress()))
+            if (!vhost.getSecurityManager().accessVirtualhost(vhostName, sconn.getRemoteAddress()))
             {
                 sconn.setState(Connection.State.CLOSING);
                 sconn.invoke(new ConnectionClose(ConnectionCloseCode.CONNECTION_FORCED, "Permission denied '"+vhostName+"'"));
@@ -215,14 +229,18 @@ public class ServerConnectionDelegate ex
             return;
         }
 
-        setConnectionTuneOkChannelMax(sconn, okChannelMax);
-    }
+        if(ok.hasHeartbeat())
+        {
+            final int heartbeat = ok.getHeartbeat();
+            if(heartbeat > 0)
+            {
+                final NetworkConnection networkConnection = sconn.getNetworkConnection();
+                networkConnection.setMaxReadIdle(2 * heartbeat);
+                networkConnection.setMaxWriteIdle(heartbeat);
+            }
+        }
 
-    @Override
-    protected int getHeartbeatMax()
-    {
-        //TODO: implement broker support for actually sending heartbeats
-        return 0;
+        setConnectionTuneOkChannelMax(sconn, okChannelMax);
     }
 
     @Override

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java Thu Feb 28 16:14:30 2013
@@ -42,13 +42,8 @@ import javax.security.auth.Subject;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQStoreException;
 import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.protocol.ProtocolEngine;
 import org.apache.qpid.server.TransactionTimeoutHelper;
-import org.apache.qpid.server.configuration.ConfigStore;
-import org.apache.qpid.server.configuration.ConfiguredObject;
-import org.apache.qpid.server.configuration.ConnectionConfig;
-import org.apache.qpid.server.configuration.SessionConfig;
-import org.apache.qpid.server.configuration.SessionConfigType;
+import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction;
 import org.apache.qpid.server.logging.LogActor;
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.logging.actors.CurrentActor;
@@ -91,7 +86,7 @@ import static org.apache.qpid.server.log
 import static org.apache.qpid.util.Serial.gt;
 
 public class ServerSession extends Session
-        implements AuthorizationHolder, SessionConfig,
+        implements AuthorizationHolder,
                    AMQSessionModel, LogSubject, AsyncAutoCommitTransaction.FutureRecorder
 {
     private static final Logger _logger = LoggerFactory.getLogger(ServerSession.class);
@@ -100,8 +95,7 @@ public class ServerSession extends Sessi
     private static final int PRODUCER_CREDIT_TOPUP_THRESHOLD = 1 << 30;
     private static final int UNFINISHED_COMMAND_QUEUE_THRESHOLD = 500;
 
-    private final UUID _id;
-    private ConnectionConfig _connectionConfig;
+    private final UUID _id = UUID.randomUUID();
     private long _createTime = System.currentTimeMillis();
     private LogActor _actor = GenericActor.getInstance(this);
 
@@ -139,7 +133,6 @@ public class ServerSession extends Sessi
     private final AtomicLong _txnCommits = new AtomicLong(0);
     private final AtomicLong _txnRejects = new AtomicLong(0);
     private final AtomicLong _txnCount = new AtomicLong(0);
-    private final AtomicLong _txnUpdateTime = new AtomicLong(0);
 
     private Map<String, Subscription_0_10> _subscriptions = new ConcurrentHashMap<String, Subscription_0_10>();
 
@@ -147,21 +140,21 @@ public class ServerSession extends Sessi
 
     private final TransactionTimeoutHelper _transactionTimeoutHelper;
 
-    ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry)
-    {
-        this(connection, delegate, name, expiry, ((ServerConnection)connection).getConfig());
-    }
 
-    public ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry, ConnectionConfig connConfig)
+    public ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry)
     {
         super(connection, delegate, name, expiry);
-        _connectionConfig = connConfig;
         _transaction = new AsyncAutoCommitTransaction(this.getMessageStore(),this);
         _logSubject = new ChannelLogSubject(this);
-        _id = getConfigStore().createId();
-        getConfigStore().addConfiguredObject(this);
 
-        _transactionTimeoutHelper = new TransactionTimeoutHelper(_logSubject);
+        _transactionTimeoutHelper = new TransactionTimeoutHelper(_logSubject, new CloseAction()
+        {
+            @Override
+            public void doTimeoutAction(String reason) throws AMQException
+            {
+                getConnectionModel().closeSession(ServerSession.this, AMQConstant.RESOURCE_ERROR, reason);
+            }
+        });
     }
 
     protected void setState(State state)
@@ -184,12 +177,6 @@ public class ServerSession extends Sessi
         invoke(new MessageStop(""));
     }
 
-    private ConfigStore getConfigStore()
-    {
-        return getConnectionConfig().getConfigStore();
-    }
-
-
     @Override
     protected boolean isFull(int id)
     {
@@ -206,9 +193,8 @@ public class ServerSession extends Sessi
         }
         getConnectionModel().registerMessageReceived(message.getSize(), message.getArrivalTime());
         PostEnqueueAction postTransactionAction = new PostEnqueueAction(queues, message, isTransactional()) ;
-        _transaction.enqueue(queues,message, postTransactionAction, 0L);
+        _transaction.enqueue(queues,message, postTransactionAction);
         incrementOutstandingTxnsIfNecessary();
-        updateTransactionalActivity();
     }
 
 
@@ -389,8 +375,6 @@ public class ServerSession extends Sessi
         }
         _messageDispositionListenerMap.clear();
 
-        getConfigStore().removeConfiguredObject(this);
-
         for (Task task : _taskList)
         {
             task.doTask(this);
@@ -424,7 +408,6 @@ public class ServerSession extends Sessi
                                      entry.release();
                                  }
                              });
-	    updateTransactionalActivity();
     }
 
     public Collection<Subscription_0_10> getSubscriptions()
@@ -470,11 +453,6 @@ public class ServerSession extends Sessi
         return _transaction.isTransactional();
     }
 
-    public boolean inTransaction()
-    {
-        return isTransactional() && _txnUpdateTime.get() > 0 && _transaction.getTransactionStartTime() > 0;
-    }
-
     public void selectTx()
     {
         _transaction = new LocalTransaction(this.getMessageStore());
@@ -609,22 +587,6 @@ public class ServerSession extends Sessi
         }
     }
 
-    /**
-     * Update last transaction activity timestamp
-     */
-    public void updateTransactionalActivity()
-    {
-        if (isTransactional())
-        {
-            _txnUpdateTime.set(System.currentTimeMillis());
-        }
-    }
-
-    public Long getTxnStarts()
-    {
-        return _txnStarts.get();
-    }
-
     public Long getTxnCommits()
     {
         return _txnCommits.get();
@@ -682,23 +644,7 @@ public class ServerSession extends Sessi
 
     public VirtualHost getVirtualHost()
     {
-        return (VirtualHost) _connectionConfig.getVirtualHost();
-    }
-
-    @Override
-    public UUID getQMFId()
-    {
-        return _id;
-    }
-
-    public SessionConfigType getConfigType()
-    {
-        return SessionConfigType.getInstance();
-    }
-
-    public ConfiguredObject getParent()
-    {
-        return getVirtualHost();
+        return getConnection().getVirtualHost();
     }
 
     public boolean isDurable()
@@ -706,44 +652,16 @@ public class ServerSession extends Sessi
         return false;
     }
 
-    public boolean isAttached()
-    {
-        return true;
-    }
-
-    public long getDetachedLifespan()
-    {
-        return 0;
-    }
-
-    public Long getExpiryTime()
-    {
-        return null;
-    }
-
-    public Long getMaxClientRate()
-    {
-        return null;
-    }
-
-    public ConnectionConfig getConnectionConfig()
-    {
-        return _connectionConfig;
-    }
-
-    public String getSessionName()
-    {
-        return getName().toString();
-    }
 
     public long getCreateTime()
     {
         return _createTime;
     }
 
-    public void mgmtClose()
+    @Override
+    public UUID getId()
     {
-        close();
+        return _id;
     }
 
     public AMQConnectionModel getConnectionModel()
@@ -774,28 +692,7 @@ public class ServerSession extends Sessi
 
     public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException
     {
-        if (inTransaction())
-        {
-            long currentTime = System.currentTimeMillis();
-            long openTime = currentTime - _transaction.getTransactionStartTime();
-            long idleTime = currentTime - _txnUpdateTime.get();
-
-            _transactionTimeoutHelper.logIfNecessary(idleTime, idleWarn, ChannelMessages.IDLE_TXN(idleTime),
-                                                     TransactionTimeoutHelper.IDLE_TRANSACTION_ALERT);
-            if (_transactionTimeoutHelper.isTimedOut(idleTime, idleClose))
-            {
-                getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Idle transaction timed out");
-                return;
-            }
-
-            _transactionTimeoutHelper.logIfNecessary(openTime, openWarn, ChannelMessages.OPEN_TXN(openTime),
-                                                     TransactionTimeoutHelper.OPEN_TRANSACTION_ALERT);
-            if (_transactionTimeoutHelper.isTimedOut(openTime, openClose))
-            {
-                getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Open transaction timed out");
-                return;
-            }
-        }
+        _transactionTimeoutHelper.checkIdleOrOpenTimes(_transaction, openWarn, openClose, idleWarn, idleClose);
     }
 
     public void block(AMQQueue queue)
@@ -878,9 +775,7 @@ public class ServerSession extends Sessi
                             ? getConnection().getConnectionId()
                             : -1;
 
-        String remoteAddress = _connectionConfig instanceof ProtocolEngine
-                                ? ((ProtocolEngine) _connectionConfig).getRemoteAddress().toString()
-                                : "";
+        String remoteAddress = String.valueOf(getConnection().getRemoteAddress());
         return "[" +
                MessageFormat.format(CHANNEL_FORMAT,
                                     connectionId,
@@ -1065,14 +960,16 @@ public class ServerSession extends Sessi
         super.setClose(close);
     }
 
-    public int compareTo(AMQSessionModel session)
+    @Override
+    public int getConsumerCount()
     {
-        return getQMFId().compareTo(session.getQMFId());
+        return _subscriptions.values().size();
     }
 
     @Override
-    public int getConsumerCount()
+    public int compareTo(AMQSessionModel o)
     {
-        return _subscriptions.values().size();
+        return getId().compareTo(o.getId());
     }
+
 }

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java Thu Feb 28 16:14:30 2013
@@ -31,7 +31,6 @@ import org.apache.qpid.server.exchange.E
 import org.apache.qpid.server.exchange.ExchangeFactory;
 import org.apache.qpid.server.exchange.ExchangeInUseException;
 import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.exchange.ExchangeType;
 import org.apache.qpid.server.exchange.HeadersExchange;
 import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.filter.FilterManagerFactory;
@@ -41,11 +40,11 @@ import org.apache.qpid.server.logging.me
 import org.apache.qpid.server.message.MessageMetaData_0_10;
 import org.apache.qpid.server.message.MessageTransferMessage;
 import org.apache.qpid.server.model.UUIDGenerator;
+import org.apache.qpid.server.plugin.ExchangeType;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.AMQQueueFactory;
 import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.security.SecurityManager;
 import org.apache.qpid.server.store.DurableConfigurationStore;
 import org.apache.qpid.server.store.MessageStore;
@@ -1266,18 +1265,12 @@ public class ServerSessionDelegate exten
                             }
                         }
                         queueRegistry.registerQueue(queue);
-                        boolean autoRegister = ApplicationRegistry.getInstance().getConfiguration().getQueueAutoRegister();
 
-                        if (autoRegister)
-                        {
-
-                            ExchangeRegistry exchangeRegistry = getExchangeRegistry(session);
+                        ExchangeRegistry exchangeRegistry = getExchangeRegistry(session);
 
-                            Exchange defaultExchange = exchangeRegistry.getDefaultExchange();
+                        Exchange defaultExchange = exchangeRegistry.getDefaultExchange();
 
-                            virtualHost.getBindingFactory().addBinding(queueName, queue, defaultExchange, null);
-
-                        }
+                        virtualHost.getBindingFactory().addBinding(queueName, queue, defaultExchange, null);
 
                         if (method.hasAutoDelete()
                             && method.getAutoDelete()

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java Thu Feb 28 16:14:30 2013
@@ -66,11 +66,18 @@ public class AsyncAutoCommitTransaction 
         _futureRecorder = recorder;
     }
 
+    @Override
     public long getTransactionStartTime()
     {
         return 0L;
     }
 
+    @Override
+    public long getTransactionUpdateTime()
+    {
+        return 0L;
+    }
+
     /**
      * Since AutoCommitTransaction have no concept of a long lived transaction, any Actions registered
      * by the caller are executed immediately.
@@ -241,7 +248,7 @@ public class AsyncAutoCommitTransaction 
 
     }
 
-    public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction, long currentTime)
+    public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction)
     {
         Transaction txn = null;
         try

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java Thu Feb 28 16:14:30 2013
@@ -52,11 +52,18 @@ public class AutoCommitTransaction imple
         _messageStore = transactionLog;
     }
 
+    @Override
     public long getTransactionStartTime()
     {
         return 0L;
     }
 
+    @Override
+    public long getTransactionUpdateTime()
+    {
+        return 0L;
+    }
+
     /**
      * Since AutoCommitTransaction have no concept of a long lived transaction, any Actions registered
      * by the caller are executed immediately.
@@ -178,7 +185,7 @@ public class AutoCommitTransaction imple
 
     }
 
-    public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction, long currentTime)
+    public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction)
     {
         Transaction txn = null;
         try
@@ -270,4 +277,6 @@ public class AutoCommitTransaction imple
         }
     }
 
+
+
 }

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java Thu Feb 28 16:14:30 2013
@@ -26,7 +26,6 @@ import org.apache.qpid.server.protocol.A
 import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.Transaction;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.transport.Xid;
 
@@ -39,10 +38,6 @@ public class DistributedTransaction impl
 
     private final AutoCommitTransaction _autoCommitTransaction;
 
-    private volatile Transaction _transaction;
-
-    private long _txnStartTime = 0L;
-
     private DtxBranch _branch;
     private AMQSessionModel _session;
     private VirtualHost _vhost;
@@ -55,9 +50,16 @@ public class DistributedTransaction impl
         _autoCommitTransaction = new AutoCommitTransaction(vhost.getMessageStore());
     }
 
+    @Override
     public long getTransactionStartTime()
     {
-        return _txnStartTime;
+        return 0;
+    }
+
+    @Override
+    public long getTransactionUpdateTime()
+    {
+        return 0;
     }
 
     public void addPostTransactionAction(Action postTransactionAction)
@@ -107,7 +109,7 @@ public class DistributedTransaction impl
         {
             _branch.enqueue(queue, message);
             _branch.addPostTransactionAcion(postTransactionAction);
-            enqueue(Collections.singletonList(queue), message, postTransactionAction, System.currentTimeMillis());
+            enqueue(Collections.singletonList(queue), message, postTransactionAction);
         }
         else
         {
@@ -116,7 +118,7 @@ public class DistributedTransaction impl
     }
 
     public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message,
-                        Action postTransactionAction, long currentTime)
+                        Action postTransactionAction)
     {
         if(_branch != null)
         {
@@ -128,7 +130,7 @@ public class DistributedTransaction impl
         }
         else
         {
-            _autoCommitTransaction.enqueue(queues, message, postTransactionAction, currentTime);
+            _autoCommitTransaction.enqueue(queues, message, postTransactionAction);
         }
     }
 

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java Thu Feb 28 16:14:30 2013
@@ -49,25 +49,42 @@ public class LocalTransaction implements
     private final List<Action> _postTransactionActions = new ArrayList<Action>();
 
     private volatile Transaction _transaction;
-    private MessageStore _transactionLog;
-    private long _txnStartTime = 0L;
+    private final ActivityTimeAccessor _activityTime;
+    private final MessageStore _transactionLog;
+    private volatile long _txnStartTime = 0L;
+    private volatile long _txnUpdateTime = 0l;
     private StoreFuture _asyncTran;
 
     public LocalTransaction(MessageStore transactionLog)
     {
-        _transactionLog = transactionLog;
+        this(transactionLog, new ActivityTimeAccessor()
+        {
+            @Override
+            public long getActivityTime()
+            {
+                return System.currentTimeMillis();
+            }
+        });
     }
-    
-    public boolean inTransaction()
+
+    public LocalTransaction(MessageStore transactionLog, ActivityTimeAccessor activityTime)
     {
-        return _transaction != null;
+        _transactionLog = transactionLog;
+        _activityTime = activityTime;
     }
 
+    @Override
     public long getTransactionStartTime()
     {
         return _txnStartTime;
     }
 
+    @Override
+    public long getTransactionUpdateTime()
+    {
+        return _txnUpdateTime;
+    }
+
     public void addPostTransactionAction(Action postTransactionAction)
     {
         sync();
@@ -78,6 +95,7 @@ public class LocalTransaction implements
     {
         sync();
         _postTransactionActions.add(postTransactionAction);
+        initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime();
 
         if(message.isPersistent() && queue.isDurable())
         {
@@ -104,6 +122,7 @@ public class LocalTransaction implements
     {
         sync();
         _postTransactionActions.add(postTransactionAction);
+        initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime();
 
         try
         {
@@ -180,6 +199,7 @@ public class LocalTransaction implements
     {
         sync();
         _postTransactionActions.add(postTransactionAction);
+        initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime();
 
         if(message.isPersistent() && queue.isDurable())
         {
@@ -189,7 +209,7 @@ public class LocalTransaction implements
                 {
                     _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getNameShortString());
                 }
-                
+
                 beginTranIfNecessary();
                 _transaction.enqueueMessage(queue, message);
             }
@@ -202,15 +222,11 @@ public class LocalTransaction implements
         }
     }
 
-    public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction, long currentTime)
+    public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction)
     {
         sync();
         _postTransactionActions.add(postTransactionAction);
-
-        if (_txnStartTime == 0L)
-        {
-            _txnStartTime = currentTime == 0L ? System.currentTimeMillis() : currentTime;
-        }
+        initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime();
 
         if(message.isPersistent())
         {
@@ -224,8 +240,7 @@ public class LocalTransaction implements
                         {
                             _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getNameShortString() );
                         }
-                        
-                        
+
                         beginTranIfNecessary();
                         _transaction.enqueueMessage(queue, message);
                     }
@@ -378,16 +393,24 @@ public class LocalTransaction implements
             }
             throw new RuntimeException("Failed to commit transaction", e);
         }
-
-
     }
 
     private void doPostTransactionActions()
     {
+        if(_logger.isDebugEnabled())
+        {
+            _logger.debug("Beginning " + _postTransactionActions.size() + " post transaction actions");
+        }
+
         for(int i = 0; i < _postTransactionActions.size(); i++)
         {
             _postTransactionActions.get(i).postCommit();
         }
+
+        if(_logger.isDebugEnabled())
+        {
+            _logger.debug("Completed post transaction actions");
+        }
     }
 
     public void rollback()
@@ -427,16 +450,34 @@ public class LocalTransaction implements
         }
     }
 
+    private void initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime()
+    {
+        long currentTime = _activityTime.getActivityTime();
+
+        if (_txnStartTime == 0)
+        {
+            _txnStartTime = currentTime;
+        }
+        _txnUpdateTime = currentTime;
+    }
+
     private void resetDetails()
     {
         _asyncTran = null;
         _transaction = null;
-	    _postTransactionActions.clear();
+        _postTransactionActions.clear();
         _txnStartTime = 0L;
+        _txnUpdateTime = 0;
     }
 
     public boolean isTransactional()
     {
         return true;
     }
+
+    public interface ActivityTimeAccessor
+    {
+        long getActivityTime();
+    }
+
 }

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java Thu Feb 28 16:14:30 2013
@@ -55,11 +55,18 @@ public interface ServerTransaction
 
     /**
      * Return the time the current transaction started.
-     * 
+     *
      * @return the time this transaction started or 0 if not in a transaction
      */
     long getTransactionStartTime();
 
+    /**
+     * Return the time of the last activity on the current transaction.
+     *
+     * @return the time of the last activity or 0 if not in a transaction
+     */
+    long getTransactionUpdateTime();
+
     /** 
      * Register an Action for execution after transaction commit or rollback.  Actions
      * will be executed in the order in which they are registered.
@@ -92,7 +99,7 @@ public interface ServerTransaction
      * 
      * Store operations will result only for a persistent messages on durable queues.
      */
-    void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction, long currentTime);
+    void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction);
 
     /** 
      * Commit the transaction represented by this object.

Propchange: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/virtualhost/
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost:r1375509-1450773

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java Thu Feb 28 16:14:30 2013
@@ -25,13 +25,10 @@ import java.util.UUID;
 import java.util.concurrent.ScheduledFuture;
 import org.apache.qpid.common.Closeable;
 import org.apache.qpid.server.binding.BindingFactory;
-import org.apache.qpid.server.configuration.ConfigStore;
-import org.apache.qpid.server.configuration.VirtualHostConfig;
 import org.apache.qpid.server.configuration.VirtualHostConfiguration;
 import org.apache.qpid.server.connection.IConnectionRegistry;
 import org.apache.qpid.server.exchange.ExchangeFactory;
 import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.federation.BrokerLink;
 import org.apache.qpid.server.protocol.v1_0.LinkRegistry;
 import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.registry.IApplicationRegistry;
@@ -41,7 +38,7 @@ import org.apache.qpid.server.store.Dura
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.txn.DtxRegistry;
 
-public interface VirtualHost extends DurableConfigurationStore.Source, VirtualHostConfig, Closeable, StatisticsGatherer
+public interface VirtualHost extends DurableConfigurationStore.Source, Closeable, StatisticsGatherer
 {
     IConnectionRegistry getConnectionRegistry();
 
@@ -61,8 +58,6 @@ public interface VirtualHost extends Dur
 
     void close();
 
-    UUID getBrokerId();
-
     UUID getId();
 
     void scheduleHouseKeepingTask(long period, HouseKeepingTask task);
@@ -77,25 +72,12 @@ public interface VirtualHost extends Dur
 
     int getHouseKeepingActiveCount();
 
-    IApplicationRegistry getApplicationRegistry();
+    VirtualHostRegistry getVirtualHostRegistry();
 
     BindingFactory getBindingFactory();
 
-    void createBrokerConnection(String transport,
-                                String host,
-                                int port,
-                                String vhost,
-                                boolean durable,
-                                String authMechanism, String username, String password);
-
-    public BrokerLink createBrokerConnection(UUID id, long createTime, Map<String,String> arguments);
-
-    ConfigStore getConfigStore();
-
     DtxRegistry getDtxRegistry();
 
-    void removeBrokerConnection(BrokerLink brokerLink);
-
     LinkRegistry getLinkRegistry(String remoteContainerId);
 
     ScheduledFuture<?> scheduleTask(long delay, Runnable timeoutTask);



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org