You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2011/02/04 17:15:28 UTC
svn commit: r1067210 - in /qpid/trunk/qpid/java:
broker/src/main/java/org/apache/qpid/server/configuration/
broker/src/main/java/org/apache/qpid/server/handler/
broker/src/main/java/org/apache/qpid/server/protocol/
broker/src/main/java/org/apache/qpid/...
Author: robbie
Date: Fri Feb 4 16:15:27 2011
New Revision: 1067210
URL: http://svn.apache.org/viewvc?rev=1067210&view=rev
Log:
QPID-3029: actually set and negotiate the supported max num channels per connection during connection handshake. Enable/make the 0-10 client use channel numbers 0 to N-1 in line with the spec, rather than 1-N.
Added:
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/ChannelLimitReachedException.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ChannelLoggingTest.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java?rev=1067210&r1=1067209&r2=1067210&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java Fri Feb 4 16:15:27 2011
@@ -108,6 +108,7 @@ public class ServerConfiguration extends
envVarMap.put("QPID_MAXIMUMMESSAGECOUNT", "maximumMessageCount");
envVarMap.put("QPID_MAXIMUMQUEUEDEPTH", "maximumQueueDepth");
envVarMap.put("QPID_MAXIMUMMESSAGESIZE", "maximumMessageSize");
+ envVarMap.put("QPID_MAXIMUMCHANNELCOUNT", "maximumChannelCount");
envVarMap.put("QPID_MINIMUMALERTREPEATGAP", "minimumAlertRepeatGap");
envVarMap.put("QPID_QUEUECAPACITY", "capacity");
envVarMap.put("QPID_FLOWRESUMECAPACITY", "flowResumeCapacity");
@@ -818,4 +819,9 @@ public class ServerConfiguration extends
}
};
}
+
+ public int getMaxChannelCount()
+ {
+ return getIntValue("maximumChannelCount", 256);
+ }
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java?rev=1067210&r1=1067209&r2=1067210&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java Fri Feb 4 16:15:27 2011
@@ -92,7 +92,7 @@ public class ConnectionSecureOkMethodHan
stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
ConnectionTuneBody tuneBody =
- methodRegistry.createConnectionTuneBody(0xFFFF,
+ methodRegistry.createConnectionTuneBody(ApplicationRegistry.getInstance().getConfiguration().getMaxChannelCount(),
ConnectionStartOkMethodHandler.getConfiguredFrameSize(),
ApplicationRegistry.getInstance().getConfiguration().getHeartBeatDelay());
session.writeFrame(tuneBody.generateFrame(0));
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java?rev=1067210&r1=1067209&r2=1067210&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java Fri Feb 4 16:15:27 2011
@@ -113,7 +113,7 @@ public class ConnectionStartOkMethodHand
stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
- ConnectionTuneBody tuneBody = methodRegistry.createConnectionTuneBody(0xFFFF,
+ ConnectionTuneBody tuneBody = methodRegistry.createConnectionTuneBody(ApplicationRegistry.getInstance().getConfiguration().getMaxChannelCount(),
getConfiguredFrameSize(),
ApplicationRegistry.getInstance().getConfiguration().getHeartBeatDelay());
session.writeFrame(tuneBody.generateFrame(0));
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java?rev=1067210&r1=1067209&r2=1067210&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java Fri Feb 4 16:15:27 2011
@@ -23,7 +23,6 @@ package org.apache.qpid.server.handler;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.ConnectionTuneOkBody;
-import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.state.AMQState;
import org.apache.qpid.server.state.AMQStateManager;
@@ -51,5 +50,9 @@ public class ConnectionTuneOkMethodHandl
stateManager.changeState(AMQState.CONNECTION_NOT_OPENED);
session.initHeartbeats(body.getHeartbeat());
session.setMaxFrameSize(body.getFrameMax());
+
+ long maxChannelNumber = body.getChannelMax();
+ //0 means no implied limit, except that forced by protocol limitations (0xFFFF)
+ session.setMaximumNumberOfChannels( maxChannelNumber == 0 ? 0xFFFFL : maxChannelNumber);
}
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java?rev=1067210&r1=1067209&r2=1067210&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java Fri Feb 4 16:15:27 2011
@@ -136,7 +136,7 @@ public class AMQProtocolEngine implement
protected volatile boolean _closed;
// maximum number of channels this session should have
- private long _maxNoOfChannels = 1000;
+ private long _maxNoOfChannels = ApplicationRegistry.getInstance().getConfiguration().getMaxChannelCount();
/* AMQP Version for this session */
private ProtocolVersion _protocolVersion = ProtocolVersion.getLatestSupportedVersion();
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java?rev=1067210&r1=1067209&r2=1067210&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java Fri Feb 4 16:15:27 2011
@@ -26,6 +26,7 @@ import org.apache.qpid.server.logging.ac
import org.apache.qpid.common.ClientProperties;
import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -145,4 +146,10 @@ public class ServerConnectionDelegate ex
//TODO: implement broker support for actually sending heartbeats
return 0;
}
+
+ @Override
+ protected int getChannelMax()
+ {
+ return ApplicationRegistry.getInstance().getConfiguration().getMaxChannelCount();
+ }
}
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=1067210&r1=1067209&r2=1067210&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Fri Feb 4 16:15:27 2011
@@ -26,15 +26,12 @@ import java.net.ConnectException;
import java.net.UnknownHostException;
import java.nio.channels.UnresolvedAddressException;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Iterator;
-import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.ConnectionConsumer;
import javax.jms.ConnectionMetaData;
@@ -84,153 +81,6 @@ import org.slf4j.LoggerFactory;
public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable
{
- public static final class ChannelToSessionMap
- {
- private final AMQSession[] _fastAccessSessions = new AMQSession[16];
- private final LinkedHashMap<Integer, AMQSession> _slowAccessSessions = new LinkedHashMap<Integer, AMQSession>();
- private int _size = 0;
- private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0;
- private AtomicInteger _idFactory = new AtomicInteger(0);
- private int _maxChannelID;
- private boolean _cycledIds;
-
- public AMQSession get(int channelId)
- {
- if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
- {
- return _fastAccessSessions[channelId];
- }
- else
- {
- return _slowAccessSessions.get(channelId);
- }
- }
-
- public AMQSession put(int channelId, AMQSession session)
- {
- AMQSession oldVal;
- if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
- {
- oldVal = _fastAccessSessions[channelId];
- _fastAccessSessions[channelId] = session;
- }
- else
- {
- oldVal = _slowAccessSessions.put(channelId, session);
- }
- if ((oldVal != null) && (session == null))
- {
- _size--;
- }
- else if ((oldVal == null) && (session != null))
- {
- _size++;
- }
-
- return session;
-
- }
-
- public AMQSession remove(int channelId)
- {
- AMQSession session;
- if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
- {
- session = _fastAccessSessions[channelId];
- _fastAccessSessions[channelId] = null;
- }
- else
- {
- session = _slowAccessSessions.remove(channelId);
- }
-
- if (session != null)
- {
- _size--;
- }
- return session;
-
- }
-
- public Collection<AMQSession> values()
- {
- ArrayList<AMQSession> values = new ArrayList<AMQSession>(size());
-
- for (int i = 0; i < 16; i++)
- {
- if (_fastAccessSessions[i] != null)
- {
- values.add(_fastAccessSessions[i]);
- }
- }
- values.addAll(_slowAccessSessions.values());
-
- return values;
- }
-
- public int size()
- {
- return _size;
- }
-
- public void clear()
- {
- _size = 0;
- _slowAccessSessions.clear();
- for (int i = 0; i < 16; i++)
- {
- _fastAccessSessions[i] = null;
- }
- }
-
- /*
- * Synchronized on whole method so that we don't need to consider the
- * increment-then-reset path in too much detail
- */
- public synchronized int getNextChannelId()
- {
- int id = 0;
- if (!_cycledIds)
- {
- id = _idFactory.incrementAndGet();
- if (id == _maxChannelID)
- {
- _cycledIds = true;
- _idFactory.set(0); // Go back to the start
- }
- }
- else
- {
- boolean done = false;
- while (!done)
- {
- // Needs to work second time through
- id = _idFactory.incrementAndGet();
- if (id > _maxChannelID)
- {
- _idFactory.set(0);
- id = _idFactory.incrementAndGet();
- }
- if ((id & FAST_CHANNEL_ACCESS_MASK) == 0)
- {
- done = (_fastAccessSessions[id] == null);
- }
- else
- {
- done = (!_slowAccessSessions.keySet().contains(id));
- }
- }
- }
-
- return id;
- }
-
- public void setMaxChannelID(int maxChannelID)
- {
- _maxChannelID = maxChannelID;
- }
- }
-
private static final Logger _logger = LoggerFactory.getLogger(AMQConnection.class);
@@ -244,9 +94,9 @@ public class AMQConnection extends Close
/**
* A channel is roughly analogous to a session. The server can negotiate the maximum number of channels per session
- * and we must prevent the client from opening too many. Zero means unlimited.
+ * and we must prevent the client from opening too many.
*/
- protected long _maximumChannelCount;
+ private long _maximumChannelCount;
/** The maximum size of frame supported by the server */
private long _maximumFrameSize;
@@ -489,7 +339,6 @@ public class AMQConnection extends Close
{
_delegate = new AMQConnectionDelegate_0_10(this);
}
- _sessions.setMaxChannelID(_delegate.getMaxChannelID());
if (_logger.isInfoEnabled())
{
@@ -570,8 +419,6 @@ public class AMQConnection extends Close
}
}
- _logger.info("Connected with ProtocolHandler Version:"+_protocolHandler.getProtocolVersion());
-
if (_logger.isDebugEnabled())
{
_logger.debug("Are we connected:" + _connected);
@@ -579,6 +426,11 @@ public class AMQConnection extends Close
if (!_connected)
{
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Last attempted ProtocolHandler Version:"+_protocolHandler.getProtocolVersion());
+ }
+
String message = null;
if (connectionException != null)
@@ -620,6 +472,11 @@ public class AMQConnection extends Close
throw new AMQConnectionFailureException(message, connectionException);
}
+ _logger.info("Connected with ProtocolHandler Version:"+_protocolHandler.getProtocolVersion());
+
+ _sessions.setMaxChannelID(_delegate.getMaxChannelID());
+ _sessions.setMinChannelID(_delegate.getMinChannelID());
+
_connectionMetaData = new QpidConnectionMetaData(this);
}
@@ -647,7 +504,6 @@ public class AMQConnection extends Close
Class partypes[] = new Class[1];
partypes[0] = AMQConnection.class;
_delegate = (AMQConnectionDelegate) c.getConstructor(partypes).newInstance(this);
- _sessions.setMaxChannelID(_delegate.getMaxChannelID());
//Update our session to use this new protocol version
_protocolHandler.getProtocolSession().setProtocolVersion(_delegate.getProtocolVersion());
@@ -898,7 +754,7 @@ public class AMQConnection extends Close
public boolean channelLimitReached()
{
- return (_maximumChannelCount != 0) && (_sessions.size() == _maximumChannelCount);
+ return _sessions.size() >= _maximumChannelCount;
}
public String getClientID() throws JMSException
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java?rev=1067210&r1=1067209&r2=1067210&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java Fri Feb 4 16:15:27 2011
@@ -59,6 +59,8 @@ public interface AMQConnectionDelegate
<T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operation) throws E;
int getMaxChannelID();
-
+
+ int getMinChannelID();
+
ProtocolVersion getProtocolVersion();
}
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java?rev=1067210&r1=1067209&r2=1067210&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java Fri Feb 4 16:15:27 2011
@@ -37,6 +37,7 @@ import org.apache.qpid.client.failover.F
import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.jms.BrokerDetails;
+import org.apache.qpid.jms.ChannelLimitReachedException;
import org.apache.qpid.jms.Session;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.transport.Connection;
@@ -82,6 +83,12 @@ public class AMQConnectionDelegate_0_10
throws JMSException
{
_conn.checkNotClosed();
+
+ if (_conn.channelLimitReached())
+ {
+ throw new ChannelLimitReachedException(_conn.getMaximumChannelCount());
+ }
+
int channelId = _conn.getNextChannelID();
AMQSession session;
try
@@ -120,6 +127,12 @@ public class AMQConnectionDelegate_0_10
public XASession createXASession(int prefetchHigh, int prefetchLow) throws JMSException
{
_conn.checkNotClosed();
+
+ if (_conn.channelLimitReached())
+ {
+ throw new ChannelLimitReachedException(_conn.getMaximumChannelCount());
+ }
+
int channelId = _conn.getNextChannelID();
XASessionImpl session;
try
@@ -165,6 +178,7 @@ public class AMQConnectionDelegate_0_10
_conn._connected = true;
_conn.setUsername(_qpidConnection.getUserID());
+ _conn.setMaximumChannelCount(_qpidConnection.getChannelMax());
_conn._failoverPolicy.attainedConnection();
}
catch (ProtocolVersionException pe)
@@ -293,7 +307,13 @@ public class AMQConnectionDelegate_0_10
public int getMaxChannelID()
{
- return Integer.MAX_VALUE;
+ //For a negotiated channelMax N, there are channels 0 to N-1 available.
+ return _qpidConnection.getChannelMax() - 1;
+ }
+
+ public int getMinChannelID()
+ {
+ return Connection.MIN_USABLE_CHANNEL_NUM;
}
public ProtocolVersion getProtocolVersion()
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java?rev=1067210&r1=1067209&r2=1067210&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java Fri Feb 4 16:15:27 2011
@@ -36,6 +36,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
import org.apache.qpid.client.failover.FailoverRetrySupport;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.state.StateWaiter;
import org.apache.qpid.client.transport.TransportConnection;
@@ -134,7 +135,7 @@ public class AMQConnectionDelegate_8_0 i
if (_conn.channelLimitReached())
{
- throw new ChannelLimitReachedException(_conn._maximumChannelCount);
+ throw new ChannelLimitReachedException(_conn.getMaximumChannelCount());
}
return new FailoverRetrySupport<org.apache.qpid.jms.Session, JMSException>(
@@ -307,7 +308,14 @@ public class AMQConnectionDelegate_8_0 i
public int getMaxChannelID()
{
- return (int) (Math.pow(2, 16)-1);
+ ConnectionTuneParameters params = _conn.getProtocolHandler().getProtocolSession().getConnectionTuneParameters();
+
+ return params == null ? AMQProtocolSession.MAX_CHANNEL_MAX : params.getChannelMax();
+ }
+
+ public int getMinChannelID()
+ {
+ return AMQProtocolSession.MIN_USABLE_CHANNEL_NUM;
}
public ProtocolVersion getProtocolVersion()
Added: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java?rev=1067210&view=auto
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java (added)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/ChannelToSessionMap.java Fri Feb 4 16:15:27 2011
@@ -0,0 +1,147 @@
+package org.apache.qpid.client;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public final class ChannelToSessionMap
+{
+ private final AMQSession[] _fastAccessSessions = new AMQSession[16];
+ private final LinkedHashMap<Integer, AMQSession> _slowAccessSessions = new LinkedHashMap<Integer, AMQSession>();
+ private int _size = 0;
+ private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0;
+ private AtomicInteger _idFactory = new AtomicInteger(0);
+ private int _maxChannelID;
+ private int _minChannelID;
+
+ public AMQSession get(int channelId)
+ {
+ if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
+ {
+ return _fastAccessSessions[channelId];
+ }
+ else
+ {
+ return _slowAccessSessions.get(channelId);
+ }
+ }
+
+ public AMQSession put(int channelId, AMQSession session)
+ {
+ AMQSession oldVal;
+ if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
+ {
+ oldVal = _fastAccessSessions[channelId];
+ _fastAccessSessions[channelId] = session;
+ }
+ else
+ {
+ oldVal = _slowAccessSessions.put(channelId, session);
+ }
+ if ((oldVal != null) && (session == null))
+ {
+ _size--;
+ }
+ else if ((oldVal == null) && (session != null))
+ {
+ _size++;
+ }
+
+ return session;
+
+ }
+
+ public AMQSession remove(int channelId)
+ {
+ AMQSession session;
+ if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
+ {
+ session = _fastAccessSessions[channelId];
+ _fastAccessSessions[channelId] = null;
+ }
+ else
+ {
+ session = _slowAccessSessions.remove(channelId);
+ }
+
+ if (session != null)
+ {
+ _size--;
+ }
+ return session;
+
+ }
+
+ public Collection<AMQSession> values()
+ {
+ ArrayList<AMQSession> values = new ArrayList<AMQSession>(size());
+
+ for (int i = 0; i < 16; i++)
+ {
+ if (_fastAccessSessions[i] != null)
+ {
+ values.add(_fastAccessSessions[i]);
+ }
+ }
+ values.addAll(_slowAccessSessions.values());
+
+ return values;
+ }
+
+ public int size()
+ {
+ return _size;
+ }
+
+ public void clear()
+ {
+ _size = 0;
+ _slowAccessSessions.clear();
+ for (int i = 0; i < 16; i++)
+ {
+ _fastAccessSessions[i] = null;
+ }
+ }
+
+ /*
+ * Synchronized on whole method so that we don't need to consider the
+ * increment-then-reset path in too much detail
+ */
+ public synchronized int getNextChannelId()
+ {
+ int id = _minChannelID;
+
+ boolean done = false;
+ while (!done)
+ {
+ id = _idFactory.getAndIncrement();
+ if (id == _maxChannelID)
+ {
+ //go back to the start
+ _idFactory.set(_minChannelID);
+ }
+ if ((id & FAST_CHANNEL_ACCESS_MASK) == 0)
+ {
+ done = (_fastAccessSessions[id] == null);
+ }
+ else
+ {
+ done = (!_slowAccessSessions.keySet().contains(id));
+ }
+ }
+
+ return id;
+ }
+
+ public void setMaxChannelID(int maxChannelID)
+ {
+ _maxChannelID = maxChannelID;
+ }
+
+ public void setMinChannelID(int minChannelID)
+ {
+ _minChannelID = minChannelID;
+ _idFactory.set(_minChannelID);
+ }
+}
\ No newline at end of file
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java?rev=1067210&r1=1067209&r2=1067210&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java Fri Feb 4 16:15:27 2011
@@ -55,9 +55,12 @@ public class ConnectionTuneMethodHandler
{
params = new ConnectionTuneParameters();
}
+
+ int maxChannelNumber = frame.getChannelMax();
+ //0 implies no limit, except that forced by protocol limitations (0xFFFF)
+ params.setChannelMax(maxChannelNumber == 0 ? AMQProtocolSession.MAX_CHANNEL_MAX : maxChannelNumber);
params.setFrameMax(frame.getFrameMax());
- params.setChannelMax(frame.getChannelMax());
params.setHeartbeat(Integer.getInteger("amqj.heartbeat.delay", frame.getHeartbeat()));
session.setConnectionTuneParameters(params);
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?rev=1067210&r1=1067209&r2=1067210&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Fri Feb 4 16:15:27 2011
@@ -54,6 +54,10 @@ public class AMQProtocolSession implemen
public static final String PROTOCOL_INITIATION_RECEIVED = "ProtocolInitiatiionReceived";
+ //Usable channels are numbered 1 to <ChannelMax>
+ public static final int MAX_CHANNEL_MAX = 0xFFFF;
+ public static final int MIN_USABLE_CHANNEL_NUM = 1;
+
protected static final String CONNECTION_TUNE_PARAMETERS = "ConnectionTuneParameters";
protected static final String AMQ_CONNECTION = "AMQConnection";
@@ -178,6 +182,7 @@ public class AMQProtocolSession implemen
{
_connectionTuneParameters = params;
AMQConnection con = getAMQConnection();
+
con.setMaximumChannelCount(params.getChannelMax());
con.setMaximumFrameSize(params.getFrameMax());
_protocolHandler.initHeartbeats((int) params.getHeartbeat());
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/ChannelLimitReachedException.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/ChannelLimitReachedException.java?rev=1067210&r1=1067209&r2=1067210&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/ChannelLimitReachedException.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/jms/ChannelLimitReachedException.java Fri Feb 4 16:15:27 2011
@@ -33,9 +33,9 @@ public class ChannelLimitReachedExceptio
public ChannelLimitReachedException(long limit)
{
- super("Unable to create session since maximum number of sessions per connection is " +
- limit + ". Either close one or more sessions or increase the " +
- "maximum number of sessions per connection (or contact your AMQP administrator.", ERROR_CODE);
+ super("Unable to create session, the maximum number of sessions per connection is " +
+ limit + ". You must either close one or more sessions or increase the " +
+ "maximum number of sessions available per connection.", ERROR_CODE);
_limit = limit;
}
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java?rev=1067210&r1=1067209&r2=1067210&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java Fri Feb 4 16:15:27 2011
@@ -86,7 +86,8 @@ public class ClientDelegate extends Conn
}
}
- @Override public void connectionStart(Connection conn, ConnectionStart start)
+ @Override
+ public void connectionStart(Connection conn, ConnectionStart start)
{
Map<String,Object> clientProperties = new HashMap<String,Object>();
@@ -156,7 +157,8 @@ public class ClientDelegate extends Conn
}
}
- @Override public void connectionSecure(Connection conn, ConnectionSecure secure)
+ @Override
+ public void connectionSecure(Connection conn, ConnectionSecure secure)
{
SaslClient sc = conn.getSaslClient();
try
@@ -170,9 +172,9 @@ public class ClientDelegate extends Conn
}
}
- @Override public void connectionTune(Connection conn, ConnectionTune tune)
+ @Override
+ public void connectionTune(Connection conn, ConnectionTune tune)
{
- conn.setChannelMax(tune.getChannelMax());
int hb_interval = calculateHeartbeatInterval(conSettings.getHeartbeatInterval(),
tune.getHeartbeatMin(),
tune.getHeartbeatMax()
@@ -182,10 +184,17 @@ public class ClientDelegate extends Conn
hb_interval);
// The idle timeout is twice the heartbeat amount (in milisecs)
conn.setIdleTimeout(hb_interval*1000*2);
+
+ int channelMax = tune.getChannelMax();
+ //0 means no implied limit, except available server resources
+ //(or that forced by protocol limitations [0xFFFF])
+ conn.setChannelMax(channelMax == 0 ? Connection.MAX_CHANNEL_MAX : channelMax);
+
conn.connectionOpen(conSettings.getVhost(), null, Option.INSIST);
}
- @Override public void connectionOpenOk(Connection conn, ConnectionOpenOk ok)
+ @Override
+ public void connectionOpenOk(Connection conn, ConnectionOpenOk ok)
{
SaslClient sc = conn.getSaslClient();
if (sc != null)
@@ -210,12 +219,14 @@ public class ClientDelegate extends Conn
conn.setState(OPEN);
}
- @Override public void connectionRedirect(Connection conn, ConnectionRedirect redir)
+ @Override
+ public void connectionRedirect(Connection conn, ConnectionRedirect redir)
{
throw new UnsupportedOperationException();
}
- @Override public void connectionHeartbeat(Connection conn, ConnectionHeartbeat hearbeat)
+ @Override
+ public void connectionHeartbeat(Connection conn, ConnectionHeartbeat hearbeat)
{
conn.connectionHeartbeat();
}
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java?rev=1067210&r1=1067209&r2=1067210&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java Fri Feb 4 16:15:27 2011
@@ -59,6 +59,9 @@ public class Connection extends Connecti
protected static final Logger log = Logger.get(Connection.class);
+ //Usable channels are numbered 0 to <ChannelMax> - 1
+ public static final int MAX_CHANNEL_MAX = 0xFFFF;
+ public static final int MIN_USABLE_CHANNEL_NUM = 0;
public enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD }
@@ -404,7 +407,8 @@ public class Connection extends Connecti
{
synchronized (lock)
{
- for (int i = 1; i <= getChannelMax(); i++)
+ //For a negotiated channelMax N, there are channels 0 to N-1 available.
+ for (int i = 0; i < getChannelMax(); i++)
{
if (!channels.containsKey(i))
{
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java?rev=1067210&r1=1067209&r2=1067210&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java Fri Feb 4 16:15:27 2011
@@ -30,6 +30,8 @@ import javax.security.sasl.Sasl;
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* ServerDelegate
@@ -38,8 +40,8 @@ import javax.security.sasl.SaslServer;
public class ServerDelegate extends ConnectionDelegate
{
+ protected static final Logger _logger = LoggerFactory.getLogger(ServerDelegate.class);
- private SaslServer saslServer;
private List<Object> _locales;
private List<Object> _mechanisms;
private Map<String, Object> _clientProperties;
@@ -47,7 +49,7 @@ public class ServerDelegate extends Conn
public ServerDelegate()
{
- this(null, Collections.EMPTY_LIST, Collections.singletonList((Object)"utf8"));
+ this(null, Collections.emptyList(), Collections.singletonList((Object)"utf8"));
}
protected ServerDelegate(Map<String, Object> clientProperties, List<Object> mechanisms, List<Object> locales)
@@ -64,7 +66,8 @@ public class ServerDelegate extends Conn
conn.connectionStart(_clientProperties, _mechanisms, _locales);
}
- @Override public void connectionStartOk(Connection conn, ConnectionStartOk ok)
+ @Override
+ public void connectionStartOk(Connection conn, ConnectionStartOk ok)
{
conn.setLocale(ok.getLocale());
String mechanism = ok.getMechanism();
@@ -75,9 +78,9 @@ public class ServerDelegate extends Conn
if (mechanism == null || mechanism.length() == 0)
{
conn.connectionTune
- (Integer.MAX_VALUE,
+ (getChannelMax(),
org.apache.qpid.transport.network.ConnectionBinding.MAX_FRAME_SIZE,
- 0, Integer.MAX_VALUE);
+ 0, getHeartbeatMax());
return;
}
@@ -118,7 +121,7 @@ public class ServerDelegate extends Conn
{
ss.dispose();
conn.connectionTune
- (Integer.MAX_VALUE,
+ (getChannelMax(),
org.apache.qpid.transport.network.ConnectionBinding.MAX_FRAME_SIZE,
0, getHeartbeatMax());
conn.setAuthorizationID(ss.getAuthorizationID());
@@ -140,19 +143,42 @@ public class ServerDelegate extends Conn
return Integer.MAX_VALUE;
}
- @Override public void connectionSecureOk(Connection conn, ConnectionSecureOk ok)
+ protected int getChannelMax()
+ {
+ return Integer.MAX_VALUE;
+ }
+
+ @Override
+ public void connectionSecureOk(Connection conn, ConnectionSecureOk ok)
{
secure(conn, ok.getResponse());
}
- @Override public void connectionTuneOk(Connection conn, ConnectionTuneOk ok)
+ @Override
+ public void connectionTuneOk(Connection conn, ConnectionTuneOk ok)
{
+ int okChannelMax = ok.getChannelMax();
+ if (okChannelMax > getChannelMax())
+ {
+ _logger.error("Connection '" + conn.getConnectionId() + "' being severed, " +
+ "client connectionTuneOk returned a channelMax (" + okChannelMax +
+ ") above the servers offered limit (" + getChannelMax() +")");
+
+ //Due to the error we must forcefully close the connection without negotiation
+ conn.getSender().close();
+ return;
+ }
+
+ //0 means no implied limit, except available server resources
+ //(or that forced by protocol limitations [0xFFFF])
+ conn.setChannelMax(okChannelMax == 0 ? Connection.MAX_CHANNEL_MAX : okChannelMax);
}
- @Override public void connectionOpen(Connection conn, ConnectionOpen open)
+ @Override
+ public void connectionOpen(Connection conn, ConnectionOpen open)
{
- conn.connectionOpenOk(Collections.EMPTY_LIST);
+ conn.connectionOpenOk(Collections.emptyList());
conn.setState(OPEN);
}
@@ -168,7 +194,8 @@ public class ServerDelegate extends Conn
return new Session(conn, new Binary(atc.getName()), 0);
}
- @Override public void sessionAttach(Connection conn, SessionAttach atc)
+ @Override
+ public void sessionAttach(Connection conn, SessionAttach atc)
{
Session ssn = getSession(conn, atc);
conn.map(ssn, atc.getChannel());
Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ChannelLoggingTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ChannelLoggingTest.java?rev=1067210&r1=1067209&r2=1067210&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ChannelLoggingTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ChannelLoggingTest.java Fri Feb 4 16:15:27 2011
@@ -75,7 +75,7 @@ public class ChannelLoggingTest extends
String log = getLogMessage(results, 0);
// MESSAGE [con:0(guest@anonymous(3273383)/test)/ch:1] CHN-1001 : Create
validateMessageID("CHN-1001", log);
- assertEquals("Incorrect Channel in actor:"+fromActor(log), 1, getChannelID(fromActor(log)));
+ assertEquals("Incorrect Channel in actor:"+fromActor(log), isBroker010()? 0 : 1, getChannelID(fromActor(log)));
if (isBroker08())
{
@@ -89,7 +89,7 @@ public class ChannelLoggingTest extends
log = getLogMessage(results, 0);
// MESSAGE [con:0(guest@anonymous(3273383)/test)/ch:1] CHN-1004 : Prefetch Size (bytes) {0,number} : Count {1,number}
validateMessageID("CHN-1004", log);
- assertEquals("Incorrect Channel in actor:"+fromActor(log), 1, getChannelID(fromActor(log)));
+ assertEquals("Incorrect Channel in actor:"+fromActor(log), isBroker010()? 0 : 1, getChannelID(fromActor(log)));
assertTrue("Prefetch Count not correct",getMessageString(fromMessage(log)).endsWith("Count "+PREFETCH));
}
@@ -306,7 +306,7 @@ public class ChannelLoggingTest extends
validateMessageID("CHN-1001", open);
validateMessageID("CHN-1003", close);
assertEquals("Message should be Close", "Close", getMessageString(fromMessage(close)));
- assertEquals("Incorrect Channel ID closed", 1, getChannelID(fromSubject(close)));
+ assertEquals("Incorrect Channel ID closed", isBroker010()? 0 : 1, getChannelID(fromSubject(close)));
assertEquals("Channel IDs should be the same", getChannelID(fromActor(open)), getChannelID(fromSubject(close)));
assertEquals("Connection IDs should be the same", getConnectionID(fromActor(open)), getConnectionID(fromSubject(close)));
}
Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java?rev=1067210&r1=1067209&r2=1067210&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java Fri Feb 4 16:15:27 2011
@@ -266,19 +266,22 @@ public class AMQConnectionTest extends Q
}
}
- public void testGetChannelID()
+ public void testGetChannelID() throws Exception
{
- int maxChannelID = 65536;
+ long maxChannelID = _connection.getMaximumChannelCount();
if (isBroker010())
{
- maxChannelID = Integer.MAX_VALUE+1;
+ //Usable numbers are 0 to N-1 when using 0-10
+ //and 1 to N for 0-8/0-9
+ maxChannelID = maxChannelID-1;
}
for (int j = 0; j < 3; j++)
{
- for (int i = 1; i < maxChannelID; i++)
+ int i = isBroker010() ? 0 : 1;
+ for ( ; i <= maxChannelID; i++)
{
int id = _connection.getNextChannelID();
- assertEquals("On iterartion "+j, i, id);
+ assertEquals("Unexpected number on iteration "+j, i, id);
_connection.deregisterSession(id);
}
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org