You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2011/02/04 17:15:28 UTC

svn commit: r1067210 - in /qpid/trunk/qpid/java: broker/src/main/java/org/apache/qpid/server/configuration/ broker/src/main/java/org/apache/qpid/server/handler/ broker/src/main/java/org/apache/qpid/server/protocol/ broker/src/main/java/org/apache/qpid/...

Author: robbie
Date: Fri Feb  4 16:15:27 2011
New Revision: 1067210

URL: http://svn.apache.org/viewvc?rev=1067210&view=rev
Log:
QPID-3029: Actually set and negotiate the supported maximum number of channels per connection during the connection handshake. Make the 0-10 client use channel numbers 0 to N-1, in line with the spec, rather than 1 to N.

Added:
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java
Modified:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/ChannelLimitReachedException.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ChannelLoggingTest.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java?rev=1067210&r1=1067209&r2=1067210&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java Fri Feb  4 16:15:27 2011
@@ -108,6 +108,7 @@ public class ServerConfiguration extends
         envVarMap.put("QPID_MAXIMUMMESSAGECOUNT", "maximumMessageCount");
         envVarMap.put("QPID_MAXIMUMQUEUEDEPTH", "maximumQueueDepth");
         envVarMap.put("QPID_MAXIMUMMESSAGESIZE", "maximumMessageSize");
+        envVarMap.put("QPID_MAXIMUMCHANNELCOUNT", "maximumChannelCount");
         envVarMap.put("QPID_MINIMUMALERTREPEATGAP", "minimumAlertRepeatGap");
         envVarMap.put("QPID_QUEUECAPACITY", "capacity");
         envVarMap.put("QPID_FLOWRESUMECAPACITY", "flowResumeCapacity");
@@ -818,4 +819,9 @@ public class ServerConfiguration extends
             }
         };
     }
+
+    public int getMaxChannelCount()
+    {
+        return getIntValue("maximumChannelCount", 256);
+    }
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java?rev=1067210&r1=1067209&r2=1067210&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java Fri Feb  4 16:15:27 2011
@@ -92,7 +92,7 @@ public class ConnectionSecureOkMethodHan
                 stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
 
                 ConnectionTuneBody tuneBody =
-                        methodRegistry.createConnectionTuneBody(0xFFFF,
+                        methodRegistry.createConnectionTuneBody(ApplicationRegistry.getInstance().getConfiguration().getMaxChannelCount(),
                                                                 ConnectionStartOkMethodHandler.getConfiguredFrameSize(),
                                                                 ApplicationRegistry.getInstance().getConfiguration().getHeartBeatDelay());
                 session.writeFrame(tuneBody.generateFrame(0));

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java?rev=1067210&r1=1067209&r2=1067210&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java Fri Feb  4 16:15:27 2011
@@ -113,7 +113,7 @@ public class ConnectionStartOkMethodHand
 
                     stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
 
-                    ConnectionTuneBody tuneBody = methodRegistry.createConnectionTuneBody(0xFFFF,
+                    ConnectionTuneBody tuneBody = methodRegistry.createConnectionTuneBody(ApplicationRegistry.getInstance().getConfiguration().getMaxChannelCount(),
                                                                                           getConfiguredFrameSize(),
                                                                                           ApplicationRegistry.getInstance().getConfiguration().getHeartBeatDelay());
                     session.writeFrame(tuneBody.generateFrame(0));

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java?rev=1067210&r1=1067209&r2=1067210&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java Fri Feb  4 16:15:27 2011
@@ -23,7 +23,6 @@ package org.apache.qpid.server.handler;
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.ConnectionTuneOkBody;
-import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.state.AMQState;
 import org.apache.qpid.server.state.AMQStateManager;
@@ -51,5 +50,9 @@ public class ConnectionTuneOkMethodHandl
         stateManager.changeState(AMQState.CONNECTION_NOT_OPENED);
         session.initHeartbeats(body.getHeartbeat());
         session.setMaxFrameSize(body.getFrameMax());
+        
+        long maxChannelNumber = body.getChannelMax();
+        //0 means no implied limit, except that forced by protocol limitations (0xFFFF)
+        session.setMaximumNumberOfChannels( maxChannelNumber == 0 ? 0xFFFFL : maxChannelNumber);
     }
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java?rev=1067210&r1=1067209&r2=1067210&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java Fri Feb  4 16:15:27 2011
@@ -136,7 +136,7 @@ public class AMQProtocolEngine implement
     protected volatile boolean _closed;
     
     // maximum number of channels this session should have
-    private long _maxNoOfChannels = 1000;
+    private long _maxNoOfChannels = ApplicationRegistry.getInstance().getConfiguration().getMaxChannelCount();
 
     /* AMQP Version for this session */
     private ProtocolVersion _protocolVersion = ProtocolVersion.getLatestSupportedVersion();

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java?rev=1067210&r1=1067209&r2=1067210&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java Fri Feb  4 16:15:27 2011
@@ -26,6 +26,7 @@ import org.apache.qpid.server.logging.ac
 import org.apache.qpid.common.ClientProperties;
 import org.apache.qpid.protocol.ProtocolEngine;
 import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.registry.IApplicationRegistry;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
@@ -145,4 +146,10 @@ public class ServerConnectionDelegate ex
         //TODO: implement broker support for actually sending heartbeats
         return 0;
     }
+
+    @Override
+    protected int getChannelMax()
+    {
+        return ApplicationRegistry.getInstance().getConfiguration().getMaxChannelCount();
+    }
 }

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=1067210&r1=1067209&r2=1067210&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Fri Feb  4 16:15:27 2011
@@ -26,15 +26,12 @@ import java.net.ConnectException;
 import java.net.UnknownHostException;
 import java.nio.channels.UnresolvedAddressException;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Iterator;
-import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.jms.ConnectionConsumer;
 import javax.jms.ConnectionMetaData;
@@ -84,153 +81,6 @@ import org.slf4j.LoggerFactory;
 
 public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable
 {
-    public static final class ChannelToSessionMap
-    {
-        private final AMQSession[] _fastAccessSessions = new AMQSession[16];
-        private final LinkedHashMap<Integer, AMQSession> _slowAccessSessions = new LinkedHashMap<Integer, AMQSession>();
-        private int _size = 0;
-        private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0;
-        private AtomicInteger _idFactory = new AtomicInteger(0);
-        private int _maxChannelID;
-        private boolean _cycledIds;
-
-        public AMQSession get(int channelId)
-        {
-            if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
-            {
-                return _fastAccessSessions[channelId];
-            }
-            else
-            {
-                return _slowAccessSessions.get(channelId);
-            }
-        }
-
-        public AMQSession put(int channelId, AMQSession session)
-        {
-            AMQSession oldVal;
-            if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
-            {
-                oldVal = _fastAccessSessions[channelId];
-                _fastAccessSessions[channelId] = session;
-            }
-            else
-            {
-                oldVal = _slowAccessSessions.put(channelId, session);
-            }
-            if ((oldVal != null) && (session == null))
-            {
-                _size--;
-            }
-            else if ((oldVal == null) && (session != null))
-            {
-                _size++;
-            }
-
-            return session;
-
-        }
-
-        public AMQSession remove(int channelId)
-        {
-            AMQSession session;
-            if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
-            {
-                session = _fastAccessSessions[channelId];
-                _fastAccessSessions[channelId] = null;
-            }
-            else
-            {
-                session = _slowAccessSessions.remove(channelId);
-            }
-
-            if (session != null)
-            {
-                _size--;
-            }
-            return session;
-
-        }
-
-        public Collection<AMQSession> values()
-        {
-            ArrayList<AMQSession> values = new ArrayList<AMQSession>(size());
-
-            for (int i = 0; i < 16; i++)
-            {
-                if (_fastAccessSessions[i] != null)
-                {
-                    values.add(_fastAccessSessions[i]);
-                }
-            }
-            values.addAll(_slowAccessSessions.values());
-
-            return values;
-        }
-
-        public int size()
-        {
-            return _size;
-        }
-
-        public void clear()
-        {
-            _size = 0;
-            _slowAccessSessions.clear();
-            for (int i = 0; i < 16; i++)
-            {
-                _fastAccessSessions[i] = null;
-            }
-        }
-
-        /*
-         * Synchronized on whole method so that we don't need to consider the
-         * increment-then-reset path in too much detail
-         */
-        public synchronized int getNextChannelId()
-        {
-            int id = 0;
-            if (!_cycledIds)
-            {
-                id = _idFactory.incrementAndGet();
-                if (id == _maxChannelID)
-                {
-                    _cycledIds = true;
-                    _idFactory.set(0); // Go back to the start
-                }
-            }
-            else
-            {
-                boolean done = false;
-                while (!done)
-                {
-                    // Needs to work second time through
-                    id = _idFactory.incrementAndGet();
-                    if (id > _maxChannelID)
-                    {
-                        _idFactory.set(0);
-                        id = _idFactory.incrementAndGet();
-                    }
-                    if ((id & FAST_CHANNEL_ACCESS_MASK) == 0)
-                    {
-                        done = (_fastAccessSessions[id] == null);
-                    }
-                    else
-                    {
-                        done = (!_slowAccessSessions.keySet().contains(id));
-                    }
-                }
-            }
-
-            return id;
-        }
-
-        public void setMaxChannelID(int maxChannelID)
-        {
-            _maxChannelID = maxChannelID;
-        }
-    }
-
     private static final Logger _logger = LoggerFactory.getLogger(AMQConnection.class);
 
 
@@ -244,9 +94,9 @@ public class AMQConnection extends Close
 
     /**
      * A channel is roughly analogous to a session. The server can negotiate the maximum number of channels per session
-     * and we must prevent the client from opening too many. Zero means unlimited.
+     * and we must prevent the client from opening too many.
      */
-    protected long _maximumChannelCount;
+    private long _maximumChannelCount;
 
     /** The maximum size of frame supported by the server */
     private long _maximumFrameSize;
@@ -489,7 +339,6 @@ public class AMQConnection extends Close
         {
             _delegate = new AMQConnectionDelegate_0_10(this);
         }
-        _sessions.setMaxChannelID(_delegate.getMaxChannelID());
 
         if (_logger.isInfoEnabled())
         {
@@ -570,8 +419,6 @@ public class AMQConnection extends Close
             }
         }
 
-        _logger.info("Connected with ProtocolHandler Version:"+_protocolHandler.getProtocolVersion());
-
         if (_logger.isDebugEnabled())
         {
             _logger.debug("Are we connected:" + _connected);
@@ -579,6 +426,11 @@ public class AMQConnection extends Close
 
         if (!_connected)
         {
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("Last attempted ProtocolHandler Version:"+_protocolHandler.getProtocolVersion());
+            }
+
             String message = null;
 
             if (connectionException != null)
@@ -620,6 +472,11 @@ public class AMQConnection extends Close
             throw new AMQConnectionFailureException(message, connectionException);
         }
 
+        _logger.info("Connected with ProtocolHandler Version:"+_protocolHandler.getProtocolVersion());
+
+        _sessions.setMaxChannelID(_delegate.getMaxChannelID());
+        _sessions.setMinChannelID(_delegate.getMinChannelID());
+
         _connectionMetaData = new QpidConnectionMetaData(this);
     }
 
@@ -647,7 +504,6 @@ public class AMQConnection extends Close
             Class partypes[] = new Class[1];
             partypes[0] = AMQConnection.class;
             _delegate = (AMQConnectionDelegate) c.getConstructor(partypes).newInstance(this);
-            _sessions.setMaxChannelID(_delegate.getMaxChannelID());
             //Update our session to use this new protocol version 
             _protocolHandler.getProtocolSession().setProtocolVersion(_delegate.getProtocolVersion());
 
@@ -898,7 +754,7 @@ public class AMQConnection extends Close
 
     public boolean channelLimitReached()
     {
-        return (_maximumChannelCount != 0) && (_sessions.size() == _maximumChannelCount);
+        return _sessions.size() >= _maximumChannelCount;
     }
 
     public String getClientID() throws JMSException

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java?rev=1067210&r1=1067209&r2=1067210&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java Fri Feb  4 16:15:27 2011
@@ -59,6 +59,8 @@ public interface AMQConnectionDelegate
     <T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operation) throws E;
     
     int getMaxChannelID();
-    
+
+    int getMinChannelID();
+
     ProtocolVersion getProtocolVersion();
 }

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java?rev=1067210&r1=1067209&r2=1067210&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java Fri Feb  4 16:15:27 2011
@@ -37,6 +37,7 @@ import org.apache.qpid.client.failover.F
 import org.apache.qpid.configuration.ClientProperties;
 import org.apache.qpid.framing.ProtocolVersion;
 import org.apache.qpid.jms.BrokerDetails;
+import org.apache.qpid.jms.ChannelLimitReachedException;
 import org.apache.qpid.jms.Session;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.transport.Connection;
@@ -82,6 +83,12 @@ public class AMQConnectionDelegate_0_10 
             throws JMSException
     {
         _conn.checkNotClosed();
+
+        if (_conn.channelLimitReached())
+        {
+            throw new ChannelLimitReachedException(_conn.getMaximumChannelCount());
+        }
+
         int channelId = _conn.getNextChannelID();
         AMQSession session;
         try
@@ -120,6 +127,12 @@ public class AMQConnectionDelegate_0_10 
     public XASession createXASession(int prefetchHigh, int prefetchLow) throws JMSException
     {
         _conn.checkNotClosed();
+
+        if (_conn.channelLimitReached())
+        {
+            throw new ChannelLimitReachedException(_conn.getMaximumChannelCount());
+        }
+
         int channelId = _conn.getNextChannelID();
         XASessionImpl session;
         try
@@ -165,6 +178,7 @@ public class AMQConnectionDelegate_0_10 
 
             _conn._connected = true;
             _conn.setUsername(_qpidConnection.getUserID());
+            _conn.setMaximumChannelCount(_qpidConnection.getChannelMax());
             _conn._failoverPolicy.attainedConnection();
         }
         catch (ProtocolVersionException pe)
@@ -293,7 +307,13 @@ public class AMQConnectionDelegate_0_10 
 
     public int getMaxChannelID()
     {
-       return Integer.MAX_VALUE;
+        //For a negotiated channelMax N, there are channels 0 to N-1 available.
+        return _qpidConnection.getChannelMax() - 1;
+    }
+
+    public int getMinChannelID()
+    {
+        return Connection.MIN_USABLE_CHANNEL_NUM;
     }
 
     public ProtocolVersion getProtocolVersion()

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java?rev=1067210&r1=1067209&r2=1067210&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java Fri Feb  4 16:15:27 2011
@@ -36,6 +36,7 @@ import org.apache.qpid.AMQException;
 import org.apache.qpid.client.failover.FailoverException;
 import org.apache.qpid.client.failover.FailoverProtectedOperation;
 import org.apache.qpid.client.failover.FailoverRetrySupport;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
 import org.apache.qpid.client.state.AMQState;
 import org.apache.qpid.client.state.StateWaiter;
 import org.apache.qpid.client.transport.TransportConnection;
@@ -134,7 +135,7 @@ public class AMQConnectionDelegate_8_0 i
 
         if (_conn.channelLimitReached())
         {
-            throw new ChannelLimitReachedException(_conn._maximumChannelCount);
+            throw new ChannelLimitReachedException(_conn.getMaximumChannelCount());
         }
 
         return new FailoverRetrySupport<org.apache.qpid.jms.Session, JMSException>(
@@ -307,7 +308,14 @@ public class AMQConnectionDelegate_8_0 i
 
     public int getMaxChannelID()
     {
-        return (int) (Math.pow(2, 16)-1);
+        ConnectionTuneParameters params = _conn.getProtocolHandler().getProtocolSession().getConnectionTuneParameters();
+
+        return params == null ? AMQProtocolSession.MAX_CHANNEL_MAX : params.getChannelMax();
+    }
+
+    public int getMinChannelID()
+    {
+        return AMQProtocolSession.MIN_USABLE_CHANNEL_NUM;
     }
 
     public ProtocolVersion getProtocolVersion()

Added: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java?rev=1067210&view=auto
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java (added)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java Fri Feb  4 16:15:27 2011
@@ -0,0 +1,147 @@
+package org.apache.qpid.client;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public final class ChannelToSessionMap
+{
+    private final AMQSession[] _fastAccessSessions = new AMQSession[16];
+    private final LinkedHashMap<Integer, AMQSession> _slowAccessSessions = new LinkedHashMap<Integer, AMQSession>();
+    private int _size = 0;
+    private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0;
+    private AtomicInteger _idFactory = new AtomicInteger(0);
+    private int _maxChannelID;
+    private int _minChannelID;
+
+    public AMQSession get(int channelId)
+    {
+        if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
+        {
+            return _fastAccessSessions[channelId];
+        }
+        else
+        {
+            return _slowAccessSessions.get(channelId);
+        }
+    }
+
+    public AMQSession put(int channelId, AMQSession session)
+    {
+        AMQSession oldVal;
+        if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
+        {
+            oldVal = _fastAccessSessions[channelId];
+            _fastAccessSessions[channelId] = session;
+        }
+        else
+        {
+            oldVal = _slowAccessSessions.put(channelId, session);
+        }
+        if ((oldVal != null) && (session == null))
+        {
+            _size--;
+        }
+        else if ((oldVal == null) && (session != null))
+        {
+            _size++;
+        }
+
+        return session;
+
+    }
+
+    public AMQSession remove(int channelId)
+    {
+        AMQSession session;
+        if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
+        {
+            session = _fastAccessSessions[channelId];
+            _fastAccessSessions[channelId] = null;
+        }
+        else
+        {
+            session = _slowAccessSessions.remove(channelId);
+        }
+
+        if (session != null)
+        {
+            _size--;
+        }
+        return session;
+
+    }
+
+    public Collection<AMQSession> values()
+    {
+        ArrayList<AMQSession> values = new ArrayList<AMQSession>(size());
+
+        for (int i = 0; i < 16; i++)
+        {
+            if (_fastAccessSessions[i] != null)
+            {
+                values.add(_fastAccessSessions[i]);
+            }
+        }
+        values.addAll(_slowAccessSessions.values());
+
+        return values;
+    }
+
+    public int size()
+    {
+        return _size;
+    }
+
+    public void clear()
+    {
+        _size = 0;
+        _slowAccessSessions.clear();
+        for (int i = 0; i < 16; i++)
+        {
+            _fastAccessSessions[i] = null;
+        }
+    }
+
+    /*
+     * Synchronized on whole method so that we don't need to consider the
+     * increment-then-reset path in too much detail
+     */
+    public synchronized int getNextChannelId()
+    {
+        int id = _minChannelID;
+
+        boolean done = false;
+        while (!done)
+        {
+            id = _idFactory.getAndIncrement();
+            if (id == _maxChannelID)
+            {
+                //go back to the start
+                _idFactory.set(_minChannelID);
+            }
+            if ((id & FAST_CHANNEL_ACCESS_MASK) == 0)
+            {
+                done = (_fastAccessSessions[id] == null);
+            }
+            else
+            {
+                done = (!_slowAccessSessions.keySet().contains(id));
+            }
+        }
+
+        return id;
+    }
+
+    public void setMaxChannelID(int maxChannelID)
+    {
+        _maxChannelID = maxChannelID;
+    }
+
+    public void setMinChannelID(int minChannelID)
+    {
+        _minChannelID = minChannelID;
+        _idFactory.set(_minChannelID);
+    }
+}
\ No newline at end of file

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java?rev=1067210&r1=1067209&r2=1067210&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java Fri Feb  4 16:15:27 2011
@@ -55,9 +55,12 @@ public class ConnectionTuneMethodHandler
         {
             params = new ConnectionTuneParameters();
         }
+        
+        int maxChannelNumber = frame.getChannelMax();
+        //0 implies no limit, except that forced by protocol limitations (0xFFFF)
+        params.setChannelMax(maxChannelNumber == 0 ? AMQProtocolSession.MAX_CHANNEL_MAX : maxChannelNumber);
 
         params.setFrameMax(frame.getFrameMax());
-        params.setChannelMax(frame.getChannelMax());
         params.setHeartbeat(Integer.getInteger("amqj.heartbeat.delay", frame.getHeartbeat()));
         session.setConnectionTuneParameters(params);
 

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?rev=1067210&r1=1067209&r2=1067210&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Fri Feb  4 16:15:27 2011
@@ -54,6 +54,10 @@ public class AMQProtocolSession implemen
 
     public static final String PROTOCOL_INITIATION_RECEIVED = "ProtocolInitiatiionReceived";
 
+    //Usable channels are numbered 1 to <ChannelMax>
+    public static final int MAX_CHANNEL_MAX = 0xFFFF;
+    public static final int MIN_USABLE_CHANNEL_NUM = 1;
+
     protected static final String CONNECTION_TUNE_PARAMETERS = "ConnectionTuneParameters";
 
     protected static final String AMQ_CONNECTION = "AMQConnection";
@@ -178,6 +182,7 @@ public class AMQProtocolSession implemen
     {
         _connectionTuneParameters = params;
         AMQConnection con = getAMQConnection();
+
         con.setMaximumChannelCount(params.getChannelMax());
         con.setMaximumFrameSize(params.getFrameMax());
         _protocolHandler.initHeartbeats((int) params.getHeartbeat());

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/ChannelLimitReachedException.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/ChannelLimitReachedException.java?rev=1067210&r1=1067209&r2=1067210&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/ChannelLimitReachedException.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/ChannelLimitReachedException.java Fri Feb  4 16:15:27 2011
@@ -33,9 +33,9 @@ public class ChannelLimitReachedExceptio
 
     public ChannelLimitReachedException(long limit)
     {
-        super("Unable to create session since maximum number of sessions per connection is " +
-              limit + ". Either close one or more sessions or increase the " +
-              "maximum number of sessions per connection (or contact your AMQP administrator.", ERROR_CODE);
+        super("Unable to create session, the maximum number of sessions per connection is " +
+              limit + ". You must either close one or more sessions or increase the " +
+              "maximum number of sessions available per connection.", ERROR_CODE);
         _limit = limit;
     }
 

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java?rev=1067210&r1=1067209&r2=1067210&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java Fri Feb  4 16:15:27 2011
@@ -86,7 +86,8 @@ public class ClientDelegate extends Conn
         }
     }
 
-    @Override public void connectionStart(Connection conn, ConnectionStart start)
+    @Override
+    public void connectionStart(Connection conn, ConnectionStart start)
     {
         Map<String,Object> clientProperties = new HashMap<String,Object>();
 
@@ -156,7 +157,8 @@ public class ClientDelegate extends Conn
         }
     }
 
-    @Override public void connectionSecure(Connection conn, ConnectionSecure secure)
+    @Override
+    public void connectionSecure(Connection conn, ConnectionSecure secure)
     {
         SaslClient sc = conn.getSaslClient();
         try
@@ -170,9 +172,9 @@ public class ClientDelegate extends Conn
         }
     }
 
-    @Override public void connectionTune(Connection conn, ConnectionTune tune)
+    @Override
+    public void connectionTune(Connection conn, ConnectionTune tune)
     {
-        conn.setChannelMax(tune.getChannelMax());
         int hb_interval = calculateHeartbeatInterval(conSettings.getHeartbeatInterval(),
                                                      tune.getHeartbeatMin(),
                                                      tune.getHeartbeatMax()
@@ -182,10 +184,17 @@ public class ClientDelegate extends Conn
                               hb_interval);
         // The idle timeout is twice the heartbeat amount (in milisecs)
         conn.setIdleTimeout(hb_interval*1000*2);
+
+        int channelMax = tune.getChannelMax();
+        //A channelMax of 0 means the broker sets no limit of its own (only its available
+        //resources apply), so fall back to the protocol ceiling of 0xFFFF channels
+        conn.setChannelMax(channelMax == 0 ? Connection.MAX_CHANNEL_MAX : channelMax);
+
         conn.connectionOpen(conSettings.getVhost(), null, Option.INSIST);
     }
 
-    @Override public void connectionOpenOk(Connection conn, ConnectionOpenOk ok)
+    @Override
+    public void connectionOpenOk(Connection conn, ConnectionOpenOk ok)
     {
         SaslClient sc = conn.getSaslClient();
         if (sc != null)
@@ -210,12 +219,14 @@ public class ClientDelegate extends Conn
         conn.setState(OPEN);
     }
 
-    @Override public void connectionRedirect(Connection conn, ConnectionRedirect redir)
+    @Override
+    public void connectionRedirect(Connection conn, ConnectionRedirect redir)
     {
         throw new UnsupportedOperationException();
     }
 
-    @Override public void connectionHeartbeat(Connection conn, ConnectionHeartbeat hearbeat)
+    @Override
+    public void connectionHeartbeat(Connection conn, ConnectionHeartbeat hearbeat)
     {
         conn.connectionHeartbeat();
     }

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java?rev=1067210&r1=1067209&r2=1067210&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java Fri Feb  4 16:15:27 2011
@@ -59,6 +59,9 @@ public class Connection extends Connecti
 
     protected static final Logger log = Logger.get(Connection.class);
 
+    //Usable channels are numbered 0 to <ChannelMax> - 1
+    public static final int MAX_CHANNEL_MAX = 0xFFFF;
+    public static final int MIN_USABLE_CHANNEL_NUM = 0;
 
     public enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD }
 
@@ -404,7 +407,8 @@ public class Connection extends Connecti
     {
         synchronized (lock)
         {
-            for (int i = 1; i <= getChannelMax(); i++)
+            //For a negotiated channelMax N, there are channels 0 to N-1 available.
+            for (int i = 0; i < getChannelMax(); i++)
             {
                 if (!channels.containsKey(i))
                 {

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java?rev=1067210&r1=1067209&r2=1067210&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java Fri Feb  4 16:15:27 2011
@@ -30,6 +30,8 @@ import javax.security.sasl.Sasl;
 import javax.security.sasl.SaslException;
 import javax.security.sasl.SaslServer;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * ServerDelegate
@@ -38,8 +40,8 @@ import javax.security.sasl.SaslServer;
 
 public class ServerDelegate extends ConnectionDelegate
 {
+    protected static final Logger _logger = LoggerFactory.getLogger(ServerDelegate.class);
 
-    private SaslServer saslServer;
     private List<Object> _locales;
     private List<Object> _mechanisms;
     private Map<String, Object> _clientProperties;
@@ -47,7 +49,7 @@ public class ServerDelegate extends Conn
 
     public ServerDelegate()
     {
-        this(null, Collections.EMPTY_LIST, Collections.singletonList((Object)"utf8"));
+        this(null, Collections.emptyList(), Collections.singletonList((Object)"utf8"));
     }
 
     protected ServerDelegate(Map<String, Object> clientProperties, List<Object> mechanisms, List<Object> locales)
@@ -64,7 +66,8 @@ public class ServerDelegate extends Conn
         conn.connectionStart(_clientProperties, _mechanisms, _locales);
     }
 
-    @Override public void connectionStartOk(Connection conn, ConnectionStartOk ok)
+    @Override
+    public void connectionStartOk(Connection conn, ConnectionStartOk ok)
     {
         conn.setLocale(ok.getLocale());
         String mechanism = ok.getMechanism();
@@ -75,9 +78,9 @@ public class ServerDelegate extends Conn
         if (mechanism == null || mechanism.length() == 0)
         {
             conn.connectionTune
-                (Integer.MAX_VALUE,
+                (getChannelMax(),
                  org.apache.qpid.transport.network.ConnectionBinding.MAX_FRAME_SIZE,
-                 0, Integer.MAX_VALUE);
+                 0, getHeartbeatMax());
             return;
         }
 
@@ -118,7 +121,7 @@ public class ServerDelegate extends Conn
             {
                 ss.dispose();
                 conn.connectionTune
-                    (Integer.MAX_VALUE,
+                    (getChannelMax(),
                      org.apache.qpid.transport.network.ConnectionBinding.MAX_FRAME_SIZE,
                      0, getHeartbeatMax());
                 conn.setAuthorizationID(ss.getAuthorizationID());
@@ -140,19 +143,42 @@ public class ServerDelegate extends Conn
         return Integer.MAX_VALUE;
     }
 
-    @Override public void connectionSecureOk(Connection conn, ConnectionSecureOk ok)
+    protected int getChannelMax()
+    {
+        return Integer.MAX_VALUE;
+    }
+
+    @Override
+    public void connectionSecureOk(Connection conn, ConnectionSecureOk ok)
     {
         secure(conn, ok.getResponse());
     }
 
-    @Override public void connectionTuneOk(Connection conn, ConnectionTuneOk ok)
+    @Override
+    public void connectionTuneOk(Connection conn, ConnectionTuneOk ok)
     {
+        int okChannelMax = ok.getChannelMax();
         
+        if (okChannelMax > getChannelMax())
+        {
+            _logger.error("Connection '" + conn.getConnectionId() + "' being severed, " +
+                    "client connectionTuneOk returned a channelMax (" + okChannelMax +
+                    ") above the servers offered limit (" + getChannelMax() +")");
+
+            //Due to the error we must forcefully close the connection without negotiation
+            conn.getSender().close();
+            return;
+        }
+
+        //A channelMax of 0 means the client asks for no specific limit (only available server
+        //resources apply), so fall back to the protocol ceiling of 0xFFFF channels
+        conn.setChannelMax(okChannelMax == 0 ? Connection.MAX_CHANNEL_MAX : okChannelMax);
     }
 
-    @Override public void connectionOpen(Connection conn, ConnectionOpen open)
+    @Override
+    public void connectionOpen(Connection conn, ConnectionOpen open)
     {
-        conn.connectionOpenOk(Collections.EMPTY_LIST);
+        conn.connectionOpenOk(Collections.emptyList());
 
         conn.setState(OPEN);
     }
@@ -168,7 +194,8 @@ public class ServerDelegate extends Conn
         return new Session(conn, new Binary(atc.getName()), 0);
     }
 
-    @Override public void sessionAttach(Connection conn, SessionAttach atc)
+    @Override
+    public void sessionAttach(Connection conn, SessionAttach atc)
     {
         Session ssn = getSession(conn, atc);
         conn.map(ssn, atc.getChannel());

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ChannelLoggingTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ChannelLoggingTest.java?rev=1067210&r1=1067209&r2=1067210&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ChannelLoggingTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ChannelLoggingTest.java Fri Feb  4 16:15:27 2011
@@ -75,7 +75,7 @@ public class ChannelLoggingTest extends 
         String log = getLogMessage(results, 0);
         //  MESSAGE [con:0(guest@anonymous(3273383)/test)/ch:1] CHN-1001 : Create
         validateMessageID("CHN-1001", log);
-        assertEquals("Incorrect Channel in actor:"+fromActor(log), 1, getChannelID(fromActor(log)));
+        assertEquals("Incorrect Channel in actor:"+fromActor(log), isBroker010()? 0 : 1, getChannelID(fromActor(log)));
 
         if (isBroker08())
         {
@@ -89,7 +89,7 @@ public class ChannelLoggingTest extends 
             log = getLogMessage(results, 0);
             //  MESSAGE [con:0(guest@anonymous(3273383)/test)/ch:1] CHN-1004 : Prefetch Size (bytes) {0,number} : Count {1,number}
             validateMessageID("CHN-1004", log);
-            assertEquals("Incorrect Channel in actor:"+fromActor(log), 1, getChannelID(fromActor(log)));
+            assertEquals("Incorrect Channel in actor:"+fromActor(log), isBroker010()? 0 : 1, getChannelID(fromActor(log)));
             assertTrue("Prefetch Count not correct",getMessageString(fromMessage(log)).endsWith("Count "+PREFETCH));
         }
 
@@ -306,7 +306,7 @@ public class ChannelLoggingTest extends 
         validateMessageID("CHN-1001", open);
         validateMessageID("CHN-1003", close);
         assertEquals("Message should be Close", "Close", getMessageString(fromMessage(close)));
-        assertEquals("Incorrect Channel ID closed", 1, getChannelID(fromSubject(close)));
+        assertEquals("Incorrect Channel ID closed", isBroker010()? 0 : 1, getChannelID(fromSubject(close)));
         assertEquals("Channel IDs should be the same", getChannelID(fromActor(open)), getChannelID(fromSubject(close)));
         assertEquals("Connection IDs should be the same", getConnectionID(fromActor(open)), getConnectionID(fromSubject(close)));
     }

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java?rev=1067210&r1=1067209&r2=1067210&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java Fri Feb  4 16:15:27 2011
@@ -266,19 +266,22 @@ public class AMQConnectionTest extends Q
         }
     }
     
-    public void testGetChannelID()
+    public void testGetChannelID() throws Exception
     {
-        int maxChannelID = 65536;
+        long maxChannelID = _connection.getMaximumChannelCount();
         if (isBroker010())
         {
-            maxChannelID = Integer.MAX_VALUE+1;
+            //Usable numbers are 0 to N-1 when using 0-10
+            //and 1 to N for 0-8/0-9
+            maxChannelID = maxChannelID-1;
         }
         for (int j = 0; j < 3; j++)
         {
-            for (int i = 1; i < maxChannelID; i++)
+            int i = isBroker010() ? 0 : 1;
+            for ( ; i <= maxChannelID; i++)
             {
                 int id = _connection.getNextChannelID();
-                assertEquals("On iterartion "+j, i, id);
+                assertEquals("Unexpected number on iteration "+j, i, id);
                 _connection.deregisterSession(id);
             }
         }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org