You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2012/08/03 14:14:12 UTC

svn commit: r1368910 [15/27] - in /qpid/branches/asyncstore: ./ bin/ cpp/bindings/qmf/ruby/ cpp/bindings/qmf2/ruby/ cpp/bindings/qpid/python/ cpp/bindings/qpid/ruby/ cpp/bindings/qpid/ruby/features/ cpp/bindings/qpid/ruby/features/step_definitions/ cpp...

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java Fri Aug  3 12:13:32 2012
@@ -31,24 +31,21 @@ import java.util.UUID;
 public interface AMQConnectionModel extends StatisticsGatherer
 {
     /**
-     * get a unique id for this connection.
-     * 
-     * @return a {@link UUID} representing the connection
-     */
-    public UUID getId();
-    
-    /**
      * Close the underlying Connection
-     * 
+     *
      * @param cause
      * @param message
      * @throws org.apache.qpid.AMQException
      */
     public void close(AMQConstant cause, String message) throws AMQException;
 
+    public void block();
+
+    public void unblock();
+
     /**
      * Close the given requested Session
-     * 
+     *
      * @param session
      * @param cause
      * @param message
@@ -57,10 +54,10 @@ public interface AMQConnectionModel exte
     public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException;
 
     public long getConnectionId();
-    
+
     /**
      * Get a list of all sessions using this connection.
-     * 
+     *
      * @return a list of {@link AMQSessionModel}s
      */
     public List<AMQSessionModel> getSessionModels();
@@ -73,4 +70,16 @@ public interface AMQConnectionModel exte
     public String getUserName();
 
     public boolean isSessionNameUnique(byte[] name);
+
+    String getRemoteAddressString();
+
+    String getClientId();
+
+    String getClientVersion();
+
+    String getPrincipalAsString();
+
+    long getSessionCountLimit();
+
+    long getLastIoTime();
 }

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java Fri Aug  3 12:13:32 2012
@@ -20,8 +20,26 @@
  */
 package org.apache.qpid.server.protocol;
 
-import org.apache.log4j.Logger;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.security.Principal;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
+import javax.security.auth.Subject;
+import javax.security.sasl.SaslServer;
+import org.apache.log4j.Logger;
 import org.apache.qpid.AMQChannelException;
 import org.apache.qpid.AMQConnectionException;
 import org.apache.qpid.AMQException;
@@ -46,13 +64,10 @@ import org.apache.qpid.server.logging.ac
 import org.apache.qpid.server.logging.actors.ManagementActor;
 import org.apache.qpid.server.logging.messages.ConnectionMessages;
 import org.apache.qpid.server.logging.subjects.ConnectionLogSubject;
-import org.apache.qpid.server.management.Managable;
-import org.apache.qpid.server.management.ManagedObject;
 import org.apache.qpid.server.output.ProtocolOutputConverter;
 import org.apache.qpid.server.output.ProtocolOutputConverterRegistry;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
 import org.apache.qpid.server.state.AMQState;
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.stats.StatisticsCounter;
@@ -66,25 +81,7 @@ import org.apache.qpid.transport.Transpo
 import org.apache.qpid.transport.network.NetworkConnection;
 import org.apache.qpid.util.BytesDataOutput;
 
-import javax.management.JMException;
-import javax.security.auth.Subject;
-import javax.security.sasl.SaslServer;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.security.Principal;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQProtocolSession, ConnectionConfig
+public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSession, ConnectionConfig
 {
     private static final Logger _logger = Logger.getLogger(AMQProtocolEngine.class);
 
@@ -109,8 +106,6 @@ public class AMQProtocolEngine implement
 
     private AMQCodecFactory _codecFactory;
 
-    private AMQProtocolSessionMBean _managedObject;
-
     private SaslServer _saslServer;
 
     private Object _lastReceived;
@@ -147,12 +142,10 @@ public class AMQProtocolEngine implement
 
     private long _maxFrameSize;
     private final AtomicBoolean _closing = new AtomicBoolean(false);
-    private final UUID _id;
+    private final UUID _qmfId;
     private final ConfigStore _configStore;
     private long _createTime = System.currentTimeMillis();
 
-    private ApplicationRegistry _registry;
-    private boolean _statisticsEnabled = false;
     private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
 
     private NetworkConnection _network;
@@ -160,14 +153,13 @@ public class AMQProtocolEngine implement
 
     private volatile boolean _deferFlush;
     private long _lastReceivedTime;
+    private boolean _blocking;
 
-    public ManagedObject getManagedObject()
-    {
-        return _managedObject;
-    }
+    private final Lock _receivedLock;
 
     public AMQProtocolEngine(VirtualHostRegistry virtualHostRegistry, NetworkConnection network, final long connectionId)
     {
+        _receivedLock = new ReentrantLock();
         _stateManager = new AMQStateManager(virtualHostRegistry, this);
         _codecFactory = new AMQCodecFactory(true, this);
 
@@ -179,12 +171,12 @@ public class AMQProtocolEngine implement
         _logSubject = new ConnectionLogSubject(this);
 
         _configStore = virtualHostRegistry.getConfigStore();
-        _id = _configStore.createId();
+        _qmfId = _configStore.createId();
 
         _actor.message(ConnectionMessages.OPEN(null, null, null, false, false, false));
 
-        _registry = virtualHostRegistry.getApplicationRegistry();
         initialiseStatistics();
+
     }
 
     public void setNetworkConnection(NetworkConnection network)
@@ -198,11 +190,6 @@ public class AMQProtocolEngine implement
         _sender = sender;
     }
 
-    private AMQProtocolSessionMBean createMBean() throws JMException
-    {
-        return new AMQProtocolSessionMBean(this);
-    }
-
     public long getSessionID()
     {
         return _connectionID;
@@ -244,6 +231,8 @@ public class AMQProtocolEngine implement
         final long arrivalTime = System.currentTimeMillis();
         _lastReceivedTime = arrivalTime;
         _lastIoTime = arrivalTime;
+
+        _receivedLock.lock();
         try
         {
             final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
@@ -268,6 +257,10 @@ public class AMQProtocolEngine implement
             _logger.error("Unexpected exception when processing datablock", e);
             closeProtocolSession();
         }
+        finally
+        {
+            _receivedLock.unlock();
+        }
     }
 
     private void receiveComplete()
@@ -374,7 +367,7 @@ public class AMQProtocolEngine implement
             // This sets the protocol version (and hence framing classes) for this session.
             setProtocolVersion(pv);
 
-            String mechanisms = ApplicationRegistry.getInstance().getAuthenticationManager().getMechanisms();
+            String mechanisms = ApplicationRegistry.getInstance().getAuthenticationManager(getLocalAddress()).getMechanisms();
 
             String locales = "en_US";
 
@@ -576,7 +569,10 @@ public class AMQProtocolEngine implement
 
     public List<AMQChannel> getChannels()
     {
-        return new ArrayList<AMQChannel>(_channelMap.values());
+        synchronized (_channelMap)
+        {
+            return new ArrayList<AMQChannel>(_channelMap.values());
+        }
     }
 
     public AMQChannel getAndAssertChannel(int channelId) throws AMQException
@@ -633,24 +629,21 @@ public class AMQProtocolEngine implement
         }
         else
         {
-            _channelMap.put(channel.getChannelId(), channel);
+            synchronized (_channelMap)
+            {
+                _channelMap.put(channel.getChannelId(), channel);
+
+                if(_blocking)
+                {
+                    channel.block();
+                }
+            }
         }
 
         if (((channelId & CHANNEL_CACHE_SIZE) == channelId))
         {
             _cachedChannels[channelId] = channel;
         }
-
-        checkForNotification();
-    }
-
-    private void checkForNotification()
-    {
-        int channelsCount = _channelMap.size();
-        if (_managedObject != null && channelsCount >= _maxNoOfChannels)
-        {
-            _managedObject.notifyClients("Channel count (" + channelsCount + ") has reached the threshold value");
-        }
     }
 
     public Long getMaximumNumberOfChannels()
@@ -735,10 +728,14 @@ public class AMQProtocolEngine implement
      */
     public void removeChannel(int channelId)
     {
-        _channelMap.remove(channelId);
-        if ((channelId & CHANNEL_CACHE_SIZE) == channelId)
+        synchronized (_channelMap)
         {
-            _cachedChannels[channelId] = null;
+            _channelMap.remove(channelId);
+
+            if ((channelId & CHANNEL_CACHE_SIZE) == channelId)
+            {
+                _cachedChannels[channelId] = null;
+            }
         }
     }
 
@@ -763,12 +760,14 @@ public class AMQProtocolEngine implement
      */
     private void closeAllChannels() throws AMQException
     {
-        for (AMQChannel channel : _channelMap.values())
+        for (AMQChannel channel : getChannels())
         {
             channel.close();
         }
-
-        _channelMap.clear();
+        synchronized (_channelMap)
+        {
+            _channelMap.clear();
+        }
         for (int i = 0; i <= CHANNEL_CACHE_SIZE; i++)
         {
             _cachedChannels[i] = null;
@@ -780,6 +779,9 @@ public class AMQProtocolEngine implement
     {
         if(_closing.compareAndSet(false,true))
         {
+            // force sync of outstanding async work
+            receiveComplete();
+
             // REMOVE THIS SHOULD NOT BE HERE.
             if (CurrentActor.get() == null)
             {
@@ -796,13 +798,6 @@ public class AMQProtocolEngine implement
 
                 getConfigStore().removeConfiguredObject(this);
 
-                if (_managedObject != null)
-                {
-                    _managedObject.unregister();
-                    // Ensure we only do this once.
-                    _managedObject = null;
-                }
-
                 for (Task task : _taskList)
                 {
                     task.doTask(this);
@@ -835,7 +830,7 @@ public class AMQProtocolEngine implement
         }
     }
 
-    public void closeConnection(int channelId, AMQConnectionException e) throws AMQException
+    private void closeConnection(int channelId, AMQConnectionException e) throws AMQException
     {
         try
         {
@@ -846,12 +841,18 @@ public class AMQProtocolEngine implement
 
             markChannelAwaitingCloseOk(channelId);
             closeSession();
-            _stateManager.changeState(AMQState.CONNECTION_CLOSING);
-            writeFrame(e.getCloseFrame(channelId));
         }
         finally
         {
-            closeProtocolSession();
+            try
+            {
+                _stateManager.changeState(AMQState.CONNECTION_CLOSING);
+                writeFrame(e.getCloseFrame(channelId));
+            }
+            finally
+            {
+                closeProtocolSession();
+            }
         }
 
 
@@ -983,16 +984,6 @@ public class AMQProtocolEngine implement
         _virtualHost.getConnectionRegistry().registerConnection(this);
 
         _configStore.addConfiguredObject(this);
-
-        try
-        {
-            _managedObject = createMBean();
-            _managedObject.register();
-        }
-        catch (JMException e)
-        {
-            _logger.error(e);
-        }
     }
 
     public void addSessionCloseTask(Task task)
@@ -1026,7 +1017,7 @@ public class AMQProtocolEngine implement
 
     public Principal getAuthorizedPrincipal()
     {
-        return _authorizedSubject == null ? null : UsernamePrincipal.getUsernamePrincipalFromSubject(_authorizedSubject);
+        return _authorizedSubject == null ? null : _authorizedSubject.getPrincipals().iterator().next();
     }
 
     public SocketAddress getRemoteAddress()
@@ -1039,6 +1030,11 @@ public class AMQProtocolEngine implement
         return _network.getLocalAddress();
     }
 
+    public Principal getPeerPrincipal()
+    {
+        return _network.getPeerPrincipal();
+    }
+
     public MethodRegistry getMethodRegistry()
     {
         return _methodRegistry;
@@ -1144,6 +1140,16 @@ public class AMQProtocolEngine implement
         return _clientVersion;
     }
 
+    public String getPrincipalAsString()
+    {
+        return getAuthId();
+    }
+
+    public long getSessionCountLimit()
+    {
+        return getMaximumNumberOfChannels();
+    }
+
     public Boolean isIncoming()
     {
         return true;
@@ -1199,9 +1205,10 @@ public class AMQProtocolEngine implement
         return false;
     }
 
-    public UUID getId()
+    @Override
+    public UUID getQMFId()
     {
-        return _id;
+        return _qmfId;
     }
 
     public long getConnectionId()
@@ -1337,6 +1344,36 @@ public class AMQProtocolEngine implement
 		                (Throwable) null));
     }
 
+    public void block()
+    {
+        synchronized (_channelMap)
+        {
+            if(!_blocking)
+            {
+                _blocking = true;
+                for(AMQChannel channel : _channelMap.values())
+                {
+                    channel.block();
+                }
+            }
+        }
+    }
+
+    public void unblock()
+    {
+        synchronized (_channelMap)
+        {
+            if(_blocking)
+            {
+                _blocking = false;
+                for(AMQChannel channel : _channelMap.values())
+                {
+                    channel.unblock();
+                }
+            }
+        }
+    }
+
     public boolean isClosed()
     {
         return _closed;
@@ -1344,12 +1381,7 @@ public class AMQProtocolEngine implement
 
     public List<AMQSessionModel> getSessionModels()
     {
-		List<AMQSessionModel> sessions = new ArrayList<AMQSessionModel>();
-		for (AMQChannel channel : getChannels())
-		{
-		    sessions.add((AMQSessionModel) channel);
-		}
-		return sessions;
+		return new ArrayList<AMQSessionModel>(getChannels());
     }
 
     public LogSubject getLogSubject()
@@ -1359,21 +1391,15 @@ public class AMQProtocolEngine implement
 
     public void registerMessageDelivered(long messageSize)
     {
-        if (isStatisticsEnabled())
-        {
-            _messagesDelivered.registerEvent(1L);
-            _dataDelivered.registerEvent(messageSize);
-        }
+        _messagesDelivered.registerEvent(1L);
+        _dataDelivered.registerEvent(messageSize);
         _virtualHost.registerMessageDelivered(messageSize);
     }
 
     public void registerMessageReceived(long messageSize, long timestamp)
     {
-        if (isStatisticsEnabled())
-        {
-            _messagesReceived.registerEvent(1L, timestamp);
-            _dataReceived.registerEvent(messageSize, timestamp);
-        }
+        _messagesReceived.registerEvent(1L, timestamp);
+        _dataReceived.registerEvent(messageSize, timestamp);
         _virtualHost.registerMessageReceived(messageSize, timestamp);
     }
 
@@ -1407,29 +1433,26 @@ public class AMQProtocolEngine implement
 
     public void initialiseStatistics()
     {
-        setStatisticsEnabled(!StatisticsCounter.DISABLE_STATISTICS &&
-                _registry.getConfiguration().isStatisticsGenerationConnectionsEnabled());
-
         _messagesDelivered = new StatisticsCounter("messages-delivered-" + getSessionID());
         _dataDelivered = new StatisticsCounter("data-delivered-" + getSessionID());
         _messagesReceived = new StatisticsCounter("messages-received-" + getSessionID());
         _dataReceived = new StatisticsCounter("data-received-" + getSessionID());
     }
 
-    public boolean isStatisticsEnabled()
+    public boolean isSessionNameUnique(byte[] name)
     {
-        return _statisticsEnabled;
+        // 0-8/0-9/0-9-1 sessions don't have names
+        return true;
     }
 
-    public void setStatisticsEnabled(boolean enabled)
+    public String getRemoteAddressString()
     {
-        _statisticsEnabled = enabled;
+        return String.valueOf(getRemoteAddress());
     }
 
-    public boolean isSessionNameUnique(byte[] name)
+    public String getClientId()
     {
-        // 0-8/0-9/0-9-1 sessions don't have names
-        return true;
+        return String.valueOf(getContextKey());
     }
 
     public void setDeferFlush(boolean deferFlush)
@@ -1466,4 +1489,9 @@ public class AMQProtocolEngine implement
     {
         return _reference;
     }
+
+    public Lock getReceivedLock()
+    {
+        return _receivedLock;
+    }
 }

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java Fri Aug  3 12:13:32 2012
@@ -20,12 +20,14 @@
  */
 package org.apache.qpid.server.protocol;
 
+import java.net.SocketAddress;
+import java.security.Principal;
 import java.util.List;
+import java.util.concurrent.locks.Lock;
 
 import javax.security.auth.Subject;
 import javax.security.sasl.SaslServer;
 
-import org.apache.qpid.AMQConnectionException;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
@@ -60,6 +62,13 @@ public interface AMQProtocolSession exte
 
     long getLastReceivedTime();
 
+    /**
+     * Return the local socket address for the connection
+     *
+     * @return the socket address
+     */
+    SocketAddress getLocalAddress();
+
     public static interface Task
     {
         public void doTask(AMQProtocolSession session) throws AMQException;
@@ -145,10 +154,6 @@ public interface AMQProtocolSession exte
 
     void closeProtocolSession();
 
-    /** This must be called to close the session in order to free up any resources managed by the session. */
-    void closeConnection(int channelId, AMQConnectionException e) throws AMQException;
-
-
     /** @return a key that uniquely identifies this session */
     Object getKey();
 
@@ -210,4 +215,7 @@ public interface AMQProtocolSession exte
 
     void mgmtCloseChannel(int channelId);
 
+    public Principal getPeerPrincipal();
+
+    Lock getReceivedLock();
 }

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java Fri Aug  3 12:13:32 2012
@@ -37,26 +37,26 @@ import org.apache.qpid.server.queue.Simp
 public interface AMQSessionModel extends Comparable<AMQSessionModel>
 {
     /** Unique session ID across entire broker*/
-    public UUID getId();
+    public UUID getQMFId();
 
     public AMQConnectionModel getConnectionModel();
 
     public String getClientID();
-    
+
     public void close() throws AMQException;
 
     public LogSubject getLogSubject();
-    
+
     /**
      * This method is called from the housekeeping thread to check the status of
      * transactions on this session and react appropriately.
-     * 
+     *
      * If a transaction is open for too long or idle for too long then a warning
      * is logged or the connection is closed, depending on the configuration. An open
      * transaction is one that has recent activity. The transaction age is counted
-     * from the time the transaction was started. An idle transaction is one that 
-     * has had no activity, such as publishing or acknowledgeing messages.
-     * 
+     * from the time the transaction was started. An idle transaction is one that
+     * has had no activity, such as publishing or acknowledging messages.
+     *
      * @param openWarn time in milliseconds before alerting on open transaction
      * @param openClose time in milliseconds before closing connection with open transaction
      * @param idleWarn time in milliseconds before alerting on idle transaction
@@ -68,6 +68,22 @@ public interface AMQSessionModel extends
 
     void unblock(AMQQueue queue);
 
+    void block();
+
+    void unblock();
+
+    boolean getBlocking();
 
     boolean onSameConnection(InboundMessage inbound);
+
+    int getUnacknowledgedMessageCount();
+
+    Long getTxnCount();
+    Long getTxnStart();
+    Long getTxnCommits();
+    Long getTxnRejects();
+
+    int getChannelId();
+
+    int getConsumerCount();
 }

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java Fri Aug  3 12:13:32 2012
@@ -302,7 +302,7 @@ public class MultiVersionProtocolEngine 
         public ServerProtocolEngine getProtocolEngine()
         {
             final ConnectionDelegate connDelegate =
-                    new org.apache.qpid.server.transport.ServerConnectionDelegate(_appRegistry, _fqdn);
+                    new org.apache.qpid.server.transport.ServerConnectionDelegate(_appRegistry, _fqdn, _appRegistry.getAuthenticationManager(getLocalAddress()));
 
             ServerConnection conn = new ServerConnection(_id);
             conn.setConnectionDelegate(connDelegate);

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java Fri Aug  3 12:13:32 2012
@@ -47,7 +47,7 @@ public class ProtocolEngine_0_10  extend
     private long _readBytes;
     private long _writtenBytes;
     private ServerConnection _connection;
-    private final UUID _id;
+    private final UUID _qmfId;
     private final IApplicationRegistry _appRegistry;
     private long _createTime = System.currentTimeMillis();
 
@@ -59,7 +59,7 @@ public class ProtocolEngine_0_10  extend
         _connection = conn;
         _connection.setConnectionConfig(this);
 
-        _id = appRegistry.getConfigStore().createId();
+        _qmfId = appRegistry.getConfigStore().createId();
         _appRegistry = appRegistry;
 
         if(network != null)
@@ -88,7 +88,7 @@ public class ProtocolEngine_0_10  extend
         _network = network;
 
         _connection.setSender(new Disassembler(sender, MAX_FRAME_SIZE));
-
+        _connection.setPeerPrincipal(_network.getPeerPrincipal());
         // FIXME Two log messages to maintain compatibility with earlier protocol versions
         _connection.getLogActor().message(ConnectionMessages.OPEN(null, null, null, false, false, false));
         _connection.getLogActor().message(ConnectionMessages.OPEN(null, "0-10", null, false, true, false));
@@ -180,9 +180,10 @@ public class ProtocolEngine_0_10  extend
         return _appRegistry.getConfigStore();
     }
 
-    public UUID getId()
+    @Override
+    public UUID getQMFId()
     {
-        return _id;
+        return _qmfId;
     }
 
     public ConnectionConfigType getConfigType()

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java Fri Aug  3 12:13:32 2012
@@ -26,12 +26,13 @@ import java.util.UUID;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.logging.Level;
 import java.util.logging.Logger;
-import javax.security.auth.callback.CallbackHandler;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
 import org.apache.qpid.amqp_1_0.codec.FrameWriter;
 import org.apache.qpid.amqp_1_0.framing.AMQFrame;
 import org.apache.qpid.amqp_1_0.framing.FrameHandler;
 import org.apache.qpid.amqp_1_0.framing.OversizeFrameException;
-import org.apache.qpid.amqp_1_0.transport.CallbackHandlerSource;
+import org.apache.qpid.amqp_1_0.transport.SaslServerProvider;
 import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
 import org.apache.qpid.amqp_1_0.transport.Container;
 import org.apache.qpid.amqp_1_0.transport.FrameOutputHandler;
@@ -95,7 +96,7 @@ public class ProtocolEngine_1_0_0 implem
     }
 
     private State _state = State.A;
-    
+
 
 
     public ProtocolEngine_1_0_0(final IApplicationRegistry appRegistry, long id)
@@ -143,8 +144,9 @@ public class ProtocolEngine_1_0_0 implem
 
         Container container = new Container(_appRegistry.getBrokerId().toString());
 
-        _conn = new ConnectionEndpoint(container,asCallbackHandlerSource(_appRegistry.getAuthenticationManager()));
-        _conn.setConnectionEventListener(new Connection_1_0(_appRegistry));
+        _conn = new ConnectionEndpoint(container, asSaslServerProvider(_appRegistry.getAuthenticationManager(
+                getLocalAddress())));
+        _conn.setConnectionEventListener(new Connection_1_0(_appRegistry, _conn, _connectionId));
         _conn.setFrameOutputHandler(this);
         _conn.setRemoteAddress(_network.getRemoteAddress());
 
@@ -155,14 +157,14 @@ public class ProtocolEngine_1_0_0 implem
         _sender.flush();
     }
 
-    private CallbackHandlerSource asCallbackHandlerSource(final AuthenticationManager authenticationManager)
+    private SaslServerProvider asSaslServerProvider(final AuthenticationManager authenticationManager)
     {
-        return new CallbackHandlerSource()
+        return new SaslServerProvider()
         {
             @Override
-            public CallbackHandler getHandler(String mechanism)
+            public SaslServer getSaslServer(String mechanism, String fqdn) throws SaslException
             {
-                return authenticationManager.getHandler(mechanism);
+                return authenticationManager.createSaslServer(mechanism, fqdn, null);
             }
         };
     }

Propchange: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java:r1333988-1368650

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java Fri Aug  3 12:13:32 2012
@@ -26,13 +26,14 @@ import java.nio.ByteBuffer;
 import java.util.UUID;
 import java.util.logging.Level;
 import java.util.logging.Logger;
-import javax.security.auth.callback.CallbackHandler;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
 import org.apache.qpid.amqp_1_0.codec.FrameWriter;
 import org.apache.qpid.amqp_1_0.codec.ProtocolHandler;
 import org.apache.qpid.amqp_1_0.framing.AMQFrame;
 import org.apache.qpid.amqp_1_0.framing.OversizeFrameException;
 import org.apache.qpid.amqp_1_0.framing.SASLFrameHandler;
-import org.apache.qpid.amqp_1_0.transport.CallbackHandlerSource;
+import org.apache.qpid.amqp_1_0.transport.SaslServerProvider;
 import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
 import org.apache.qpid.amqp_1_0.transport.Container;
 import org.apache.qpid.amqp_1_0.transport.FrameOutputHandler;
@@ -57,7 +58,7 @@ public class ProtocolEngine_1_0_0_SASL i
        private long _createTime = System.currentTimeMillis();
        private ConnectionEndpoint _conn;
        private long _connectionId;
-    
+
        private static final ByteBuffer HEADER =
                ByteBuffer.wrap(new byte[]
                        {
@@ -163,9 +164,9 @@ public class ProtocolEngine_1_0_0_SASL i
 
         Container container = new Container(_appRegistry.getBrokerId().toString());
 
-        _conn = new ConnectionEndpoint(container, asCallbackHandlerSource(ApplicationRegistry.getInstance()
-                                                                                             .getAuthenticationManager()));
-        _conn.setConnectionEventListener(new Connection_1_0(_appRegistry));
+        _conn = new ConnectionEndpoint(container, asSaslServerProvider(ApplicationRegistry.getInstance()
+                .getAuthenticationManager(getLocalAddress())));
+        _conn.setConnectionEventListener(new Connection_1_0(_appRegistry, _conn, _connectionId));
         _conn.setRemoteAddress(getRemoteAddress());
 
 
@@ -200,14 +201,14 @@ public class ProtocolEngine_1_0_0_SASL i
 
     }
 
-    private CallbackHandlerSource asCallbackHandlerSource(final AuthenticationManager authenticationManager)
+    private SaslServerProvider asSaslServerProvider(final AuthenticationManager authenticationManager)
     {
-        return new CallbackHandlerSource()
+        return new SaslServerProvider()
         {
             @Override
-            public CallbackHandler getHandler(String mechanism)
+            public SaslServer getSaslServer(String mechanism, String fqdn) throws SaslException
             {
-                return authenticationManager.getHandler(mechanism);
+                return authenticationManager.createSaslServer(mechanism, fqdn, null);
             }
         };
     }

Propchange: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java:r1333988-1368650

Propchange: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0:r1333988-1368650

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java Fri Aug  3 12:13:32 2012
@@ -20,21 +20,35 @@
  */
 package org.apache.qpid.server.protocol.v1_0;
 
+import java.text.MessageFormat;
+import java.util.Collection;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
 import org.apache.qpid.amqp_1_0.transport.ConnectionEventListener;
 import org.apache.qpid.amqp_1_0.transport.SessionEndpoint;
 
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
+import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.stats.StatisticsCounter;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
+import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTION_FORMAT;
+
 public class Connection_1_0 implements ConnectionEventListener
 {
 
     private IApplicationRegistry _appRegistry;
     private VirtualHost _vhost;
+    private final ConnectionEndpoint _conn;
+    private final long _connectionId;
+    private final Collection<Session_1_0> _sessions = Collections.synchronizedCollection(new ArrayList<Session_1_0>());
 
 
     public static interface Task
@@ -48,18 +62,27 @@ public class Connection_1_0 implements C
 
 
 
-    public Connection_1_0(IApplicationRegistry appRegistry)
+    public Connection_1_0(IApplicationRegistry appRegistry, ConnectionEndpoint conn, long connectionId)
     {
         _appRegistry = appRegistry;
         _vhost = _appRegistry.getVirtualHostRegistry().getDefaultVirtualHost();
+        _conn = conn;
+        _connectionId = connectionId;
+        _vhost.getConnectionRegistry().registerConnection(_model);
+
     }
 
     public void remoteSessionCreation(SessionEndpoint endpoint)
     {
         Session_1_0 session = new Session_1_0(_vhost, _appRegistry, this);
+        _sessions.add(session);
         endpoint.setSessionEventListener(session);
     }
 
+    void sessionEnded(Session_1_0 session)
+    {
+        _sessions.remove(session);
+    }
 
     void removeConnectionCloseTask(final Task task)
     {
@@ -86,6 +109,8 @@ public class Connection_1_0 implements C
         {
             _closeTasks.clear();
         }
+        _vhost.getConnectionRegistry().deregisterConnection(_model);
+
 
     }
 
@@ -94,5 +119,174 @@ public class Connection_1_0 implements C
         closeReceived();
     }
 
+    private final AMQConnectionModel _model = new AMQConnectionModel()
+    {
+        private final StatisticsCounter _messageDeliveryStatistics = new StatisticsCounter();
+        private final StatisticsCounter _messageReceiptStatistics = new StatisticsCounter();
+        private final StatisticsCounter _dataDeliveryStatistics = new StatisticsCounter();
+        private final StatisticsCounter _dataReceiptStatistics = new StatisticsCounter();
+
+        private final LogSubject _logSubject = new LogSubject()
+        {
+            @Override
+            public String toLogString()
+            {
+                return "[" +
+                        MessageFormat.format(CONNECTION_FORMAT,
+                                             getConnectionId(),
+                                             getClientId(),
+                                             getRemoteAddressString(),
+                                             _vhost.getName())
+                     + "] ";
+
+            }
+        };
+
+        @Override
+        public void close(AMQConstant cause, String message) throws AMQException
+        {
+            // TODO
+        }
+
+        @Override
+        public void block()
+        {
+            // TODO
+        }
+
+        @Override
+        public void unblock()
+        {
+            // TODO
+        }
+
+        @Override
+        public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException
+        {
+            // TODO
+        }
+
+        @Override
+        public long getConnectionId()
+        {
+            return _connectionId;
+        }
+
+        @Override
+        public List<AMQSessionModel> getSessionModels()
+        {
+            return new ArrayList<AMQSessionModel>(_sessions);
+        }
+
+        @Override
+        public LogSubject getLogSubject()
+        {
+            return _logSubject;
+        }
+
+        @Override
+        public String getUserName()
+        {
+            return getPrincipalAsString();
+        }
+
+        @Override
+        public boolean isSessionNameUnique(byte[] name)
+        {
+            return true;  // TODO
+        }
+
+        @Override
+        public String getRemoteAddressString()
+        {
+            return String.valueOf(_conn.getRemoteAddress());
+        }
+
+        @Override
+        public String getClientId()
+        {
+            return _conn.getRemoteContainerId();
+        }
+
+        @Override
+        public String getClientVersion()
+        {
+            return "";  //TODO
+        }
+
+        @Override
+        public String getPrincipalAsString()
+        {
+            return String.valueOf(_conn.getUser());
+        }
+
+        @Override
+        public long getSessionCountLimit()
+        {
+            return 0;  // TODO
+        }
+
+        @Override
+        public long getLastIoTime()
+        {
+            return 0;  // TODO
+        }
+
+        @Override
+        public void initialiseStatistics()
+        {
+            // TODO
+        }
+
+        @Override
+        public void registerMessageReceived(long messageSize, long timestamp)
+        {
+            // TODO
+        }
+
+        @Override
+        public void registerMessageDelivered(long messageSize)
+        {
+            // TODO
+        }
+
+        @Override
+        public StatisticsCounter getMessageDeliveryStatistics()
+        {
+            return _messageDeliveryStatistics;
+        }
+
+        @Override
+        public StatisticsCounter getMessageReceiptStatistics()
+        {
+            return _messageReceiptStatistics;
+        }
+
+        @Override
+        public StatisticsCounter getDataDeliveryStatistics()
+        {
+            return _dataDeliveryStatistics;
+        }
+
+        @Override
+        public StatisticsCounter getDataReceiptStatistics()
+        {
+            return _dataReceiptStatistics;
+        }
+
+        @Override
+        public void resetStatistics()
+        {
+            // TODO
+        }
+
+
+    };
+
+    AMQConnectionModel getModel()
+    {
+        return _model;
+    }
+
 
 }

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistry.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistry.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistry.java Fri Aug  3 12:13:32 2012
@@ -1,3 +1,23 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
 package org.apache.qpid.server.protocol.v1_0;
 
 import java.util.HashMap;

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkAttachment.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkAttachment.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkAttachment.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkAttachment.java Fri Aug  3 12:13:32 2012
@@ -1,3 +1,23 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
 package org.apache.qpid.server.protocol.v1_0;
 
 import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler;

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkAttachment.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkAttachment.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkAttachment.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkAttachment.java Fri Aug  3 12:13:32 2012
@@ -1,3 +1,23 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
 package org.apache.qpid.server.protocol.v1_0;
 
 import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler;

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java Fri Aug  3 12:13:32 2012
@@ -200,7 +200,7 @@ public class SendingLink_1_0 implements 
                 if(queue == null)
                 {
                     queue = AMQQueueFactory.createAMQQueueImpl(
-                                UUIDGenerator.generateUUID(),
+                                UUIDGenerator.generateQueueUUID(name, _vhost.getName()),
                                 name,
                                 isDurable,
                                 null,

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Fri Aug  3 12:13:32 2012
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.server.protocol.v1_0;
 
+import java.text.MessageFormat;
 import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
 import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint;
 import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
@@ -35,18 +36,26 @@ import org.apache.qpid.amqp_1_0.type.tra
 import org.apache.qpid.amqp_1_0.type.transport.Error;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQSecurityException;
+import org.apache.qpid.protocol.ProtocolEngine;
 import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.message.InboundMessage;
 import org.apache.qpid.server.model.UUIDGenerator;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
+import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.AMQQueueFactory;
 import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.transport.ServerConnection;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.util.*;
 
-public class Session_1_0 implements SessionEventListener
+import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT;
+
+public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSubject
 {
     private static final Symbol LIFETIME_POLICY = Symbol.valueOf("lifetime-policy");
     private IApplicationRegistry _appRegistry;
@@ -56,6 +65,7 @@ public class Session_1_0 implements Sess
     private final LinkedHashMap<Integer, ServerTransaction> _openTransactions =
             new LinkedHashMap<Integer, ServerTransaction>();
     private final Connection_1_0 _connection;
+    private UUID _id = UUID.randomUUID();
 
 
     public Session_1_0(VirtualHost vhost, IApplicationRegistry appRegistry, final Connection_1_0 connection)
@@ -319,7 +329,7 @@ public class Session_1_0 implements Sess
                                             ? null
                                             : (LifetimePolicy) properties.get(LIFETIME_POLICY);
 
-            final AMQQueue tempQueue = queue = AMQQueueFactory.createAMQQueueImpl( UUIDGenerator.generateUUID(),
+            final AMQQueue tempQueue = queue = AMQQueueFactory.createAMQQueueImpl( UUIDGenerator.generateQueueUUID(queueName, _vhost.getName()),
                                                                                    queueName,
                                                                                    false, // durable
                                                                                    null, // owner
@@ -405,6 +415,8 @@ public class Session_1_0 implements Sess
             iter.remove();
         }
 
+        _connection.sessionEnded(this);
+
     }
 
     Integer binaryToInteger(final Binary txnId)
@@ -443,4 +455,153 @@ public class Session_1_0 implements Sess
     public void forceEnd()
     {
     }
+
+    @Override
+    public UUID getQMFId()
+    {
+        return _id;
+    }
+
+    @Override
+    public AMQConnectionModel getConnectionModel()
+    {
+        return _connection.getModel();
+    }
+
+    @Override
+    public String getClientID()
+    {
+        // TODO
+        return "";
+    }
+
+    @Override
+    public void close() throws AMQException
+    {
+        // TODO - required for AMQSessionModel / management initiated closing
+    }
+
+    @Override
+    public LogSubject getLogSubject()
+    {
+        return this;
+    }
+
+    @Override
+    public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException
+    {
+        // TODO - required for AMQSessionModel / long running transaction detection
+    }
+
+    @Override
+    public void block(AMQQueue queue)
+    {
+        // TODO - required for AMQSessionModel / producer side flow control
+    }
+
+    @Override
+    public void unblock(AMQQueue queue)
+    {
+        // TODO - required for AMQSessionModel / producer side flow control
+    }
+
+    @Override
+    public void block()
+    {
+        // TODO - required for AMQSessionModel / producer side flow control
+    }
+
+    @Override
+    public void unblock()
+    {
+        // TODO - required for AMQSessionModel / producer side flow control
+    }
+
+    @Override
+    public boolean getBlocking()
+    {
+        // TODO
+        return false;
+    }
+
+    @Override
+    public boolean onSameConnection(InboundMessage inbound)
+    {
+        // TODO
+        return false;
+    }
+
+    @Override
+    public int getUnacknowledgedMessageCount()
+    {
+        // TODO
+        return 0;
+    }
+
+    @Override
+    public Long getTxnCount()
+    {
+        // TODO
+        return 0l;
+    }
+
+    @Override
+    public Long getTxnStart()
+    {
+        // TODO
+        return 0l;
+    }
+
+    @Override
+    public Long getTxnCommits()
+    {
+        // TODO
+        return 0l;
+    }
+
+    @Override
+    public Long getTxnRejects()
+    {
+        // TODO
+        return 0l;
+    }
+
+    @Override
+    public int getChannelId()
+    {
+        // TODO
+        return 0;
+    }
+
+    @Override
+    public int getConsumerCount()
+    {
+        // TODO
+        return 0;
+    }
+
+    @Override
+    public int compareTo(AMQSessionModel o)
+    {
+        return getQMFId().compareTo(o.getQMFId());
+    }
+
+
+
+    public String toLogString()
+    {
+        long connectionId = getConnectionModel().getConnectionId();
+
+        String remoteAddress = getConnectionModel().getRemoteAddressString();
+
+        return "[" +
+               MessageFormat.format(CHANNEL_FORMAT,
+                                    connectionId,
+                                    getClientID(),
+                                    remoteAddress,
+                                    _vhost.getName(), // TODO - virtual host
+                                    0) // TODO - channel)
+            + "] ";
+    }
+
 }

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java Fri Aug  3 12:13:32 2012
@@ -49,6 +49,7 @@ import org.apache.qpid.amqp_1_0.type.tra
 import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.logging.LogActor;
 import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.subscription.Subscription;
@@ -66,7 +67,7 @@ class Subscription_1_0 implements Subscr
     private final QueueEntry.SubscriptionAssignedState _assignedState = new QueueEntry.SubscriptionAssignedState(this);
     private final long _id;
     private final boolean _acquires;
-    private AMQQueue.Context _queueContext;
+    private volatile AMQQueue.Context _queueContext;
     private Map<String, Object> _properties = new ConcurrentHashMap<String, Object>();
     private ReentrantLock _stateChangeLock = new ReentrantLock();
 
@@ -631,4 +632,46 @@ class Subscription_1_0 implements Subscr
     {
         _filters = filters;
     }
+
+    @Override
+    public AMQSessionModel getSessionModel()
+    {
+        // TODO
+        return getSession();
+    }
+
+    @Override
+    public long getBytesOut()
+    {
+        // TODO
+        return 0;
+    }
+
+    @Override
+    public long getMessagesOut()
+    {
+        // TODO
+        return 0;
+    }
+
+    @Override
+    public long getUnacknowledgedBytes()
+    {
+        // TODO
+        return 0;
+    }
+
+    @Override
+    public long getUnacknowledgedMessages()
+    {
+        // TODO
+        return 0;
+    }
+
+    @Override
+    public String getConsumerName()
+    {
+        //TODO
+        return "TODO";
+    }
 }

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/UnsettledAction.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/UnsettledAction.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/UnsettledAction.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/UnsettledAction.java Fri Aug  3 12:13:32 2012
@@ -1,3 +1,23 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
 package org.apache.qpid.server.protocol.v1_0;
 
 import org.apache.qpid.amqp_1_0.type.DeliveryState;

Propchange: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:r1333988-1368650

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Fri Aug  3 12:13:32 2012
@@ -28,21 +28,25 @@ import org.apache.qpid.server.configurat
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.exchange.ExchangeReferrer;
 import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.management.Managable;
-import org.apache.qpid.server.management.ManagedObject;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.security.AuthorizationHolder;
 import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-public interface AMQQueue extends Managable, Comparable<AMQQueue>, ExchangeReferrer, TransactionLogResource, BaseQueue,
+public interface AMQQueue extends Comparable<AMQQueue>, ExchangeReferrer, TransactionLogResource, BaseQueue,
                                   QueueConfig
 {
+    public interface NotificationListener
+    {
+        void notifyClients(NotificationCheck notification, AMQQueue queue, String notificationMsg);
+    }
+
     boolean getDeleteOnNoConsumers();
 
     void setDeleteOnNoConsumers(boolean b);
@@ -57,6 +61,12 @@ public interface AMQQueue extends Managa
 
     LogSubject getLogSubject();
 
+    long getUnackedMessageBytes();
+
+    long getTotalDequeueCount();
+
+    long getTotalEnqueueCount();
+
     public interface Context
     {
         QueueEntry getLastSeenEntry();
@@ -79,6 +89,17 @@ public interface AMQQueue extends Managa
 
     void unregisterSubscription(final Subscription subscription) throws AMQException;
 
+    Collection<Subscription> getConsumers();
+
+    interface SubscriptionRegistrationListener
+    {
+        void subscriptionRegistered(AMQQueue queue, Subscription subscription);
+        void subscriptionUnregistered(AMQQueue queue, Subscription subscription);
+    }
+
+    void addSubscriptionRegistrationListener(SubscriptionRegistrationListener listener);
+    void removeSubscriptionRegistrationListener(SubscriptionRegistrationListener listener);
+
 
     int getConsumerCount();
 
@@ -109,7 +130,7 @@ public interface AMQQueue extends Managa
 
     void dequeue(QueueEntry entry, Subscription sub);
 
-    void decrementUnackedMsgCount();
+    void decrementUnackedMsgCount(QueueEntry queueEntry);
 
     boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException;
 
@@ -139,20 +160,8 @@ public interface AMQQueue extends Managa
      */
     public List<QueueEntry> getMessagesRangeOnTheQueue(final long fromPosition, final long toPosition);
 
+    void visit(QueueEntryVisitor visitor);
 
-    void moveMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName);
-
-    void copyMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName);
-
-    void removeMessagesFromQueue(long fromMessageId, long toMessageId);
-
-    static interface Visitor
-    {
-        boolean visit(QueueEntry entry);
-    }
-    
-    void visit(Visitor visitor);
-    
 
     long getMaximumMessageSize();
 
@@ -216,8 +225,6 @@ public interface AMQQueue extends Managa
 
     void setAlternateExchange(Exchange exchange);
 
-    void setAlternateExchange(String exchangeName);
-
     Map<String, Object> getArguments();
 
     void checkCapacity(AMQSessionModel channel);
@@ -245,12 +252,12 @@ public interface AMQQueue extends Managa
     }
 
     /**
-     * ExistingSubscriptionPreventsExclusive signals a failure to create an exclusize subscription, as a subscription
+     * ExistingSubscriptionPreventsExclusive signals a failure to create an exclusive subscription, as a subscription
      * already exists.
      *
      * <p/><table id="crc"><caption>CRC Card</caption>
      * <tr><th> Responsibilities <th> Collaborations
-     * <tr><td> Represent failure to create an exclusize subscription, as a subscription already exists.
+     * <tr><td> Represent failure to create an exclusive subscription, as a subscription already exists.
      * </table>
      *
      * @todo Not an AMQP exception as no status code.
@@ -274,9 +281,7 @@ public interface AMQQueue extends Managa
 
     ConfigurationPlugin getConfiguration();
 
-    ManagedObject getManagedObject();
-
-    void setExclusive(boolean exclusive) throws AMQException;
+    void setExclusive(boolean exclusive);
 
     /**
      * Gets the maximum delivery count.   If a message on this queue
@@ -295,4 +300,19 @@ public interface AMQQueue extends Managa
      */
     public void setMaximumDeliveryCount(final int maximumDeliveryCount);
 
+    void setNotificationListener(NotificationListener listener);
+
+    /**
+     * Sets the free text description of this queue.
+     *
+     * @param description the free text description of this queue
+     *
+     */
+    void setDescription(String description);
+
+    /**
+     * Gets the free text description of this queue.
+     */
+    String getDescription();
+
 }

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java Fri Aug  3 12:13:32 2012
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.server.queue;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
@@ -41,6 +42,7 @@ import org.apache.qpid.server.virtualhos
 public class AMQQueueFactory
 {
     public static final String X_QPID_PRIORITIES = "x-qpid-priorities";
+    public static final String X_QPID_DESCRIPTION = "x-qpid-description";
     public static final String QPID_LVQ_KEY = "qpid.LVQ_key";
     public static final String QPID_LAST_VALUE_QUEUE = "qpid.last_value_queue";
     public static final String QPID_LAST_VALUE_QUEUE_KEY = "qpid.last_value_queue_key";
@@ -169,29 +171,7 @@ public class AMQQueueFactory
     };
 
     /**
-     * Creates a new queue with a random id.
-     *
-     * @see #createAMQQueueImpl(UUID, String, boolean, String, boolean, boolean, VirtualHost, Map)
-     * @deprecated because only called from unit tests
-     * */
-    @Deprecated
-    public static AMQQueue createAMQQueueImpl(AMQShortString name,
-                                              boolean durable,
-                                              AMQShortString owner,
-                                              boolean autoDelete,
-                                              boolean exclusive,
-                                              VirtualHost virtualHost, final FieldTable arguments) throws AMQException
-    {
-        return createAMQQueueImpl(UUIDGenerator.generateUUID(),
-                                  name == null ? null : name.toString(),
-                                  durable,
-                                  owner == null ? null : owner.toString(),
-                                  autoDelete,
-                                  exclusive, virtualHost, FieldTable.convertToMap(arguments));
-    }
-
-    /**
-     * @param id the id to use. If default then one is generated from queueName. TODO check correctness of calls that pass a null value.
+     * @param id the id to use.
      */
     public static AMQQueue createAMQQueueImpl(UUID id,
                                               String queueName,
@@ -300,7 +280,7 @@ public class AMQQueueFactory
 
                 if(dlExchange == null)
                 {
-                    dlExchange = exchangeFactory.createExchange(UUIDGenerator.generateUUID(dlExchangeName, virtualHost.getName()), new AMQShortString(dlExchangeName), ExchangeDefaults.FANOUT_EXCHANGE_CLASS, true, false, 0);
+                    dlExchange = exchangeFactory.createExchange(UUIDGenerator.generateExchangeUUID(dlExchangeName, virtualHost.getName()), new AMQShortString(dlExchangeName), ExchangeDefaults.FANOUT_EXCHANGE_CLASS, true, false, 0);
 
                     exchangeRegistry.registerExchange(dlExchange);
 
@@ -322,7 +302,7 @@ public class AMQQueueFactory
                     args.put(X_QPID_DLQ_ENABLED, false);
                     args.put(X_QPID_MAXIMUM_DELIVERY_COUNT, 0);
 
-                    dlQueue = createAMQQueueImpl(UUIDGenerator.generateUUID(dlQueueName, virtualHost.getName()), dlQueueName, true, owner, false, exclusive, virtualHost, args);
+                    dlQueue = createAMQQueueImpl(UUIDGenerator.generateQueueUUID(dlQueueName, virtualHost.getName()), dlQueueName, true, owner, false, exclusive, virtualHost, args);
 
                     //enter the dlq in the persistent store
                     virtualHost.getMessageStore().createQueue(dlQueue, FieldTable.convertToFieldTable(args));
@@ -350,42 +330,16 @@ public class AMQQueueFactory
         boolean autodelete = config.getAutoDelete();
         boolean exclusive = config.getExclusive();
         String owner = config.getOwner();
-        Map<String,Object> arguments = null;
-
-        if(config.isLVQ() || config.getLVQKey() != null)
-        {
-            arguments = new HashMap<String,Object>();
-            arguments.put(QPID_LAST_VALUE_QUEUE, 1);
-            arguments.put(QPID_LAST_VALUE_QUEUE_KEY, config.getLVQKey() == null ? QPID_LVQ_KEY : config.getLVQKey());
-        }
-        else if (config.getPriority() || config.getPriorities() > 0)
-        {
-            arguments = new HashMap<String,Object>();
-            arguments.put(X_QPID_PRIORITIES, config.getPriorities() < 0 ? 10 : config.getPriorities());
-        }
-        else if (config.getQueueSortKey() != null && !"".equals(config.getQueueSortKey()))
-        {
-            arguments = new HashMap<String,Object>();
-            arguments.put(QPID_QUEUE_SORT_KEY, config.getQueueSortKey());
-        }
-        if (!config.getAutoDelete() && config.isDeadLetterQueueEnabled())
-        {
-            if (arguments == null)
-            {
-                arguments = new HashMap<String,Object>();
-            }
-            arguments.put(X_QPID_DLQ_ENABLED, true);
-        }
+        Map<String, Object> arguments = createQueueArgumentsFromConfig(config);
 
         // we need queues that are defined in config to have deterministic ids.
-        UUID id = UUIDGenerator.generateUUID(queueName, host.getName());
+        UUID id = UUIDGenerator.generateQueueUUID(queueName, host.getName());
 
         AMQQueue q = createAMQQueueImpl(id, queueName, durable, owner, autodelete, exclusive, host, arguments);
         q.configure(config);
         return q;
     }
 
-
     /**
      * Validates DLQ and DLE names
      * <p>
@@ -475,4 +429,43 @@ public class AMQQueueFactory
         String dlExchangeName = name + serverConfig.getDeadLetterExchangeSuffix();
         return dlExchangeName;
     }
+
+    private static Map<String, Object> createQueueArgumentsFromConfig(QueueConfiguration config)
+    {
+        Map<String,Object> arguments = new HashMap<String,Object>();
+
+        if(config.isLVQ() || config.getLVQKey() != null)
+        {
+            arguments.put(QPID_LAST_VALUE_QUEUE, 1);
+            arguments.put(QPID_LAST_VALUE_QUEUE_KEY, config.getLVQKey() == null ? QPID_LVQ_KEY : config.getLVQKey());
+        }
+        else if (config.getPriority() || config.getPriorities() > 0)
+        {
+            arguments.put(X_QPID_PRIORITIES, config.getPriorities() < 0 ? 10 : config.getPriorities());
+        }
+        else if (config.getQueueSortKey() != null && !"".equals(config.getQueueSortKey()))
+        {
+            arguments.put(QPID_QUEUE_SORT_KEY, config.getQueueSortKey());
+        }
+
+        if (!config.getAutoDelete() && config.isDeadLetterQueueEnabled())
+        {
+            arguments.put(X_QPID_DLQ_ENABLED, true);
+        }
+
+        if (config.getDescription() != null && !"".equals(config.getDescription()))
+        {
+            arguments.put(X_QPID_DESCRIPTION, config.getDescription());
+        }
+
+        if (arguments.isEmpty())
+        {
+            return Collections.emptyMap();
+        }
+        else
+        {
+            return arguments;
+        }
+    }
+
 }

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java Fri Aug  3 12:13:32 2012
@@ -24,17 +24,25 @@ package org.apache.qpid.server.queue;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.util.Collections;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicReference;
 
 public class ConflationQueueList extends SimpleQueueEntryList
 {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ConflationQueueList.class);
 
     private final String _conflationKey;
     private final ConcurrentHashMap<Object, AtomicReference<QueueEntry>> _latestValuesMap =
         new ConcurrentHashMap<Object, AtomicReference<QueueEntry>>();
 
+    private final QueueEntry _deleteInProgress = new SimpleQueueEntryImpl(this);
+    private final QueueEntry _newerEntryAlreadyBeenAndGone = new SimpleQueueEntryImpl(this);
+
     public ConflationQueueList(AMQQueue queue, String conflationKey)
     {
         super(queue);
@@ -52,48 +60,98 @@ public class ConflationQueueList extends
         return new ConflationQueueEntry(this, message);
     }
 
-
+    /**
+     * Updates the list using super.add and also updates {@link #_latestValuesMap} and discards entries as necessary.
+     */
     @Override
     public ConflationQueueEntry add(final ServerMessage message)
     {
-        ConflationQueueEntry entry = (ConflationQueueEntry) (super.add(message));
-        AtomicReference<QueueEntry> latestValueReference = null;
+        final ConflationQueueEntry addedEntry = (ConflationQueueEntry) (super.add(message));
 
-        Object value = message.getMessageHeader().getHeader(_conflationKey);
-        if(value != null)
+        final Object keyValue = message.getMessageHeader().getHeader(_conflationKey);
+        if (keyValue != null)
         {
-            latestValueReference = _latestValuesMap.get(value);
-            if(latestValueReference == null)
+            if(LOGGER.isDebugEnabled())
             {
-                _latestValuesMap.putIfAbsent(value, new AtomicReference<QueueEntry>(entry));
-                latestValueReference = _latestValuesMap.get(value);
+                LOGGER.debug("Adding entry " + addedEntry + " for message " + message.getMessageNumber() + " with conflation key " + keyValue);
             }
-            QueueEntry oldEntry;
 
+            final AtomicReference<QueueEntry> referenceToEntry = new AtomicReference<QueueEntry>(addedEntry);
+            AtomicReference<QueueEntry> entryReferenceFromMap = null;
+            QueueEntry entryFromMap;
+
+            // Iterate until we have got a valid atomic reference object and either the referent is newer than the current
+            // entry, or the current entry has replaced it in the reference. Note that _deleteInProgress is a special value
+            // indicating that the reference object is no longer valid (it is being removed from the map).
+            boolean keepTryingToUpdateEntryReference = true;
             do
             {
-                oldEntry = latestValueReference.get();
+                do
+                {
+                    entryReferenceFromMap = getOrPutIfAbsent(keyValue, referenceToEntry);
+
+                    // entryFromMap can be either an older entry, a newer entry (added recently by another thread), or addedEntry (if it's for a new key value)  
+                    entryFromMap = entryReferenceFromMap.get();
+                }
+                while(entryFromMap == _deleteInProgress);
+
+                boolean entryFromMapIsOlder = entryFromMap != _newerEntryAlreadyBeenAndGone && entryFromMap.compareTo(addedEntry) < 0;
+
+                keepTryingToUpdateEntryReference = entryFromMapIsOlder
+                        && !entryReferenceFromMap.compareAndSet(entryFromMap, addedEntry);
             }
-            while(oldEntry.compareTo(entry) < 0 && !latestValueReference.compareAndSet(oldEntry, entry));
+            while(keepTryingToUpdateEntryReference);
 
-            if(oldEntry.compareTo(entry) < 0)
+            if (entryFromMap == _newerEntryAlreadyBeenAndGone)
+            {
+                discardEntry(addedEntry);
+            }
+            else if (entryFromMap.compareTo(addedEntry) > 0)
             {
-                // We replaced some other entry to become the newest value
-                if(oldEntry.acquire())
+                if(LOGGER.isDebugEnabled())
                 {
-                    discardEntry(oldEntry);
+                    LOGGER.debug("New entry " + addedEntry.getEntryId() + " for message " + addedEntry.getMessage().getMessageNumber() + " being immediately discarded because a newer entry arrived. The newer entry is: " + entryFromMap + " for message " + entryFromMap.getMessage().getMessageNumber());
                 }
+                discardEntry(addedEntry);
             }
-            else if (oldEntry.compareTo(entry) > 0)
+            else if (entryFromMap.compareTo(addedEntry) < 0)
             {
-                // A newer entry came along
-                discardEntry(entry);
-
+                if(LOGGER.isDebugEnabled())
+                {
+                    LOGGER.debug("Entry " + addedEntry + " for message " + addedEntry.getMessage().getMessageNumber() + " replacing older entry " + entryFromMap + " for message " + entryFromMap.getMessage().getMessageNumber());
+                }
+                discardEntry(entryFromMap);
             }
+
+            addedEntry.setLatestValueReference(entryReferenceFromMap);
         }
 
-        entry.setLatestValueReference(latestValueReference);
-        return entry;
+        return addedEntry;
+    }
+
+    /**
+     * Returns:
+     *
+     * <ul>
+     * <li>the existing entry reference if the value already exists in the map, or</li>
+     * <li>referenceToAddedValue if none exists, or</li>
+     * <li>a reference to {@link #_newerEntryAlreadyBeenAndGone} if another thread concurrently
+     * adds and removes during execution of this method.</li>
+     * </ul>
+     */
+    private AtomicReference<QueueEntry> getOrPutIfAbsent(final Object key, final AtomicReference<QueueEntry> referenceToAddedValue)
+    {
+        AtomicReference<QueueEntry> latestValueReference = _latestValuesMap.putIfAbsent(key, referenceToAddedValue);
+
+        if(latestValueReference == null)
+        {
+            latestValueReference = _latestValuesMap.get(key);
+            if(latestValueReference == null)
+            {
+                return new AtomicReference<QueueEntry>(_newerEntryAlreadyBeenAndGone);
+            }
+        }
+        return latestValueReference;
     }
 
     private void discardEntry(final QueueEntry entry)
@@ -104,11 +162,13 @@ public class ConflationQueueList extends
             txn.dequeue(entry.getQueue(),entry.getMessage(),
                                     new ServerTransaction.Action()
                                 {
+                                    @Override
                                     public void postCommit()
                                     {
                                         entry.discard();
                                     }
 
+                                    @Override
                                     public void onRollback()
                                     {
 
@@ -120,7 +180,6 @@ public class ConflationQueueList extends
     private final class ConflationQueueEntry extends SimpleQueueEntryImpl
     {
 
-
         private AtomicReference<QueueEntry> _latestValueReference;
 
         public ConflationQueueEntry(SimpleQueueEntryList queueEntryList, ServerMessage message)
@@ -128,25 +187,56 @@ public class ConflationQueueList extends
             super(queueEntryList, message);
         }
 
-
+        @Override
         public void release()
         {
             super.release();
 
-            if(_latestValueReference != null)
+            discardIfReleasedEntryIsNoLongerLatest();
+        }
+
+        @Override
+        public boolean delete()
+        {
+            if(super.delete())
             {
-                if(_latestValueReference.get() != this)
+                if(_latestValueReference != null && _latestValueReference.compareAndSet(this, _deleteInProgress))
                 {
-                    discardEntry(this);
+                    Object key = getMessageHeader().getHeader(_conflationKey);
+                    _latestValuesMap.remove(key,_latestValueReference);
                 }
+                return true;
+            }
+            else
+            {
+                return false;
             }
-
         }
 
         public void setLatestValueReference(final AtomicReference<QueueEntry> latestValueReference)
         {
             _latestValueReference = latestValueReference;
         }
+
+        private void discardIfReleasedEntryIsNoLongerLatest()
+        {
+            if(_latestValueReference != null)
+            {
+                if(_latestValueReference.get() != this)
+                {
+                    discardEntry(this);
+                }
+            }
+        }
+
+    }
+
+    /**
+     * Exposed for the purposes of unit testing only.
+     */
+    Map<Object, AtomicReference<QueueEntry>> getLatestValuesMap()
+    {
+        return Collections.unmodifiableMap(_latestValuesMap);
     }
 
     static class Factory implements QueueEntryListFactory
@@ -163,5 +253,4 @@ public class ConflationQueueList extends
             return new ConflationQueueList(queue, _conflationKey);
         }
     }
-
 }

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java Fri Aug  3 12:13:32 2012
@@ -20,12 +20,10 @@
  */
 package org.apache.qpid.server.queue;
 
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.exchange.DefaultExchangeRegistry;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
@@ -33,11 +31,11 @@ import java.util.concurrent.ConcurrentMa
 
 public class DefaultQueueRegistry implements QueueRegistry
 {
-    private static final Logger LOGGER = Logger.getLogger(DefaultExchangeRegistry.class);
-
     private ConcurrentMap<AMQShortString, AMQQueue> _queueMap = new ConcurrentHashMap<AMQShortString, AMQQueue>();
 
     private final VirtualHost _virtualHost;
+    private final Collection<RegistryChangeListener> _listeners =
+            new ArrayList<RegistryChangeListener>();
 
     public DefaultQueueRegistry(VirtualHost virtualHost)
     {
@@ -52,11 +50,28 @@ public class DefaultQueueRegistry implem
     public void registerQueue(AMQQueue queue)
     {
         _queueMap.put(queue.getNameShortString(), queue);
+        synchronized (_listeners)
+        {
+            for(RegistryChangeListener listener : _listeners)
+            {
+                listener.queueRegistered(queue);
+            }
+        }
     }
 
     public void unregisterQueue(AMQShortString name)
     {
-        _queueMap.remove(name);
+        AMQQueue q = _queueMap.remove(name);
+        if(q != null)
+        {
+            synchronized (_listeners)
+            {
+                for(RegistryChangeListener listener : _listeners)
+                {
+                    listener.queueUnregistered(q);
+                }
+            }
+        }
     }
 
     public AMQQueue getQueue(AMQShortString name)
@@ -79,19 +94,30 @@ public class DefaultQueueRegistry implem
         return getQueue(new AMQShortString(queue));
     }
 
+    public void addRegistryChangeListener(RegistryChangeListener listener)
+    {
+        synchronized(_listeners)
+        {
+            _listeners.add(listener);
+        }
+    }
+
     @Override
     public void stopAllAndUnregisterMBeans()
     {
         for (final AMQQueue queue : getQueues())
         {
             queue.stop();
-            try
-            {
-                queue.getManagedObject().unregister();
-            }
-            catch (AMQException e)
+
+            //TODO: this is a bit of a hack, what if the listeners aren't aware
+            //that we are just unregistering the MBean because of HA, and aren't
+            //actually removing the queue as such.
+            synchronized (_listeners)
             {
-                LOGGER.warn("Failed to unregister mbean", e);
+                for(RegistryChangeListener listener : _listeners)
+                {
+                    listener.queueUnregistered(queue);
+                }
             }
         }
         _queueMap.clear();



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org