You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2013/09/30 16:16:36 UTC
svn commit: r1527584 - in /qpid/trunk/qpid/java:
amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/
amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/
amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ ...
Author: rgodfrey
Date: Mon Sep 30 14:16:35 2013
New Revision: 1527584
URL: http://svn.apache.org/r1527584
Log:
QPID-5195 : Client does not properly support channel-max
Modified:
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Util.java
qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java
qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ConnectionException.java
qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java
qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
Modified: qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java?rev=1527584&r1=1527583&r2=1527584&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java Mon Sep 30 14:16:35 2013
@@ -48,6 +48,7 @@ public class ConnectionFactoryImpl imple
private String _topicPrefix;
private boolean _useBinaryMessageId = Boolean.parseBoolean(System.getProperty("qpid.use_binary_message_id", "true"));
private boolean _syncPublish = Boolean.parseBoolean(System.getProperty("qpid.sync_publish", "false"));
+ private int _maxSessions = Integer.getInteger("qpid.max_sessions", 0);
public ConnectionFactoryImpl(final String host,
@@ -85,6 +86,18 @@ public class ConnectionFactoryImpl imple
final String remoteHost,
final boolean ssl)
{
+ this(host, port, username, password, clientId, remoteHost, ssl,0);
+ }
+
+ public ConnectionFactoryImpl(final String host,
+ final int port,
+ final String username,
+ final String password,
+ final String clientId,
+ final String remoteHost,
+ final boolean ssl,
+ final int maxSessions)
+ {
_host = host;
_port = port;
_username = username;
@@ -92,6 +105,7 @@ public class ConnectionFactoryImpl imple
_clientId = clientId;
_remoteHost = remoteHost;
_ssl = ssl;
+ _maxSessions = maxSessions;
}
public ConnectionImpl createConnection() throws JMSException
@@ -101,7 +115,7 @@ public class ConnectionFactoryImpl imple
public ConnectionImpl createConnection(final String username, final String password) throws JMSException
{
- ConnectionImpl connection = new ConnectionImpl(_host, _port, username, password, _clientId, _remoteHost, _ssl);
+ ConnectionImpl connection = new ConnectionImpl(_host, _port, username, password, _clientId, _remoteHost, _ssl, _maxSessions);
connection.setQueuePrefix(_queuePrefix);
connection.setTopicPrefix(_topicPrefix);
connection.setUseBinaryMessageId(_useBinaryMessageId);
@@ -158,6 +172,7 @@ public class ConnectionFactoryImpl imple
boolean binaryMessageId = true;
boolean syncPublish = false;
+ int maxSessions = 0;
if(userInfo != null)
{
@@ -194,6 +209,10 @@ public class ConnectionFactoryImpl imple
{
syncPublish = Boolean.parseBoolean(keyValuePair[1]);
}
+ else if(keyValuePair[0].equalsIgnoreCase("max-sessions"))
+ {
+ maxSessions = Integer.parseInt(keyValuePair[1]);
+ }
}
}
@@ -203,7 +222,7 @@ public class ConnectionFactoryImpl imple
}
ConnectionFactoryImpl connectionFactory =
- new ConnectionFactoryImpl(host, port, username, password, clientId, remoteHost, ssl);
+ new ConnectionFactoryImpl(host, port, username, password, clientId, remoteHost, ssl, maxSessions);
connectionFactory.setUseBinaryMessageId(binaryMessageId);
connectionFactory.setSyncPublish(syncPublish);
Modified: qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java?rev=1527584&r1=1527583&r2=1527584&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java Mon Sep 30 14:16:35 2013
@@ -60,6 +60,7 @@ public class ConnectionImpl implements C
private String _topicPrefix;
private boolean _useBinaryMessageId = Boolean.parseBoolean(System.getProperty("qpid.use_binary_message_id", "true"));
private boolean _syncPublish = Boolean.parseBoolean(System.getProperty("qpid.sync_publish", "false"));
+ private int _maxSessions;
private static enum State
{
@@ -83,6 +84,11 @@ public class ConnectionImpl implements C
public ConnectionImpl(String host, int port, String username, String password, String clientId, String remoteHost, boolean ssl) throws JMSException
{
+ this(host, port, username, password, clientId, remoteHost, ssl,0);
+ }
+
+ public ConnectionImpl(String host, int port, String username, String password, String clientId, String remoteHost, boolean ssl, int maxSessions) throws JMSException
+ {
_host = host;
_port = port;
_username = username;
@@ -90,6 +96,7 @@ public class ConnectionImpl implements C
_clientId = clientId;
_remoteHost = remoteHost;
_ssl = ssl;
+ _maxSessions = maxSessions;
}
private void connect() throws JMSException
@@ -106,7 +113,8 @@ public class ConnectionImpl implements C
try
{
_conn = new org.apache.qpid.amqp_1_0.client.Connection(_host,
- _port, _username, _password, container, _remoteHost, _ssl);
+ _port, _username, _password, container, _remoteHost, _ssl,
+ _maxSessions - 1);
_conn.setConnectionErrorTask(new ConnectionErrorTask());
// TODO - retrieve negotiated AMQP version
_connectionMetaData = new ConnectionMetaDataImpl(1,0,0);
Modified: qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Util.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Util.java?rev=1527584&r1=1527583&r2=1527584&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Util.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Util.java Mon Sep 30 14:16:35 2013
@@ -502,9 +502,11 @@ public abstract class Util
{
Container container = getContainerName() == null ? new Container() : new Container(getContainerName());
return getUsername() == null ? new Connection(getHost(), getPort(), null, null, _frameSize, container,
- _remoteHost == null ? getHost() : _remoteHost, _useSSL)
+ _remoteHost == null ? getHost() : _remoteHost, _useSSL,
+ 0)
: new Connection(getHost(), getPort(), getUsername(), getPassword(), _frameSize,
- container, _remoteHost == null ? getHost() : _remoteHost, _useSSL);
+ container, _remoteHost == null ? getHost() : _remoteHost, _useSSL,
+ 0);
}
public String getContainerName()
Modified: qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java?rev=1527584&r1=1527583&r2=1527584&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java Mon Sep 30 14:16:35 2013
@@ -108,7 +108,7 @@ public class Connection implements Socke
final Container container,
final String remoteHostname) throws ConnectionException
{
- this(address,port,username,password,maxFrameSize,container,remoteHostname,false);
+ this(address,port,username,password,maxFrameSize,container,remoteHostname,false,-1);
}
public Connection(final String address,
@@ -118,7 +118,7 @@ public class Connection implements Socke
final Container container,
final boolean ssl) throws ConnectionException
{
- this(address, port, username, password, MAX_FRAME_SIZE,container,null,ssl);
+ this(address, port, username, password, MAX_FRAME_SIZE,container,null,ssl,-1);
}
public Connection(final String address,
@@ -128,28 +128,32 @@ public class Connection implements Socke
final String remoteHost,
final boolean ssl) throws ConnectionException
{
- this(address, port, username, password, MAX_FRAME_SIZE,new Container(),remoteHost,ssl);
+ this(address, port, username, password, MAX_FRAME_SIZE,new Container(),remoteHost,ssl,-1);
}
public Connection(final String address,
- final int port,
- final String username,
- final String password,
- final Container container,
- final String remoteHost,
- final boolean ssl) throws ConnectionException
+ final int port,
+ final String username,
+ final String password,
+ final Container container,
+ final String remoteHost,
+ final boolean ssl,
+ final int channelMax) throws ConnectionException
{
- this(address, port, username, password, MAX_FRAME_SIZE,container,remoteHost,ssl);
+ this(address, port, username, password, MAX_FRAME_SIZE,container,remoteHost,ssl,
+ channelMax);
}
public Connection(final String address,
- final int port,
- final String username,
- final String password,
- final int maxFrameSize,
- final Container container,
- final String remoteHostname, boolean ssl) throws ConnectionException
+ final int port,
+ final String username,
+ final String password,
+ final int maxFrameSize,
+ final Container container,
+ final String remoteHostname,
+ boolean ssl,
+ int channelMax) throws ConnectionException
{
_address = address;
@@ -176,6 +180,10 @@ public class Connection implements Socke
}
};
_conn = new ConnectionEndpoint(container, principal, password);
+ if(channelMax >= 0)
+ {
+ _conn.setChannelMax((short)channelMax);
+ }
_conn.setDesiredMaxFrameSize(UnsignedInteger.valueOf(maxFrameSize));
_conn.setRemoteAddress(s.getRemoteSocketAddress());
_conn.setRemoteHostname(remoteHostname);
Modified: qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ConnectionException.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ConnectionException.java?rev=1527584&r1=1527583&r2=1527584&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ConnectionException.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ConnectionException.java Mon Sep 30 14:16:35 2013
@@ -22,6 +22,11 @@ package org.apache.qpid.amqp_1_0.client;
public class ConnectionException extends Exception
{
+ protected ConnectionException(final String message)
+ {
+ super(message);
+ }
+
public ConnectionException(Throwable cause)
{
super(cause);
Modified: qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java?rev=1527584&r1=1527583&r2=1527584&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java Mon Sep 30 14:16:35 2013
@@ -51,10 +51,14 @@ public class Session
private TransactionController _sessionLocalTC;
private Connection _connection;
- public Session(final Connection connection, String name)
+ public Session(final Connection connection, String name) throws SessionCreationException
{
_connection = connection;
_endpoint = connection.getEndpoint().createSession(name);
+ if(_endpoint == null)
+ {
+ throw new SessionCreationException("Cannot create session as all channels are in use");
+ }
_sectionEncoder = new SectionEncoderImpl(connection.getEndpoint().getDescribedTypeRegistry());
_sectionDecoder = new SectionDecoderImpl(connection.getEndpoint().getDescribedTypeRegistry());
}
@@ -385,4 +389,14 @@ public class Session
{
public void configureSource(final Source source);
}
+
+ private class SessionCreationException extends ConnectionException
+ {
+
+ private SessionCreationException(final String message)
+ {
+ super(message);
+ }
+
+ }
}
Modified: qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java?rev=1527584&r1=1527583&r2=1527584&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java Mon Sep 30 14:16:35 2013
@@ -68,22 +68,22 @@ public class ConnectionEndpoint implemen
private final Container _container;
private Principal _user;
- private static final short DEFAULT_CHANNEL_MAX = 255;
+ private static final short DEFAULT_CHANNEL_MAX = Integer.getInteger("amqp.channel_max", 255).shortValue();
private static final int DEFAULT_MAX_FRAME = Integer.getInteger("amqp.max_frame_size", 1 << 15);
private ConnectionState _state = ConnectionState.UNOPENED;
- private short _channelMax;
+ private short _channelMax = DEFAULT_CHANNEL_MAX;
private int _maxFrameSize = 4096;
private String _remoteContainerId;
private SocketAddress _remoteAddress;
// positioned by the *outgoing* channel
- private SessionEndpoint[] _sendingSessions = new SessionEndpoint[DEFAULT_CHANNEL_MAX + 1];
+ private SessionEndpoint[] _sendingSessions;
// positioned by the *incoming* channel
- private SessionEndpoint[] _receivingSessions = new SessionEndpoint[DEFAULT_CHANNEL_MAX + 1];
+ private SessionEndpoint[] _receivingSessions;
private boolean _closedForInput;
private boolean _closedForOutput;
@@ -165,7 +165,7 @@ public class ConnectionEndpoint implemen
}
if (_state == ConnectionState.UNOPENED)
{
- sendOpen(DEFAULT_CHANNEL_MAX, DEFAULT_MAX_FRAME);
+ sendOpen(_channelMax, DEFAULT_MAX_FRAME);
_state = ConnectionState.AWAITING_OPEN;
}
}
@@ -183,10 +183,10 @@ public class ConnectionEndpoint implemen
public synchronized SessionEndpoint createSession(String name)
{
// todo assert connection state
- SessionEndpoint endpoint = new SessionEndpoint(this);
short channel = getFirstFreeChannel();
if (channel != -1)
{
+ SessionEndpoint endpoint = new SessionEndpoint(this);
_sendingSessions[channel] = endpoint;
endpoint.setSendingChannel(channel);
Begin begin = new Begin();
@@ -196,13 +196,14 @@ public class ConnectionEndpoint implemen
begin.setHandleMax(_handleMax);
send(channel, begin);
+ return endpoint;
}
else
{
- // todo error
+ // TODO - report error
+ return null;
}
- return endpoint;
}
@@ -235,7 +236,16 @@ public class ConnectionEndpoint implemen
{
Open open = new Open();
- open.setChannelMax(UnsignedShort.valueOf(DEFAULT_CHANNEL_MAX));
+ if(_receivingSessions == null)
+ {
+ _receivingSessions = new SessionEndpoint[channelMax+1];
+ _sendingSessions = new SessionEndpoint[channelMax+1];
+ }
+ if(channelMax < _channelMax)
+ {
+ _channelMax = channelMax;
+ }
+ open.setChannelMax(UnsignedShort.valueOf(channelMax));
open.setContainerId(_container.getId());
open.setMaxFrameSize(getDesiredMaxFrameSize());
open.setHostname(getRemoteHostname());
@@ -268,7 +278,7 @@ public class ConnectionEndpoint implemen
short getFirstFreeChannel()
{
- for (int i = 0; i < _sendingSessions.length; i++)
+ for (int i = 0; i <= _channelMax; i++)
{
if (_sendingSessions[i] == null)
{
@@ -288,10 +298,16 @@ public class ConnectionEndpoint implemen
public synchronized void receiveOpen(short channel, Open open)
{
- _channelMax = open.getChannelMax() == null ? DEFAULT_CHANNEL_MAX
- : open.getChannelMax().shortValue() < DEFAULT_CHANNEL_MAX
- ? DEFAULT_CHANNEL_MAX
- : open.getChannelMax().shortValue();
+ _channelMax = open.getChannelMax() == null ? _channelMax
+ : open.getChannelMax().shortValue() < _channelMax
+ ? open.getChannelMax().shortValue()
+ : _channelMax;
+
+ if(_receivingSessions == null)
+ {
+ _receivingSessions = new SessionEndpoint[_channelMax+1];
+ _sendingSessions = new SessionEndpoint[_channelMax+1];
+ }
UnsignedInteger remoteDesiredMaxFrameSize =
open.getMaxFrameSize() == null ? UnsignedInteger.valueOf(DEFAULT_MAX_FRAME) : open.getMaxFrameSize();
@@ -618,7 +634,6 @@ public class ConnectionEndpoint implemen
}
}
-
public void invalidHeaderReceived()
{
// TODO
@@ -998,4 +1013,9 @@ public class ConnectionEndpoint implemen
{
return _remoteError;
}
+
+ public void setChannelMax(final short channelMax)
+ {
+ _channelMax = channelMax;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org