You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2012/08/03 14:14:12 UTC

svn commit: r1368910 [17/27] - in /qpid/branches/asyncstore: ./ bin/ cpp/bindings/qmf/ruby/ cpp/bindings/qmf2/ruby/ cpp/bindings/qpid/python/ cpp/bindings/qpid/ruby/ cpp/bindings/qpid/ruby/features/ cpp/bindings/qpid/ruby/features/step_definitions/ cpp...

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java Fri Aug  3 12:13:32 2012
@@ -32,7 +32,9 @@ import org.apache.qpid.protocol.AMQConst
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.protocol.AMQMethodListener;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.registry.IApplicationRegistry;
 import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
 import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
 
 import java.util.concurrent.CopyOnWriteArraySet;
@@ -61,6 +63,18 @@ public class AMQStateManager implements 
 
     }
 
+    /**
+     * Get the ApplicationRegistry associated with this AMQStateManager
+     *
+     * returns the application registry associated with the VirtualHostRegistry of the AMQStateManager
+     *
+     * @return the ApplicationRegistry
+     */
+    public IApplicationRegistry getApplicationRegistry()
+    {
+        return _virtualHostRegistry.getApplicationRegistry();
+    }
+
     public AMQState getCurrentState()
     {
         return _currentState;
@@ -142,4 +156,14 @@ public class AMQStateManager implements 
         SecurityManager.setThreadSubject(_protocolSession.getAuthorizedSubject());
         return _protocolSession;
     }
+
+    /**
+     * Get the AuthenticationManager associated with the ProtocolSession of the AMQStateManager
+     *
+     * @return the AuthenticationManager
+     */
+    public AuthenticationManager getAuthenticationManager()
+    {
+        return getApplicationRegistry().getAuthenticationManager(getProtocolSession().getLocalAddress());
+    }
 }

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsCounter.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsCounter.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsCounter.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsCounter.java Fri Aug  3 12:13:32 2012
@@ -33,8 +33,7 @@ public class StatisticsCounter
     private static final Logger _log = LoggerFactory.getLogger(StatisticsCounter.class);
     
     public static final long DEFAULT_SAMPLE_PERIOD = Long.getLong("qpid.statistics.samplePeriod", 2000L); // 2s
-    public static final boolean DISABLE_STATISTICS = Boolean.getBoolean("qpid.statistics.disable");
-    
+
     private static final String COUNTER = "counter";
     private static final AtomicLong _counterIds = new AtomicLong(0L);
     
@@ -78,11 +77,6 @@ public class StatisticsCounter
 
     public void registerEvent(long value, long timestamp)
     {
-        if (DISABLE_STATISTICS)
-        {
-            return;
-        }
-        
         long thisSample = (timestamp / _period);
         synchronized (this)
         {

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsGatherer.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsGatherer.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsGatherer.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsGatherer.java Fri Aug  3 12:13:32 2012
@@ -103,16 +103,4 @@ public interface StatisticsGatherer
      * Reset the counters for this, and any child {@link StatisticsGatherer}s.
      */
     void resetStatistics();
-    
-    /**
-     * Check if this object has statistics generation enabled.
-     * 
-     * @return true if statistics generation is enabled
-     */
-    boolean isStatisticsEnabled();
-    
-    /**
-     * Enable or disable statistics generation for this object.
-     */
-    void setStatisticsEnabled(boolean enabled);
 }

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java Fri Aug  3 12:13:32 2012
@@ -28,20 +28,21 @@ import java.util.UUID;
 
 public interface ConfigurationRecoveryHandler
 {
-    QueueRecoveryHandler begin(MessageStore store);
+    ExchangeRecoveryHandler begin(MessageStore store);
 
-    public static interface QueueRecoveryHandler
+    public static interface ExchangeRecoveryHandler
     {
-        void queue(UUID id, String queueName, String owner, boolean exclusive, FieldTable arguments);
-        ExchangeRecoveryHandler completeQueueRecovery();
+        void exchange(UUID id, String exchangeName, String type, boolean autoDelete);
+        QueueRecoveryHandler completeExchangeRecovery();
     }
 
-    public static interface ExchangeRecoveryHandler
+    public static interface QueueRecoveryHandler
     {
-        void exchange(UUID id, String exchangeName, String type, boolean autoDelete);
-        BindingRecoveryHandler completeExchangeRecovery();
+        void queue(UUID id, String queueName, String owner, boolean exclusive, FieldTable arguments, UUID alternateExchangeId);
+        BindingRecoveryHandler completeQueueRecovery();
     }
 
+
     public static interface BindingRecoveryHandler
     {
         void binding(UUID bindingId, UUID exchangeId, UUID queueId, String bindingName, ByteBuffer buf);

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectHelper.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectHelper.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectHelper.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectHelper.java Fri Aug  3 12:13:32 2012
@@ -40,12 +40,7 @@ import org.apache.qpid.server.util.MapJs
 
 public class ConfiguredObjectHelper
 {
-    /**
-     * Name of queue attribute to store queue creation arguments.
-     * <p>
-     * This attribute is not defined yet on Queue configured object interface.
-     */
-    private static final String QUEUE_ARGUMENTS = "ARGUMENTS";
+
 
     private MapJsonSerializer _serializer = new MapJsonSerializer();
 
@@ -57,14 +52,15 @@ public class ConfiguredObjectHelper
             String queueName = (String) attributeMap.get(Queue.NAME);
             String owner = (String) attributeMap.get(Queue.OWNER);
             boolean exclusive = (Boolean) attributeMap.get(Queue.EXCLUSIVE);
+            UUID alternateExchangeId = attributeMap.get(Queue.ALTERNATE_EXCHANGE) == null ? null : UUID.fromString((String)attributeMap.get(Queue.ALTERNATE_EXCHANGE));
             @SuppressWarnings("unchecked")
-            Map<String, Object> queueArgumentsMap = (Map<String, Object>) attributeMap.get(QUEUE_ARGUMENTS);
+            Map<String, Object> queueArgumentsMap = (Map<String, Object>) attributeMap.get(Queue.ARGUMENTS);
             FieldTable arguments = null;
             if (queueArgumentsMap != null)
             {
                 arguments = FieldTable.convertToFieldTable(queueArgumentsMap);
             }
-            qrh.queue(configuredObject.getId(), queueName, owner, exclusive, arguments);
+            qrh.queue(configuredObject.getId(), queueName, owner, exclusive, arguments, alternateExchangeId);
         }
     }
 
@@ -73,6 +69,24 @@ public class ConfiguredObjectHelper
         Map<String, Object> attributesMap = _serializer.deserialize(queueRecord.getAttributes());
         attributesMap.put(Queue.NAME, queue.getName());
         attributesMap.put(Queue.EXCLUSIVE, queue.isExclusive());
+        if (queue.getAlternateExchange() != null)
+        {
+            attributesMap.put(Queue.ALTERNATE_EXCHANGE, queue.getAlternateExchange().getId());
+        }
+        else
+        {
+            attributesMap.remove(Queue.ALTERNATE_EXCHANGE);
+        }
+        if (attributesMap.containsKey(Queue.ARGUMENTS))
+        {
+            // We wouldn't need this if createQueueConfiguredObject took only AMQQueue
+            Map<String, Object> currentArgs = (Map<String, Object>) attributesMap.get(Queue.ARGUMENTS);
+            currentArgs.putAll(queue.getArguments());
+        }
+        else
+        {
+            attributesMap.put(Queue.ARGUMENTS, queue.getArguments());
+        }
         String newJson = _serializer.serialize(attributesMap);
         ConfiguredObjectRecord newQueueRecord = new ConfiguredObjectRecord(queue.getId(), queueRecord.getType(), newJson);
         return newQueueRecord;
@@ -84,9 +98,15 @@ public class ConfiguredObjectHelper
         attributesMap.put(Queue.NAME, queue.getName());
         attributesMap.put(Queue.OWNER, AMQShortString.toString(queue.getOwner()));
         attributesMap.put(Queue.EXCLUSIVE, queue.isExclusive());
+        if (queue.getAlternateExchange() != null)
+        {
+            attributesMap.put(Queue.ALTERNATE_EXCHANGE, queue.getAlternateExchange().getId());
+        }
+        // TODO KW i think the arguments could come from the queue itself removing the need for the parameter arguments.
+        // It would also do away with the need for the if/then/else within updateQueueConfiguredObject
         if (arguments != null)
         {
-            attributesMap.put(QUEUE_ARGUMENTS, FieldTable.convertToMap(arguments));
+            attributesMap.put(Queue.ARGUMENTS, FieldTable.convertToMap(arguments));
         }
         String json = _serializer.serialize(attributesMap);
         ConfiguredObjectRecord configuredObject = new ConfiguredObjectRecord(queue.getId(), Queue.class.getName(), json);

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/Event.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/Event.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/Event.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/Event.java Fri Aug  3 12:13:32 2012
@@ -23,10 +23,20 @@ public enum Event
 {
     BEFORE_INIT,
     AFTER_INIT,
+
     BEFORE_ACTIVATE,
     AFTER_ACTIVATE,
+
     BEFORE_PASSIVATE,
     AFTER_PASSIVATE,
+
     BEFORE_CLOSE,
-    AFTER_CLOSE
+    AFTER_CLOSE,
+
+    BEFORE_QUIESCE,
+    AFTER_QUIESCE,
+    BEFORE_RESTART,
+
+    PERSISTENT_MESSAGE_SIZE_OVERFULL,
+    PERSISTENT_MESSAGE_SIZE_UNDERFULL
 }

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/EventManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/EventManager.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/EventManager.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/EventManager.java Fri Aug  3 12:13:32 2012
@@ -24,9 +24,12 @@ import java.util.EnumMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.log4j.Logger;
+
 public class EventManager
 {
     private Map<Event, List<EventListener>> _listeners = new EnumMap<Event, List<EventListener>> (Event.class);
+    private static final Logger _LOGGER = Logger.getLogger(EventManager.class);
 
     public synchronized void addEventListener(EventListener listener, Event... events)
     {
@@ -46,6 +49,11 @@ public class EventManager
     {
         if (_listeners.containsKey(event))
         {
+            if(_LOGGER.isDebugEnabled())
+            {
+                _LOGGER.debug("Received event " + event);
+            }
+
             for (EventListener listener : _listeners.get(event))
             {
                 listener.event(event);

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java Fri Aug  3 12:13:32 2012
@@ -83,19 +83,19 @@ public class MemoryMessageStore extends 
     @Override
     public void configureConfigStore(String name, ConfigurationRecoveryHandler recoveryHandler, Configuration config) throws Exception
     {
-        _stateManager.attainState(State.CONFIGURING);
+        _stateManager.attainState(State.INITIALISING);
     }
 
     @Override
     public void configureMessageStore(String name, MessageStoreRecoveryHandler recoveryHandler, TransactionLogRecoveryHandler tlogRecoveryHandler, Configuration config) throws Exception
     {
-        _stateManager.attainState(State.CONFIGURED);
+        _stateManager.attainState(State.INITIALISED);
     }
 
     @Override
     public void activate() throws Exception
     {
-        _stateManager.attainState(State.RECOVERING);
+        _stateManager.attainState(State.ACTIVATING);
         
         _stateManager.attainState(State.ACTIVE);
     }
@@ -134,4 +134,10 @@ public class MemoryMessageStore extends 
     {
         _eventManager.addEventListener(eventListener, events);
     }
+
+    @Override
+    public String getStoreType()
+    {
+        return "Memory";
+    }
 }

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java Fri Aug  3 12:13:32 2012
@@ -67,4 +67,6 @@ public interface MessageStore extends Du
     void addEventListener(EventListener eventListener, Event... events);
 
     String getStoreLocation();
+
+    String getStoreType();
 }

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreConstants.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreConstants.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreConstants.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreConstants.java Fri Aug  3 12:13:32 2012
@@ -23,5 +23,7 @@ public class MessageStoreConstants
 {
 
     public static final String ENVIRONMENT_PATH_PROPERTY = "environment-path";
+    public static final String OVERFULL_SIZE_PROPERTY = "overfull-size";
+    public static final String UNDERFULL_SIZE_PROPERTY = "underfull-size";
 
 }

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java Fri Aug  3 12:13:32 2012
@@ -28,7 +28,7 @@ import org.apache.qpid.server.federation
 import org.apache.qpid.server.federation.BrokerLink;
 import org.apache.qpid.server.queue.AMQQueue;
 
-public class NullMessageStore implements MessageStore
+public abstract class NullMessageStore implements MessageStore
 {
     @Override
     public void configureConfigStore(String name,

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/OperationalLoggingListener.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/OperationalLoggingListener.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/OperationalLoggingListener.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/OperationalLoggingListener.java Fri Aug  3 12:13:32 2012
@@ -33,7 +33,14 @@ public class OperationalLoggingListener 
     private OperationalLoggingListener(final MessageStore store, LogSubject logSubject)
     {
         _logSubject = logSubject;
-        store.addEventListener(this, Event.BEFORE_INIT, Event.AFTER_INIT, Event.BEFORE_ACTIVATE, Event.AFTER_ACTIVATE, Event.AFTER_CLOSE);
+        store.addEventListener(this,
+                               Event.BEFORE_INIT,
+                               Event.AFTER_INIT,
+                               Event.BEFORE_ACTIVATE,
+                               Event.AFTER_ACTIVATE,
+                               Event.AFTER_CLOSE,
+                               Event.PERSISTENT_MESSAGE_SIZE_OVERFULL,
+                               Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL);
         _store = store;
     }
 
@@ -62,7 +69,13 @@ public class OperationalLoggingListener 
             case AFTER_CLOSE:
                 CurrentActor.get().message(_logSubject,MessageStoreMessages.CLOSED());
                 break;
-            
+            case PERSISTENT_MESSAGE_SIZE_OVERFULL:
+                CurrentActor.get().message(_logSubject,MessageStoreMessages.OVERFULL());
+                break;
+            case PERSISTENT_MESSAGE_SIZE_UNDERFULL:
+                CurrentActor.get().message(_logSubject,MessageStoreMessages.UNDERFULL());
+                break;
+
         }
     }
 

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/State.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/State.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/State.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/State.java Fri Aug  3 12:13:32 2012
@@ -20,19 +20,30 @@
  */
 package org.apache.qpid.server.store;
 
+import org.apache.qpid.server.configuration.ConfiguredObject;
+
 public enum State
 {
-    
+    /** The initial state of the store.  In practice, the store immediately transitions to the subsequent states. */
     INITIAL,
-    CONFIGURING,
-    CONFIGURED,
-    RECOVERING,
+
+    INITIALISING,
+    /**
+     * The initial set-up of the store has completed.
+     * If the store is persistent, it has not yet loaded configuration for {@link ConfiguredObject}'s from disk.
+     *
+     * From the point of view of the user, the store is essentially stopped.
+     */
+    INITIALISED,
+
+    ACTIVATING,
     ACTIVE,
-    QUIESCING,
-    QUIESCED,
+
     CLOSING,
-    CLOSED;
-        
+    CLOSED,
 
+    QUIESCING,
+    /** The virtual host (and implicitly also the store) has been manually paused by the user to allow configuration changes to take place */
+    QUIESCED;
 
 }
\ No newline at end of file

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/StateManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/StateManager.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/StateManager.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/StateManager.java Fri Aug  3 12:13:32 2012
@@ -24,6 +24,8 @@ package org.apache.qpid.server.store;
 import java.util.EnumMap;
 import java.util.Map;
 
+import org.apache.qpid.server.store.StateManager.Transition;
+
 public class StateManager
 {
     private State _state = State.INITIAL;
@@ -70,16 +72,23 @@ public class StateManager
 
     }
 
-    public static final Transition CONFIGURE = new Transition(State.INITIAL, State.CONFIGURING, Event.BEFORE_INIT);
-    public static final Transition CONFIGURE_COMPLETE = new Transition(State.CONFIGURING, State.CONFIGURED, Event.AFTER_INIT);
-    public static final Transition RECOVER = new Transition(State.CONFIGURED, State.RECOVERING, Event.BEFORE_ACTIVATE);
-    public static final Transition ACTIVATE = new Transition(State.RECOVERING, State.ACTIVE, Event.AFTER_ACTIVATE);
+    public static final Transition INITIALISE = new Transition(State.INITIAL, State.INITIALISING, Event.BEFORE_INIT);
+    public static final Transition INITALISE_COMPLETE = new Transition(State.INITIALISING, State.INITIALISED, Event.AFTER_INIT);
+
+    public static final Transition ACTIVATE = new Transition(State.INITIALISED, State.ACTIVATING, Event.BEFORE_ACTIVATE);
+    public static final Transition ACTIVATE_COMPLETE = new Transition(State.ACTIVATING, State.ACTIVE, Event.AFTER_ACTIVATE);
+
+    public static final Transition CLOSE_INITIALISED = new Transition(State.INITIALISED, State.CLOSING, Event.BEFORE_CLOSE);;
     public static final Transition CLOSE_ACTIVE = new Transition(State.ACTIVE, State.CLOSING, Event.BEFORE_CLOSE);
     public static final Transition CLOSE_QUIESCED = new Transition(State.QUIESCED, State.CLOSING, Event.BEFORE_CLOSE);
     public static final Transition CLOSE_COMPLETE = new Transition(State.CLOSING, State.CLOSED, Event.AFTER_CLOSE);
-    public static final Transition QUIESCE = new Transition(State.ACTIVE, State.QUIESCING, Event.BEFORE_PASSIVATE);
-    public static final Transition QUIESCE_COMPLETE = new Transition(State.QUIESCING, State.QUIESCED, Event.BEFORE_PASSIVATE);
-    public static final Transition RESTART = new Transition(State.QUIESCED, State.RECOVERING, Event.BEFORE_ACTIVATE);
+
+    public static final Transition PASSIVATE = new Transition(State.ACTIVE, State.INITIALISED, Event.BEFORE_PASSIVATE);
+
+    public static final Transition QUIESCE = new Transition(State.ACTIVE, State.QUIESCING, Event.BEFORE_QUIESCE);
+    public static final Transition QUIESCE_COMPLETE = new Transition(State.QUIESCING, State.QUIESCED, Event.AFTER_QUIESCE);
+
+    public static final Transition RESTART = new Transition(State.QUIESCED, State.ACTIVATING, Event.BEFORE_RESTART);
 
 
     public StateManager(final EventManager eventManager)
@@ -105,16 +114,6 @@ public class StateManager
         return _state;
     }
 
-    public synchronized void stateTransition(final State current, final State desired)
-    {
-        if (_state != current)
-        {
-            throw new IllegalStateException("Cannot transition to the state: " + desired + "; need to be in state: " + current
-                                   + "; currently in state: " + _state);
-        }
-        attainState(desired);
-    }
-
     public synchronized void attainState(State desired)
     {
         Transition transition = null;

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java Fri Aug  3 12:13:32 2012
@@ -31,6 +31,7 @@ import java.lang.ref.SoftReference;
 import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
 import java.sql.Blob;
+import java.sql.CallableStatement;
 import java.sql.Connection;
 import java.sql.Driver;
 import java.sql.DriverManager;
@@ -59,6 +60,7 @@ import org.apache.qpid.server.federation
 import org.apache.qpid.server.message.EnqueableMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
+import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler;
 import org.apache.qpid.server.store.ConfiguredObjectHelper;
 import org.apache.qpid.server.store.ConfiguredObjectRecord;
 import org.apache.qpid.server.store.Event;
@@ -77,6 +79,9 @@ import org.apache.qpid.server.store.Stor
 import org.apache.qpid.server.store.Transaction;
 import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
 import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecoveryHandler;
+import org.apache.qpid.server.store.ConfigurationRecoveryHandler.ExchangeRecoveryHandler;
+import org.apache.qpid.server.store.ConfigurationRecoveryHandler.QueueRecoveryHandler;
 
 /**
  * An implementation of a {@link MessageStore} that uses Apache Derby as the persistence
@@ -111,6 +116,7 @@ public class DerbyMessageStore implement
 
 
     private static Class<Driver> DRIVER_CLASS;
+    public static final String MEMORY_STORE_LOCATION = ":memory:";
 
     private final AtomicLong _messageId = new AtomicLong(0);
     private AtomicBoolean _closed = new AtomicBoolean(false);
@@ -230,10 +236,17 @@ public class DerbyMessageStore implement
 
     private static final String DERBY_SINGLE_DB_SHUTDOWN_CODE = "08006";
 
+    private static final String DERBY_STORE_TYPE = "DERBY";
+
     private final StateManager _stateManager;
-    
+
     private final EventManager _eventManager = new EventManager();
 
+    private long _totalStoreSize;
+    private boolean _limitBusted;
+    private long _persistentSizeLowThreshold;
+    private long _persistentSizeHighThreshold;
+
     private MessageStoreRecoveryHandler _messageRecoveryHandler;
 
     private TransactionLogRecoveryHandler _tlogRecoveryHandler;
@@ -253,7 +266,7 @@ public class DerbyMessageStore implement
                           ConfigurationRecoveryHandler configRecoveryHandler,
                           Configuration storeConfiguration) throws Exception
     {
-        _stateManager.attainState(State.CONFIGURING);
+        _stateManager.attainState(State.INITIALISING);
         _configRecoveryHandler = configRecoveryHandler;
 
         commonConfiguration(name, storeConfiguration);
@@ -269,13 +282,13 @@ public class DerbyMessageStore implement
         _tlogRecoveryHandler = tlogRecoveryHandler;
         _messageRecoveryHandler = recoveryHandler;
 
-        _stateManager.attainState(State.CONFIGURED);
+        _stateManager.attainState(State.INITIALISED);
     }
 
     @Override
     public void activate() throws Exception
     {
-        _stateManager.attainState(State.RECOVERING);
+        _stateManager.attainState(State.ACTIVATING);
 
         // this recovers durable exchanges, queues, and bindings
         recoverConfiguration(_configRecoveryHandler);
@@ -296,19 +309,39 @@ public class DerbyMessageStore implement
         final String databasePath = storeConfiguration.getString(MessageStoreConstants.ENVIRONMENT_PATH_PROPERTY, System.getProperty("QPID_WORK")
                 + File.separator + "derbyDB");
 
-        File environmentPath = new File(databasePath);
-        if (!environmentPath.exists())
+        if(!MEMORY_STORE_LOCATION.equals(databasePath))
         {
-            if (!environmentPath.mkdirs())
+            File environmentPath = new File(databasePath);
+            if (!environmentPath.exists())
             {
-                throw new IllegalArgumentException("Environment path " + environmentPath + " could not be read or created. "
-                    + "Ensure the path is correct and that the permissions are correct.");
+                if (!environmentPath.mkdirs())
+                {
+                    throw new IllegalArgumentException("Environment path " + environmentPath + " could not be read or created. "
+                        + "Ensure the path is correct and that the permissions are correct.");
+                }
             }
         }
 
         _storeLocation = databasePath;
 
+        _persistentSizeHighThreshold = storeConfiguration.getLong(MessageStoreConstants.OVERFULL_SIZE_PROPERTY, -1l);
+        _persistentSizeLowThreshold = storeConfiguration.getLong(MessageStoreConstants.UNDERFULL_SIZE_PROPERTY, _persistentSizeHighThreshold);
+        if(_persistentSizeLowThreshold > _persistentSizeHighThreshold || _persistentSizeLowThreshold < 0l)
+        {
+            _persistentSizeLowThreshold = _persistentSizeHighThreshold;
+        }
+
         createOrOpenDatabase(name, databasePath);
+
+        Connection conn = newAutoCommitConnection();;
+        try
+        {
+            _totalStoreSize = getSizeOnDisk(conn);
+        }
+        finally
+        {
+            conn.close();
+        }
     }
 
     private static synchronized void initialiseDriver() throws ClassNotFoundException
@@ -322,7 +355,7 @@ public class DerbyMessageStore implement
     private void createOrOpenDatabase(String name, final String environmentPath) throws SQLException
     {
         //FIXME this the _vhost name should not be added here, but derby wont use an empty directory as was possibly just created.
-        _connectionURL = "jdbc:derby:" + environmentPath + "/" + name + ";create=true";
+        _connectionURL = "jdbc:derby" + (environmentPath.equals(MEMORY_STORE_LOCATION) ? environmentPath : ":" + environmentPath + "/") + name + ";create=true";
 
         Connection conn = newAutoCommitConnection();
 
@@ -529,16 +562,17 @@ public class DerbyMessageStore implement
         try
         {
             List<ConfiguredObjectRecord> configuredObjects = loadConfiguredObjects();
-            ConfigurationRecoveryHandler.QueueRecoveryHandler qrh = recoveryHandler.begin(this);
-            _configuredObjectHelper.recoverQueues(qrh, configuredObjects);
 
-            ConfigurationRecoveryHandler.ExchangeRecoveryHandler erh = qrh.completeQueueRecovery();
+            ExchangeRecoveryHandler erh = recoveryHandler.begin(this);
             _configuredObjectHelper.recoverExchanges(erh, configuredObjects);
 
-            ConfigurationRecoveryHandler.BindingRecoveryHandler brh = erh.completeExchangeRecovery();
+            QueueRecoveryHandler qrh = erh.completeExchangeRecovery();
+            _configuredObjectHelper.recoverQueues(qrh, configuredObjects);
+
+            BindingRecoveryHandler brh = qrh.completeQueueRecovery();
             _configuredObjectHelper.recoverBindings(brh, configuredObjects);
 
-            ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler lrh = brh.completeBindingRecovery();
+            BrokerLinkRecoveryHandler lrh = brh.completeBindingRecovery();
             recoverBrokerLinks(lrh);
         }
         catch (SQLException e)
@@ -689,7 +723,7 @@ public class DerbyMessageStore implement
     public void close() throws Exception
     {
         _closed.getAndSet(true);
-        _stateManager.stateTransition(State.ACTIVE, State.CLOSING);
+        _stateManager.attainState(State.CLOSING);
 
         try
         {
@@ -710,7 +744,7 @@ public class DerbyMessageStore implement
             }
         }
 
-        _stateManager.stateTransition(State.CLOSING, State.CLOSED);
+        _stateManager.attainState(State.CLOSED);
     }
 
     @Override
@@ -956,8 +990,8 @@ public class DerbyMessageStore implement
                 try
                 {
 
-                    stmt.setLong(1, link.getId().getLeastSignificantBits());
-                    stmt.setLong(2, link.getId().getMostSignificantBits());
+                    stmt.setLong(1, link.getQMFId().getLeastSignificantBits());
+                    stmt.setLong(2, link.getQMFId().getMostSignificantBits());
                     ResultSet rs = stmt.executeQuery();
                     try
                     {
@@ -970,8 +1004,8 @@ public class DerbyMessageStore implement
                             try
                             {
 
-                                insertStmt.setLong(1, link.getId().getLeastSignificantBits());
-                                insertStmt.setLong(2, link.getId().getMostSignificantBits());
+                                insertStmt.setLong(1, link.getQMFId().getLeastSignificantBits());
+                                insertStmt.setLong(2, link.getQMFId().getMostSignificantBits());
                                 insertStmt.setLong(3, link.getCreateTime());
 
                                 byte[] argumentBytes = convertStringMapToBytes(link.getArguments());
@@ -1048,8 +1082,8 @@ public class DerbyMessageStore implement
         {
             conn = newAutoCommitConnection();
             stmt = conn.prepareStatement(DELETE_FROM_LINKS);
-            stmt.setLong(1, link.getId().getLeastSignificantBits());
-            stmt.setLong(2, link.getId().getMostSignificantBits());
+            stmt.setLong(1, link.getQMFId().getLeastSignificantBits());
+            stmt.setLong(2, link.getQMFId().getMostSignificantBits());
             int results = stmt.executeUpdate();
 
             if (results == 0)
@@ -1085,7 +1119,7 @@ public class DerbyMessageStore implement
                 try
                 {
 
-                    UUID id = bridge.getId();
+                    UUID id = bridge.getQMFId();
                     stmt.setLong(1, id.getLeastSignificantBits());
                     stmt.setLong(2, id.getMostSignificantBits());
                     ResultSet rs = stmt.executeQuery();
@@ -1105,7 +1139,7 @@ public class DerbyMessageStore implement
 
                                 insertStmt.setLong(3, bridge.getCreateTime());
 
-                                UUID linkId = bridge.getLink().getId();
+                                UUID linkId = bridge.getLink().getQMFId();
                                 insertStmt.setLong(4, linkId.getLeastSignificantBits());
                                 insertStmt.setLong(5, linkId.getMostSignificantBits());
 
@@ -1151,8 +1185,8 @@ public class DerbyMessageStore implement
         {
             conn = newAutoCommitConnection();
             stmt = conn.prepareStatement(DELETE_FROM_BRIDGES);
-            stmt.setLong(1, bridge.getId().getLeastSignificantBits());
-            stmt.setLong(2, bridge.getId().getMostSignificantBits());
+            stmt.setLong(1, bridge.getQMFId().getLeastSignificantBits());
+            stmt.setLong(2, bridge.getQMFId().getMostSignificantBits());
             int results = stmt.executeUpdate();
 
             if (results == 0)
@@ -1541,7 +1575,7 @@ public class DerbyMessageStore implement
                         buf = buf.slice();
                         MessageMetaDataType type = MessageMetaDataType.values()[dataAsBytes[0]];
                         StorableMessageMetaData metaData = type.getFactory().createMetaData(buf);
-                        StoredDerbyMessage message = new StoredDerbyMessage(messageId, metaData, false);
+                        StoredDerbyMessage message = new StoredDerbyMessage(messageId, metaData, true);
                         messageHandler.message(message);
                     }
 
@@ -1921,6 +1955,7 @@ public class DerbyMessageStore implement
     private class DerbyTransaction implements Transaction
     {
         private final ConnectionWrapper _connWrapper;
+        private int _storeSizeIncrease;
 
 
         private DerbyTransaction()
@@ -1938,18 +1973,19 @@ public class DerbyMessageStore implement
         @Override
         public void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
         {
-            if(message.getStoredMessage() instanceof StoredDerbyMessage)
+            final StoredMessage storedMessage = message.getStoredMessage();
+            if(storedMessage instanceof StoredDerbyMessage)
             {
                 try
                 {
-                    ((StoredDerbyMessage)message.getStoredMessage()).store(_connWrapper.getConnection());
+                    ((StoredDerbyMessage) storedMessage).store(_connWrapper.getConnection());
                 }
                 catch (SQLException e)
                 {
                     throw new AMQStoreException("Exception on enqueuing message " + _messageId, e);
                 }
             }
-
+            _storeSizeIncrease += storedMessage.getMetaData().getContentSize();
             DerbyMessageStore.this.enqueueMessage(_connWrapper, queue, message.getMessageNumber());
         }
 
@@ -1964,12 +2000,15 @@ public class DerbyMessageStore implement
         public void commitTran() throws AMQStoreException
         {
             DerbyMessageStore.this.commitTran(_connWrapper);
+            storedSizeChange(_storeSizeIncrease);
         }
 
         @Override
         public StoreFuture commitTranAsync() throws AMQStoreException
         {
-            return DerbyMessageStore.this.commitTranAsync(_connWrapper);
+            final StoreFuture storeFuture = DerbyMessageStore.this.commitTranAsync(_connWrapper);
+            storedSizeChange(_storeSizeIncrease);
+            return storeFuture;
         }
 
         @Override
@@ -1998,6 +2037,8 @@ public class DerbyMessageStore implement
     {
 
         private final long _messageId;
+        private final boolean _isRecovered;
+
         private StorableMessageMetaData _metaData;
         private volatile SoftReference<StorableMessageMetaData> _metaDataRef;
         private byte[] _data;
@@ -2006,21 +2047,21 @@ public class DerbyMessageStore implement
 
         StoredDerbyMessage(long messageId, StorableMessageMetaData metaData)
         {
-            this(messageId, metaData, true);
+            this(messageId, metaData, false);
         }
 
 
         StoredDerbyMessage(long messageId,
-                           StorableMessageMetaData metaData, boolean persist)
+                           StorableMessageMetaData metaData, boolean isRecovered)
         {
             _messageId = messageId;
+            _isRecovered = isRecovered;
 
-
-            _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData);
-            if(persist)
+            if(!_isRecovered)
             {
                 _metaData = metaData;
             }
+            _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData);
         }
 
         @Override
@@ -2101,16 +2142,17 @@ public class DerbyMessageStore implement
         @Override
         public synchronized StoreFuture flushToStore()
         {
+            Connection conn = null;
             try
             {
-                if(_metaData != null)
+                if(!stored())
                 {
-                    Connection conn = newConnection();
+                    conn = newConnection();
 
                     store(conn);
 
                     conn.commit();
-                    conn.close();
+                    storedSizeChange(getMetaData().getContentSize());
                 }
             }
             catch (SQLException e)
@@ -2121,12 +2163,24 @@ public class DerbyMessageStore implement
                 }
                 throw new RuntimeException(e);
             }
+            finally
+            {
+                closeConnection(conn);
+            }
             return StoreFuture.IMMEDIATE_FUTURE;
         }
 
+        @Override
+        public void remove()
+        {
+            int delta = getMetaData().getContentSize();
+            DerbyMessageStore.this.removeMessage(_messageId);
+            storedSizeChange(-delta);
+        }
+
         private synchronized void store(final Connection conn) throws SQLException
         {
-            if(_metaData != null)
+            if (!stored())
             {
                 try
                 {
@@ -2139,18 +2193,17 @@ public class DerbyMessageStore implement
                     _metaData = null;
                     _data = null;
                 }
-            }
 
-            if(_logger.isDebugEnabled())
-            {
-                _logger.debug("Storing message " + _messageId + " to store");
+                if(_logger.isDebugEnabled())
+                {
+                    _logger.debug("Storing message " + _messageId + " to store");
+                }
             }
         }
 
-        @Override
-        public void remove()
+        private boolean stored()
         {
-            DerbyMessageStore.this.removeMessage(_messageId);
+            return _metaData == null || _isRecovered;
         }
     }
 
@@ -2446,4 +2499,181 @@ public class DerbyMessageStore implement
         }
         return results;
     }
+
+    private synchronized void storedSizeChange(final int delta)
+    {
+        if(getPersistentSizeHighThreshold() > 0)
+        {
+            synchronized(this)
+            {
+                // the delta supplied is an approximation of a store size change. we don;t want to check the statistic every
+                // time, so we do so only when there's been enough change that it is worth looking again. We do this by
+                // assuming the total size will change by less than twice the amount of the message data change.
+                long newSize = _totalStoreSize += 3*delta;
+
+                Connection conn = null;
+                try
+                {
+
+                    if(!_limitBusted &&  newSize > getPersistentSizeHighThreshold())
+                    {
+                        conn = newAutoCommitConnection();
+                        _totalStoreSize = getSizeOnDisk(conn);
+                        if(_totalStoreSize > getPersistentSizeHighThreshold())
+                        {
+                            _limitBusted = true;
+                            _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_OVERFULL);
+                        }
+                    }
+                    else if(_limitBusted && newSize < getPersistentSizeLowThreshold())
+                    {
+                        long oldSize = _totalStoreSize;
+                        conn = newAutoCommitConnection();
+                        _totalStoreSize = getSizeOnDisk(conn);
+                        if(oldSize <= _totalStoreSize)
+                        {
+
+                            reduceSizeOnDisk(conn);
+
+                            _totalStoreSize = getSizeOnDisk(conn);
+                        }
+
+                        if(_totalStoreSize < getPersistentSizeLowThreshold())
+                        {
+                            _limitBusted = false;
+                            _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL);
+                        }
+
+
+                    }
+                }
+                catch (SQLException e)
+                {
+                    closeConnection(conn);
+                    throw new RuntimeException("Exception will processing store size change", e);
+                }
+            }
+        }
+    }
+
+    private void reduceSizeOnDisk(Connection conn)
+    {
+        CallableStatement cs = null;
+        PreparedStatement stmt = null;
+        try
+        {
+            String tableQuery =
+                    "SELECT S.SCHEMANAME, T.TABLENAME FROM SYS.SYSSCHEMAS S, SYS.SYSTABLES T WHERE S.SCHEMAID = T.SCHEMAID AND T.TABLETYPE='T'";
+            stmt = conn.prepareStatement(tableQuery);
+            ResultSet rs = null;
+
+            List<String> schemas = new ArrayList<String>();
+            List<String> tables = new ArrayList<String>();
+
+            try
+            {
+                rs = stmt.executeQuery();
+                while(rs.next())
+                {
+                    schemas.add(rs.getString(1));
+                    tables.add(rs.getString(2));
+                }
+            }
+            finally
+            {
+                if(rs != null)
+                {
+                    rs.close();
+                }
+            }
+
+
+            cs = conn.prepareCall
+                    ("CALL SYSCS_UTIL.SYSCS_COMPRESS_TABLE(?, ?, ?)");
+
+            for(int i = 0; i < schemas.size(); i++)
+            {
+                cs.setString(1, schemas.get(i));
+                cs.setString(2, tables.get(i));
+                cs.setShort(3, (short) 0);
+                cs.execute();
+            }
+        }
+        catch (SQLException e)
+        {
+            closeConnection(conn);
+            throw new RuntimeException("Error reducing on disk size", e);
+        }
+        finally
+        {
+            closePreparedStatement(stmt);
+            closePreparedStatement(cs);
+        }
+
+    }
+
+    private long getSizeOnDisk(Connection conn)
+    {
+        PreparedStatement stmt = null;
+        try
+        {
+            String sizeQuery = "SELECT SUM(T2.NUMALLOCATEDPAGES * T2.PAGESIZE) TOTALSIZE" +
+                    "    FROM " +
+                    "        SYS.SYSTABLES systabs," +
+                    "        TABLE (SYSCS_DIAG.SPACE_TABLE(systabs.tablename)) AS T2" +
+                    "    WHERE systabs.tabletype = 'T'";
+
+            stmt = conn.prepareStatement(sizeQuery);
+
+            ResultSet rs = null;
+            long size = 0l;
+
+            try
+            {
+                rs = stmt.executeQuery();
+                while(rs.next())
+                {
+                    size = rs.getLong(1);
+                }
+            }
+            finally
+            {
+                if(rs != null)
+                {
+                    rs.close();
+                }
+            }
+
+            return size;
+
+        }
+        catch (SQLException e)
+        {
+            closeConnection(conn);
+            throw new RuntimeException("Error establishing on disk size", e);
+        }
+        finally
+        {
+            closePreparedStatement(stmt);
+        }
+
+    }
+
+
+    private long getPersistentSizeLowThreshold()
+    {
+        return _persistentSizeLowThreshold;
+    }
+
+    private long getPersistentSizeHighThreshold()
+    {
+        return _persistentSizeHighThreshold;
+    }
+
+    @Override
+    public String getStoreType()
+    {
+        return DERBY_STORE_TYPE;
+    }
+
 }
\ No newline at end of file

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java Fri Aug  3 12:13:32 2012
@@ -20,10 +20,10 @@
  */
 package org.apache.qpid.server.subscription;
 
+import org.apache.qpid.server.queue.QueueEntryVisitor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.QueueEntry;
 
 import java.util.Iterator;
@@ -102,7 +102,7 @@ public class AssignedSubscriptionMessage
         return visitor.getEntry();
     }
 
-    private class EntryFinder implements AMQQueue.Visitor
+    private class EntryFinder implements QueueEntryVisitor
     {
         private QueueEntry _entry;
         private Subscription _sub;

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java Fri Aug  3 12:13:32 2012
@@ -20,12 +20,12 @@
  */
 package org.apache.qpid.server.subscription;
 
+import org.apache.qpid.server.queue.QueueEntryVisitor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.QueueEntry;
 
 import java.util.HashMap;
@@ -176,7 +176,7 @@ public class DefinedGroupMessageGroupMan
         return visitor.getEntry();
     }
 
-    private class EntryFinder implements AMQQueue.Visitor
+    private class EntryFinder implements QueueEntryVisitor
     {
         private QueueEntry _entry;
         private Subscription _sub;

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/subscription/ExplicitAcceptDispositionChangeListener.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/subscription/ExplicitAcceptDispositionChangeListener.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/subscription/ExplicitAcceptDispositionChangeListener.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/subscription/ExplicitAcceptDispositionChangeListener.java Fri Aug  3 12:13:32 2012
@@ -45,7 +45,7 @@ class ExplicitAcceptDispositionChangeLis
         final Subscription_0_10 subscription = getSubscription();
         if(subscription != null && _entry.isAcquiredBy(_sub))
         {
-            subscription.getSession().acknowledge(subscription, _entry);
+            subscription.getSessionModel().acknowledge(subscription, _entry);
         }
         else
         {

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/subscription/ImplicitAcceptDispositionChangeListener.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/subscription/ImplicitAcceptDispositionChangeListener.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/subscription/ImplicitAcceptDispositionChangeListener.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/subscription/ImplicitAcceptDispositionChangeListener.java Fri Aug  3 12:13:32 2012
@@ -72,6 +72,10 @@ class ImplicitAcceptDispositionChangeLis
     public boolean acquire()
     {
         boolean acquired = _entry.acquire(getSubscription());
+        if(acquired)
+        {
+            getSubscription().recordUnacknowledged(_entry);
+        }
         return acquired;
 
     }

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java Fri Aug  3 12:13:32 2012
@@ -23,6 +23,7 @@ package org.apache.qpid.server.subscript
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.QueueEntry;
 
@@ -32,6 +33,14 @@ public interface Subscription
 
     boolean isTransient();
 
+    long getBytesOut();
+
+    long getMessagesOut();
+
+    long getUnacknowledgedBytes();
+
+    long getUnacknowledgedMessages();
+
     public static enum State
     {
         ACTIVE,
@@ -45,6 +54,7 @@ public interface Subscription
     }
 
     AMQQueue getQueue();
+    AMQSessionModel getSessionModel();
 
     QueueEntry.SubscriptionAcquiredState getOwningState();
     QueueEntry.SubscriptionAssignedState getAssignedState();
@@ -108,4 +118,6 @@ public interface Subscription
     boolean isSessionTransactional();
 
     void queueEmpty() throws AMQException;
+
+    String getConsumerName();
 }

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java Fri Aug  3 12:13:32 2012
@@ -44,6 +44,7 @@ import org.apache.qpid.server.logging.su
 import org.apache.qpid.server.message.AMQMessage;
 import org.apache.qpid.server.output.ProtocolOutputConverter;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.QueueEntry;
 
@@ -75,7 +76,7 @@ public abstract class SubscriptionImpl i
 
 
     private final AtomicReference<State> _state = new AtomicReference<State>(State.ACTIVE);
-    private AMQQueue.Context _queueContext;
+    private volatile AMQQueue.Context _queueContext;
 
     private final ClientDeliveryMethod _deliveryMethod;
     private final RecordDeliveryMethod _recordMethod;
@@ -90,10 +91,15 @@ public abstract class SubscriptionImpl i
     private final long _subscriptionID;
     private LogSubject _logSubject;
     private LogActor _logActor;
-    private UUID _id;
+    private UUID _qmfId;
     private final AtomicLong _deliveredCount = new AtomicLong(0);
-    private long _createTime = System.currentTimeMillis();
+    private final AtomicLong _deliveredBytes = new AtomicLong(0);
+
+    private final AtomicLong _unacknowledgedCount = new AtomicLong(0);
+    private final AtomicLong _unacknowledgedBytes = new AtomicLong(0);
 
+    private long _createTime = System.currentTimeMillis();
+    
 
     static final class BrowserSubscription extends SubscriptionImpl
     {
@@ -276,22 +282,13 @@ public abstract class SubscriptionImpl i
         public void send(QueueEntry entry, boolean batch) throws AMQException
         {
 
-            // if we do not need to wait for client acknowledgements
-            // we can decrement the reference count immediately.
-
-            // By doing this _before_ the send we ensure that it
-            // doesn't get sent if it can't be dequeued, preventing
-            // duplicate delivery on recovery.
-
-            // The send may of course still fail, in which case, as
-            // the message is unacked, it will be lost.
-
+            
             synchronized (getChannel())
             {
                 getChannel().getProtocolSession().setDeferFlush(batch);
                 long deliveryTag = getChannel().getNextDeliveryTag();
 
-
+                addUnacknowledgedMessage(entry);
                 recordMessageDelivery(entry, deliveryTag);
                 sendToClient(entry, deliveryTag);
 
@@ -371,6 +368,11 @@ public abstract class SubscriptionImpl i
 
     }
 
+    public AMQSessionModel getSessionModel()
+    {
+        return _channel;
+    }
+
     public ConfigStore getConfigStore()
     {
         return getQueue().getConfigStore();
@@ -389,7 +391,7 @@ public abstract class SubscriptionImpl i
         }
         _queue = queue;
 
-        _id = getConfigStore().createId();
+        _qmfId = getConfigStore().createId();
         getConfigStore().addConfiguredObject(this);
 
         _logSubject = new SubscriptionLogSubject(this);
@@ -468,11 +470,6 @@ public abstract class SubscriptionImpl i
         _deleted.set(true);
     }
 
-    public boolean filtersMessages()
-    {
-        return _filters != null || _noLocal;
-    }
-
     public boolean hasInterest(QueueEntry entry)
     {
         //check that the message hasn't been rejected
@@ -508,13 +505,6 @@ public abstract class SubscriptionImpl i
 
     }
 
-    private String id = String.valueOf(System.identityHashCode(this));
-
-    private String debugIdentity()
-    {
-        return id;
-    }
-
     private boolean checkFilters(QueueEntry msg)
     {
         return (_filters == null) || _filters.allAllow(msg);
@@ -599,6 +589,11 @@ public abstract class SubscriptionImpl i
         return _consumerTag;
     }
 
+    public String getConsumerName()
+    {
+        return _consumerTag == null ? null : _consumerTag.asString();
+    }
+    
     public long getSubscriptionID()
     {
         return _subscriptionID;
@@ -687,6 +682,7 @@ public abstract class SubscriptionImpl i
     {
         _deliveryMethod.deliverToClient(this,entry,deliveryTag);
         _deliveredCount.incrementAndGet();
+        _deliveredBytes.addAndGet(entry.getSize());
     }
 
 
@@ -771,9 +767,10 @@ public abstract class SubscriptionImpl i
         return true;
     }
 
-    public UUID getId()
+    @Override
+    public UUID getQMFId()
     {
-        return _id;
+        return _qmfId;
     }
 
     public boolean isDurable()
@@ -832,4 +829,44 @@ public abstract class SubscriptionImpl i
 
         _channel.getProtocolSession().flushBatched();
     }
+
+    public long getBytesOut()
+    {
+        return _deliveredBytes.longValue();
+    }
+
+    public long getMessagesOut()
+    {
+        return _deliveredCount.longValue();
+    }
+
+
+    protected void addUnacknowledgedMessage(QueueEntry entry)
+    {
+        final long size = entry.getSize();
+        _unacknowledgedBytes.addAndGet(size);
+        _unacknowledgedCount.incrementAndGet();
+        entry.addStateChangeListener(new QueueEntry.StateChangeListener()
+        {
+            public void stateChanged(QueueEntry entry, QueueEntry.State oldState, QueueEntry.State newState)
+            {
+                if(oldState.equals(QueueEntry.State.ACQUIRED) && !newState.equals(QueueEntry.State.ACQUIRED))
+                {
+                    _unacknowledgedBytes.addAndGet(-size);
+                    _unacknowledgedCount.decrementAndGet();
+                    entry.removeStateChangeListener(this);
+                }
+            }
+        });
+    }
+    
+    public long getUnacknowledgedBytes()
+    {
+        return _unacknowledgedBytes.longValue();
+    }
+
+    public long getUnacknowledgedMessages()
+    {
+        return _unacknowledgedCount.longValue();
+    }
 }

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java Fri Aug  3 12:13:32 2012
@@ -98,7 +98,7 @@ public class Subscription_0_10 implement
     private final Lock _stateChangeLock = new ReentrantLock();
 
     private final AtomicReference<State> _state = new AtomicReference<State>(State.ACTIVE);
-    private AMQQueue.Context _queueContext;
+    private volatile AMQQueue.Context _queueContext;
     private final AtomicBoolean _deleted = new AtomicBoolean(false);
 
 
@@ -125,11 +125,15 @@ public class Subscription_0_10 implement
 
     private LogActor _logActor;
     private final Map<String, Object> _properties = new ConcurrentHashMap<String, Object>();
-    private UUID _id;
+    private UUID _qmfId;
     private String _traceExclude;
     private String _trace;
     private final long _createTime = System.currentTimeMillis();
     private final AtomicLong _deliveredCount = new AtomicLong(0);
+    private final AtomicLong _deliveredBytes = new AtomicLong(0);
+    private final AtomicLong _unacknowledgedCount = new AtomicLong(0);
+    private final AtomicLong _unacknowledgedBytes = new AtomicLong(0);
+
     private final Map<String, Object> _arguments;
     private int _deferredMessageCredit;
     private long _deferredSizeCredit;
@@ -185,10 +189,10 @@ public class Subscription_0_10 implement
         }
         _queue = queue;
 
-        Map<String, Object> arguments = queue.getArguments() == null ? Collections.EMPTY_MAP : queue.getArguments();
+        Map<String, Object> arguments = queue.getArguments();
         _traceExclude = (String) arguments.get("qpid.trace.exclude");
         _trace = (String) arguments.get("qpid.trace.id");
-        _id = getConfigStore().createId();
+        _qmfId = getConfigStore().createId();
         getConfigStore().addConfiguredObject(this);
         String filterLogString = null;
 
@@ -199,9 +203,13 @@ public class Subscription_0_10 implement
             CurrentActor.get().message(this, SubscriptionMessages.CREATE(filterLogString, queue.isDurable() && exclusive,
                     filterLogString.length() > 0));
         }
-
     }
 
+    public String getConsumerName()
+    {
+        return _destination;
+    }
+    
     public boolean isSuspended()
     {
         return !isActive() || _deleted.get() || _session.isClosing(); // TODO check for Session suspension
@@ -620,10 +628,15 @@ public class Subscription_0_10 implement
             _session.sendMessage(xfr, _postIdSettingAction);
             entry.incrementDeliveryCount();
             _deliveredCount.incrementAndGet();
+            _deliveredBytes.addAndGet(entry.getSize());
             if(_acceptMode == MessageAcceptMode.NONE && _acquireMode == MessageAcquireMode.PRE_ACQUIRED)
             {
                 forceDequeue(entry, false);
             }
+            else if(_acquireMode == MessageAcquireMode.PRE_ACQUIRED)
+            {
+                recordUnacknowledged(entry);
+            }
         }
         else
         {
@@ -632,6 +645,12 @@ public class Subscription_0_10 implement
         }
     }
 
+    void recordUnacknowledged(QueueEntry entry)
+    {
+        _unacknowledgedCount.incrementAndGet();
+        _unacknowledgedBytes.addAndGet(entry.getSize());
+    }
+
     private void deferredAddCredit(final int deferredMessageCredit, final long deferredSizeCredit)
     {
         _deferredMessageCredit += deferredMessageCredit;
@@ -653,7 +672,7 @@ public class Subscription_0_10 implement
 
     private void forceDequeue(final QueueEntry entry, final boolean restoreCredit)
     {
-        AutoCommitTransaction dequeueTxn = new AutoCommitTransaction(getQueue().getVirtualHost().getMessageStore()); 
+        AutoCommitTransaction dequeueTxn = new AutoCommitTransaction(getQueue().getVirtualHost().getMessageStore());
         dequeueTxn.dequeue(entry.getQueue(), entry.getMessage(),
                            new ServerTransaction.Action()
                            {
@@ -690,7 +709,7 @@ public class Subscription_0_10 implement
             entry.setRedelivered();
         }
 
-        if (getSession().isClosing() || !setRedelivered)
+        if (getSessionModel().isClosing() || !setRedelivered)
         {
             entry.decrementDeliveryCount();
         }
@@ -918,6 +937,8 @@ public class Subscription_0_10 implement
         // TODO Fix Store Context / cleanup
         if(entry.isAcquiredBy(this))
         {
+            _unacknowledgedBytes.addAndGet(-entry.getSize());
+            _unacknowledgedCount.decrementAndGet();
             entry.discard();
         }
     }
@@ -944,7 +965,7 @@ public class Subscription_0_10 implement
         return false;
     }
 
-    ServerSession getSession()
+    public ServerSession getSessionModel()
     {
         return _session;
     }
@@ -952,7 +973,7 @@ public class Subscription_0_10 implement
 
     public SessionConfig getSessionConfig()
     {
-        return getSession();
+        return getSessionModel();
     }
 
     public boolean isBrowsing()
@@ -990,9 +1011,10 @@ public class Subscription_0_10 implement
         return _flowMode.toString();
     }
 
-    public UUID getId()
+    @Override
+    public UUID getQMFId()
     {
-        return _id;
+        return _qmfId;
     }
 
     public String getName()
@@ -1073,4 +1095,24 @@ public class Subscription_0_10 implement
     {
         _session.getConnection().flush();
     }
+
+    public long getBytesOut()
+    {
+        return _deliveredBytes.longValue();
+    }
+
+    public long getMessagesOut()
+    {
+        return _deliveredCount.longValue();
+    }
+
+    public long getUnacknowledgedBytes()
+    {
+        return _unacknowledgedBytes.longValue();
+    }
+
+    public long getUnacknowledgedMessages()
+    {
+        return _unacknowledgedCount.longValue();
+    }
 }

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java Fri Aug  3 12:13:32 2012
@@ -20,25 +20,62 @@
  */
 package org.apache.qpid.server.transport;
 
+import org.apache.qpid.server.protocol.AmqpProtocolVersion;
 import org.apache.qpid.transport.network.NetworkTransport;
 
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
 public class QpidAcceptor
 {
-    private NetworkTransport _transport;
-    private String _protocol;
-    public QpidAcceptor(NetworkTransport transport, String protocol)
+    public enum Transport
     {
-        _transport = transport;
-        _protocol = protocol;
+        TCP("TCP"),
+        SSL("TCP/SSL");
+
+        private final String _asString;
+
+        Transport(String asString)
+        {
+            _asString = asString;
+        }
+
+        public String toString()
+        {
+            return _asString;
+        }
+    }
+
+    private NetworkTransport _networkTransport;
+    private Transport _transport;
+    private Set<AmqpProtocolVersion> _supported;
+
+
+    public QpidAcceptor(NetworkTransport transport, Transport protocol, Set<AmqpProtocolVersion> supported)
+    {
+        _networkTransport = transport;
+        _transport = protocol;
+        _supported = Collections.unmodifiableSet(new HashSet<AmqpProtocolVersion>(supported));
     }
 
     public NetworkTransport getNetworkTransport()
     {
+        return _networkTransport;
+    }
+
+    public Transport getTransport()
+    {
         return _transport;
     }
 
+    public Set<AmqpProtocolVersion> getSupported()
+    {
+        return _supported;
+    }
+
     public String toString()
     {
-        return _protocol;
+        return _transport.toString();
     }    
 }

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java Fri Aug  3 12:13:32 2012
@@ -20,6 +20,14 @@
  */
 package org.apache.qpid.server.transport;
 
+import java.security.Principal;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.security.auth.Subject;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.configuration.ConnectionConfig;
@@ -28,12 +36,9 @@ import org.apache.qpid.server.logging.Lo
 import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.logging.actors.GenericActor;
 import org.apache.qpid.server.logging.messages.ConnectionMessages;
-import org.apache.qpid.server.management.Managable;
-import org.apache.qpid.server.management.ManagedObject;
 import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.security.AuthorizationHolder;
-import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
 import org.apache.qpid.server.stats.StatisticsCounter;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.transport.Connection;
@@ -48,17 +53,7 @@ import static org.apache.qpid.server.log
 import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SOCKET_FORMAT;
 import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.USER_FORMAT;
 
-import javax.management.JMException;
-import javax.security.auth.Subject;
-import java.security.Principal;
-import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
-public class ServerConnection extends Connection implements Managable, AMQConnectionModel, LogSubject, AuthorizationHolder
+public class ServerConnection extends Connection implements AMQConnectionModel, LogSubject, AuthorizationHolder
 {
     private ConnectionConfig _config;
     private Runnable _onOpenTask;
@@ -67,24 +62,19 @@ public class ServerConnection extends Co
 
     private Subject _authorizedSubject = null;
     private Principal _authorizedPrincipal = null;
-    private boolean _statisticsEnabled = false;
     private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
     private final long _connectionId;
     private final Object _reference = new Object();
-    private ServerConnectionMBean _mBean;
     private VirtualHost _virtualHost;
     private AtomicLong _lastIoTime = new AtomicLong();
-    
+    private boolean _blocking;
+    private Principal _peerPrincipal;
+
     public ServerConnection(final long connectionId)
     {
         _connectionId = connectionId;
     }
 
-    public UUID getId()
-    {
-        return _config.getId();
-    }
-
     public Object getReference()
     {
         return _reference;
@@ -100,12 +90,12 @@ public class ServerConnection extends Co
     protected void setState(State state)
     {
         super.setState(state);
-        
+
         if (state == State.OPEN)
         {
             if (_onOpenTask != null)
             {
-                _onOpenTask.run();    
+                _onOpenTask.run();
             }
             _actor.message(ConnectionMessages.OPEN(getClientId(), "0-10", getClientVersion(), true, true, true));
 
@@ -118,7 +108,6 @@ public class ServerConnection extends Co
             {
                 _virtualHost.getConnectionRegistry().deregisterConnection(this);
             }
-            unregisterConnectionMbean();
         }
 
         if (state == State.CLOSED)
@@ -156,8 +145,6 @@ public class ServerConnection extends Co
         _virtualHost = virtualHost;
 
         initialiseStatistics();
-
-        registerConnectionMbean();
     }
 
     public void setConnectionConfig(final ConnectionConfig config)
@@ -193,7 +180,7 @@ public class ServerConnection extends Co
 
         ((ServerSession)session).close();
     }
-    
+
     public LogSubject getLogSubject()
     {
         return (LogSubject) this;
@@ -273,7 +260,6 @@ public class ServerConnection extends Co
     public void close(AMQConstant cause, String message) throws AMQException
     {
         closeSubscriptions();
-        unregisterConnectionMbean();
         ConnectionCloseCode replyCode = ConnectionCloseCode.NORMAL;
         try
         {
@@ -286,6 +272,46 @@ public class ServerConnection extends Co
         close(replyCode, message);
     }
 
+    public synchronized void block()
+    {
+        if(!_blocking)
+        {
+            _blocking = true;
+            for(AMQSessionModel ssn : getSessionModels())
+            {
+                ssn.block();
+            }
+        }
+    }
+
+    public synchronized void unblock()
+    {
+        if(_blocking)
+        {
+            _blocking = false;
+            for(AMQSessionModel ssn : getSessionModels())
+            {
+                ssn.unblock();
+            }
+        }
+    }
+
+    @Override
+    public synchronized void registerSession(final Session ssn)
+    {
+        super.registerSession(ssn);
+        if(_blocking)
+        {
+            ((ServerSession)ssn).block();
+        }
+    }
+
+    @Override
+    public synchronized void removeSession(final Session ssn)
+    {
+        super.removeSession(ssn);
+    }
+
     public List<AMQSessionModel> getSessionModels()
     {
         List<AMQSessionModel> sessions = new ArrayList<AMQSessionModel>();
@@ -298,44 +324,38 @@ public class ServerConnection extends Co
 
     public void registerMessageDelivered(long messageSize)
     {
-        if (isStatisticsEnabled())
-        {
-            _messagesDelivered.registerEvent(1L);
-            _dataDelivered.registerEvent(messageSize);
-        }
+        _messagesDelivered.registerEvent(1L);
+        _dataDelivered.registerEvent(messageSize);
         _virtualHost.registerMessageDelivered(messageSize);
     }
 
     public void registerMessageReceived(long messageSize, long timestamp)
     {
-        if (isStatisticsEnabled())
-        {
-            _messagesReceived.registerEvent(1L, timestamp);
-            _dataReceived.registerEvent(messageSize, timestamp);
-        }
+        _messagesReceived.registerEvent(1L, timestamp);
+        _dataReceived.registerEvent(messageSize, timestamp);
         _virtualHost.registerMessageReceived(messageSize, timestamp);
     }
-    
+
     public StatisticsCounter getMessageReceiptStatistics()
     {
         return _messagesReceived;
     }
-    
+
     public StatisticsCounter getDataReceiptStatistics()
     {
         return _dataReceived;
     }
-    
+
     public StatisticsCounter getMessageDeliveryStatistics()
     {
         return _messagesDelivered;
     }
-    
+
     public StatisticsCounter getDataDeliveryStatistics()
     {
         return _dataDelivered;
     }
-    
+
     public void resetStatistics()
     {
         _messagesDelivered.reset();
@@ -346,25 +366,12 @@ public class ServerConnection extends Co
 
     public void initialiseStatistics()
     {
-        setStatisticsEnabled(!StatisticsCounter.DISABLE_STATISTICS &&
-                _virtualHost.getApplicationRegistry().getConfiguration().isStatisticsGenerationConnectionsEnabled());
-        
         _messagesDelivered = new StatisticsCounter("messages-delivered-" + getConnectionId());
         _dataDelivered = new StatisticsCounter("data-delivered-" + getConnectionId());
         _messagesReceived = new StatisticsCounter("messages-received-" + getConnectionId());
         _dataReceived = new StatisticsCounter("data-received-" + getConnectionId());
     }
 
-    public boolean isStatisticsEnabled()
-    {
-        return _statisticsEnabled;
-    }
-
-    public void setStatisticsEnabled(boolean enabled)
-    {
-        _statisticsEnabled = enabled;
-    }
-
     /**
      * @return authorizedSubject
      */
@@ -389,7 +396,7 @@ public class ServerConnection extends Co
         else
         {
             _authorizedSubject = authorizedSubject;
-            _authorizedPrincipal = UsernamePrincipal.getUsernamePrincipalFromSubject(_authorizedSubject);
+            _authorizedPrincipal = authorizedSubject.getPrincipals().iterator().next();
         }
     }
 
@@ -408,6 +415,11 @@ public class ServerConnection extends Co
         return !super.hasSessionWithName(name);
     }
 
+    public String getRemoteAddressString()
+    {
+        return getConfig().getAddress();
+    }
+
     public String getUserName()
     {
         return _authorizedPrincipal.getName();
@@ -436,12 +448,6 @@ public class ServerConnection extends Co
         }
     }
 
-
-    public ManagedObject getManagedObject()
-    {
-        return _mBean;
-    }
-
     @Override
     public void send(ProtocolEvent event)
     {
@@ -449,53 +455,38 @@ public class ServerConnection extends Co
         super.send(event);
     }
 
-    public AtomicLong getLastIoTime()
+    public long getLastIoTime()
     {
-        return _lastIoTime;
+        return _lastIoTime.longValue();
     }
 
-    void checkForNotification()
+    public String getClientId()
     {
-        int channelsCount = getSessionModels().size();
-        if (_mBean != null && channelsCount >= getConnectionDelegate().getChannelMax())
-        {
-            _mBean.notifyClients("Channel count (" + channelsCount + ") has reached the threshold value");
-        }
+        return getConnectionDelegate().getClientId();
     }
 
-    private void registerConnectionMbean()
+    public String getClientVersion()
     {
-        try
-        {
-            _mBean = new ServerConnectionMBean(this);
-            _mBean.register();
-        }
-        catch (JMException jme)
-        {
-            log.error("Unable to register mBean for ServerConnection", jme);
-        }
+        return getConnectionDelegate().getClientVersion();
     }
 
-    private void unregisterConnectionMbean()
+    public String getPrincipalAsString()
     {
-        if (_mBean != null)
-        {
-            if (log.isDebugEnabled())
-            {
-                log.debug("Unregistering mBean for ServerConnection" + _mBean);
-            }
-            _mBean.unregister();
-            _mBean = null;
-        }
+        return getAuthorizedPrincipal().getName();
     }
 
-    public String getClientId()
+    public long getSessionCountLimit()
     {
-        return getConnectionDelegate().getClientId();
+        return getChannelMax();
     }
 
-    public String getClientVersion()
+    public Principal getPeerPrincipal()
     {
-        return getConnectionDelegate().getClientVersion();
+        return _peerPrincipal;
+    }
+
+    public void setPeerPrincipal(Principal peerPrincipal)
+    {
+        _peerPrincipal = peerPrincipal;
     }
 }

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java Fri Aug  3 12:13:32 2012
@@ -20,18 +20,26 @@
  */
 package org.apache.qpid.server.transport;
 
-import static org.apache.qpid.transport.Connection.State.CLOSE_RCVD;
-
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.StringTokenizer;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
 import org.apache.qpid.common.ServerPropertyNames;
 import org.apache.qpid.properties.ConnectionStartProperties;
 import org.apache.qpid.protocol.ProtocolEngine;
 import org.apache.qpid.server.configuration.BrokerConfig;
 import org.apache.qpid.server.protocol.AMQConnectionModel;
-import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.registry.IApplicationRegistry;
 import org.apache.qpid.server.security.SecurityManager;
 import org.apache.qpid.server.security.auth.AuthenticationResult;
 import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus;
+import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
 import org.apache.qpid.server.subscription.Subscription_0_10;
 import org.apache.qpid.server.virtualhost.State;
 import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -39,16 +47,7 @@ import org.apache.qpid.transport.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.security.sasl.SaslException;
-import javax.security.sasl.SaslServer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.StringTokenizer;
+import static org.apache.qpid.transport.Connection.State.CLOSE_RCVD;
 
 public class ServerConnectionDelegate extends ServerDelegate
 {
@@ -58,22 +57,25 @@ public class ServerConnectionDelegate ex
     private final IApplicationRegistry _appRegistry;
     private int _maxNoOfChannels;
     private Map<String,Object> _clientProperties;
+    private final AuthenticationManager _authManager;
 
-    public ServerConnectionDelegate(IApplicationRegistry appRegistry, String localFQDN)
+    public ServerConnectionDelegate(IApplicationRegistry appRegistry, String localFQDN, AuthenticationManager authManager)
     {
-        this(createConnectionProperties(appRegistry.getBroker()), Collections.singletonList((Object)"en_US"), appRegistry, localFQDN);
+        this(createConnectionProperties(appRegistry.getBrokerConfig()), Collections.singletonList((Object)"en_US"), appRegistry, localFQDN, authManager);
     }
 
-    public ServerConnectionDelegate(Map<String, Object> properties,
+    private ServerConnectionDelegate(Map<String, Object> properties,
                                     List<Object> locales,
                                     IApplicationRegistry appRegistry,
-                                    String localFQDN)
+                                    String localFQDN,
+                                    AuthenticationManager authManager)
     {
-        super(properties, parseToList(appRegistry.getAuthenticationManager().getMechanisms()), locales);
-        
+        super(properties, parseToList(authManager.getMechanisms()), locales);
+
         _appRegistry = appRegistry;
         _localFQDN = localFQDN;
-        _maxNoOfChannels = ApplicationRegistry.getInstance().getConfiguration().getMaxChannelCount();
+        _maxNoOfChannels = appRegistry.getConfiguration().getMaxChannelCount();
+        _authManager = authManager;
     }
 
     private static Map<String, Object> createConnectionProperties(final BrokerConfig brokerConfig)
@@ -108,18 +110,17 @@ public class ServerConnectionDelegate ex
         return ssn;
     }
 
-    protected SaslServer createSaslServer(String mechanism) throws SaslException
+    protected SaslServer createSaslServer(Connection conn, String mechanism) throws SaslException
     {
-        return _appRegistry.getAuthenticationManager().createSaslServer(mechanism, _localFQDN);
+        return _authManager.createSaslServer(mechanism, _localFQDN, ((ServerConnection) conn).getPeerPrincipal());
 
     }
 
     protected void secure(final SaslServer ss, final Connection conn, final byte[] response)
     {
-        final AuthenticationResult authResult = _appRegistry.getAuthenticationManager().authenticate(ss, response);
         final ServerConnection sconn = (ServerConnection) conn;
-        
-        
+        final AuthenticationResult authResult = _authManager.authenticate(ss, response);
+
         if (AuthenticationStatus.SUCCESS.equals(authResult.getStatus()))
         {
             tuneAuthorizedConnection(sconn);
@@ -168,7 +169,7 @@ public class ServerConnectionDelegate ex
         vhost = _appRegistry.getVirtualHostRegistry().getVirtualHost(vhostName);
 
         SecurityManager.setThreadSubject(sconn.getAuthorizedSubject());
-        
+
         if(vhost != null)
         {
             sconn.setVirtualHost(vhost);
@@ -194,7 +195,7 @@ public class ServerConnectionDelegate ex
             sconn.setState(Connection.State.CLOSING);
             sconn.invoke(new ConnectionClose(ConnectionCloseCode.INVALID_PATH, "Unknown virtualhost '"+vhostName+"'"));
         }
-        
+
     }
 
     @Override
@@ -216,7 +217,7 @@ public class ServerConnectionDelegate ex
 
         setConnectionTuneOkChannelMax(sconn, okChannelMax);
     }
-    
+
     @Override
     protected int getHeartbeatMax()
     {
@@ -225,7 +226,7 @@ public class ServerConnectionDelegate ex
     }
 
     @Override
-    protected int getChannelMax()
+    public int getChannelMax()
     {
         return _maxNoOfChannels;
     }
@@ -265,7 +266,6 @@ public class ServerConnectionDelegate ex
         if(isSessionNameUnique(atc.getName(), conn))
         {
             super.sessionAttach(conn, atc);
-            ((ServerConnection)conn).checkForNotification();
         }
         else
         {



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org