You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2013/09/30 16:16:36 UTC

svn commit: r1527584 - in /qpid/trunk/qpid/java: amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/ amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ ...

Author: rgodfrey
Date: Mon Sep 30 14:16:35 2013
New Revision: 1527584

URL: http://svn.apache.org/r1527584
Log:
QPID-5195 : Client does not properly support channel-max

Modified:
    qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java
    qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
    qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Util.java
    qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java
    qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ConnectionException.java
    qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java
    qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java

Modified: qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java?rev=1527584&r1=1527583&r2=1527584&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java Mon Sep 30 14:16:35 2013
@@ -48,6 +48,7 @@ public class ConnectionFactoryImpl imple
     private String _topicPrefix;
     private boolean _useBinaryMessageId = Boolean.parseBoolean(System.getProperty("qpid.use_binary_message_id", "true"));
     private boolean _syncPublish = Boolean.parseBoolean(System.getProperty("qpid.sync_publish", "false"));
+    private int _maxSessions = Integer.getInteger("qpid.max_sessions", 0);
 
 
     public ConnectionFactoryImpl(final String host,
@@ -85,6 +86,18 @@ public class ConnectionFactoryImpl imple
                                  final String remoteHost,
                                  final boolean ssl)
     {
+        this(host, port, username, password, clientId, remoteHost, ssl,0);
+    }
+
+    public ConnectionFactoryImpl(final String host,
+                                 final int port,
+                                 final String username,
+                                 final String password,
+                                 final String clientId,
+                                 final String remoteHost,
+                                 final boolean ssl,
+                                 final int maxSessions)
+    {
         _host = host;
         _port = port;
         _username = username;
@@ -92,6 +105,7 @@ public class ConnectionFactoryImpl imple
         _clientId = clientId;
         _remoteHost = remoteHost;
         _ssl = ssl;
+        _maxSessions = maxSessions;
     }
 
     public ConnectionImpl createConnection() throws JMSException
@@ -101,7 +115,7 @@ public class ConnectionFactoryImpl imple
 
     public ConnectionImpl createConnection(final String username, final String password) throws JMSException
     {
-        ConnectionImpl connection = new ConnectionImpl(_host, _port, username, password, _clientId, _remoteHost, _ssl);
+        ConnectionImpl connection = new ConnectionImpl(_host, _port, username, password, _clientId, _remoteHost, _ssl, _maxSessions);
         connection.setQueuePrefix(_queuePrefix);
         connection.setTopicPrefix(_topicPrefix);
         connection.setUseBinaryMessageId(_useBinaryMessageId);
@@ -158,6 +172,7 @@ public class ConnectionFactoryImpl imple
 
         boolean binaryMessageId = true;
         boolean syncPublish = false;
+        int maxSessions = 0;
 
         if(userInfo != null)
         {
@@ -194,6 +209,10 @@ public class ConnectionFactoryImpl imple
                 {
                     syncPublish = Boolean.parseBoolean(keyValuePair[1]);
                 }
+                else if(keyValuePair[0].equalsIgnoreCase("max-sessions"))
+                {
+                    maxSessions = Integer.parseInt(keyValuePair[1]);
+                }
             }
         }
 
@@ -203,7 +222,7 @@ public class ConnectionFactoryImpl imple
         }
 
         ConnectionFactoryImpl connectionFactory =
-                new ConnectionFactoryImpl(host, port, username, password, clientId, remoteHost, ssl);
+                new ConnectionFactoryImpl(host, port, username, password, clientId, remoteHost, ssl, maxSessions);
         connectionFactory.setUseBinaryMessageId(binaryMessageId);
         connectionFactory.setSyncPublish(syncPublish);
 

Modified: qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java?rev=1527584&r1=1527583&r2=1527584&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java Mon Sep 30 14:16:35 2013
@@ -60,6 +60,7 @@ public class ConnectionImpl implements C
     private String _topicPrefix;
     private boolean _useBinaryMessageId = Boolean.parseBoolean(System.getProperty("qpid.use_binary_message_id", "true"));
     private boolean _syncPublish = Boolean.parseBoolean(System.getProperty("qpid.sync_publish", "false"));
+    private int _maxSessions;
 
     private static enum State
     {
@@ -83,6 +84,11 @@ public class ConnectionImpl implements C
 
     public ConnectionImpl(String host, int port, String username, String password, String clientId, String remoteHost, boolean ssl) throws JMSException
     {
+        this(host, port, username, password, clientId, remoteHost, ssl,0);
+    }
+
+    public ConnectionImpl(String host, int port, String username, String password, String clientId, String remoteHost, boolean ssl, int maxSessions) throws JMSException
+    {
         _host = host;
         _port = port;
         _username = username;
@@ -90,6 +96,7 @@ public class ConnectionImpl implements C
         _clientId = clientId;
         _remoteHost = remoteHost;
         _ssl = ssl;
+        _maxSessions = maxSessions;
     }
 
     private void connect() throws JMSException
@@ -106,7 +113,8 @@ public class ConnectionImpl implements C
                 try
                 {
                     _conn = new org.apache.qpid.amqp_1_0.client.Connection(_host,
-                            _port, _username, _password, container, _remoteHost, _ssl);
+                            _port, _username, _password, container, _remoteHost, _ssl,
+                            _maxSessions - 1);
                     _conn.setConnectionErrorTask(new ConnectionErrorTask());
                     // TODO - retrieve negotiated AMQP version
                     _connectionMetaData = new ConnectionMetaDataImpl(1,0,0);

Modified: qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Util.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Util.java?rev=1527584&r1=1527583&r2=1527584&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Util.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Util.java Mon Sep 30 14:16:35 2013
@@ -502,9 +502,11 @@ public abstract class Util
     {
         Container container = getContainerName() == null ? new Container() : new Container(getContainerName());
         return getUsername() == null ? new Connection(getHost(), getPort(), null, null, _frameSize, container,
-                                                      _remoteHost == null ? getHost() : _remoteHost, _useSSL)
+                                                      _remoteHost == null ? getHost() : _remoteHost, _useSSL,
+                                                      0)
                                      : new Connection(getHost(), getPort(), getUsername(), getPassword(), _frameSize,
-                                                      container, _remoteHost == null ? getHost() : _remoteHost, _useSSL);
+                                                      container, _remoteHost == null ? getHost() : _remoteHost, _useSSL,
+                                                      0);
     }
 
     public String getContainerName()

Modified: qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java?rev=1527584&r1=1527583&r2=1527584&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java Mon Sep 30 14:16:35 2013
@@ -108,7 +108,7 @@ public class Connection implements Socke
                   final Container container,
                   final String remoteHostname) throws ConnectionException
     {
-        this(address,port,username,password,maxFrameSize,container,remoteHostname,false);
+        this(address,port,username,password,maxFrameSize,container,remoteHostname,false,-1);
     }
 
     public Connection(final String address,
@@ -118,7 +118,7 @@ public class Connection implements Socke
                   final Container container,
                   final boolean ssl) throws ConnectionException
     {
-        this(address, port, username, password, MAX_FRAME_SIZE,container,null,ssl);
+        this(address, port, username, password, MAX_FRAME_SIZE,container,null,ssl,-1);
     }
 
     public Connection(final String address,
@@ -128,28 +128,32 @@ public class Connection implements Socke
                   final String remoteHost,
                   final boolean ssl) throws ConnectionException
     {
-        this(address, port, username, password, MAX_FRAME_SIZE,new Container(),remoteHost,ssl);
+        this(address, port, username, password, MAX_FRAME_SIZE,new Container(),remoteHost,ssl,-1);
     }
 
     public Connection(final String address,
-                  final int port,
-                  final String username,
-                  final String password,
-                  final Container container,
-                  final String remoteHost,
-                  final boolean ssl) throws ConnectionException
+                      final int port,
+                      final String username,
+                      final String password,
+                      final Container container,
+                      final String remoteHost,
+                      final boolean ssl,
+                      final int channelMax) throws ConnectionException
     {
-        this(address, port, username, password, MAX_FRAME_SIZE,container,remoteHost,ssl);
+        this(address, port, username, password, MAX_FRAME_SIZE,container,remoteHost,ssl,
+             channelMax);
     }
 
 
     public Connection(final String address,
-                  final int port,
-                  final String username,
-                  final String password,
-                  final int maxFrameSize,
-                  final Container container,
-                  final String remoteHostname, boolean ssl) throws ConnectionException
+                      final int port,
+                      final String username,
+                      final String password,
+                      final int maxFrameSize,
+                      final Container container,
+                      final String remoteHostname,
+                      boolean ssl,
+                      int channelMax) throws ConnectionException
     {
 
         _address = address;
@@ -176,6 +180,10 @@ public class Connection implements Socke
                 }
             };
             _conn = new ConnectionEndpoint(container, principal, password);
+            if(channelMax >= 0)
+            {
+                _conn.setChannelMax((short)channelMax);
+            }
             _conn.setDesiredMaxFrameSize(UnsignedInteger.valueOf(maxFrameSize));
             _conn.setRemoteAddress(s.getRemoteSocketAddress());
             _conn.setRemoteHostname(remoteHostname);

Modified: qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ConnectionException.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ConnectionException.java?rev=1527584&r1=1527583&r2=1527584&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ConnectionException.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ConnectionException.java Mon Sep 30 14:16:35 2013
@@ -22,6 +22,11 @@ package org.apache.qpid.amqp_1_0.client;
 
 public class ConnectionException extends Exception
 {
+    protected ConnectionException(final String message)
+    {
+        super(message);
+    }
+
     public ConnectionException(Throwable cause)
     {
         super(cause);

Modified: qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java?rev=1527584&r1=1527583&r2=1527584&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java Mon Sep 30 14:16:35 2013
@@ -51,10 +51,14 @@ public class Session
     private TransactionController _sessionLocalTC;
     private Connection _connection;
 
-    public Session(final Connection connection, String name)
+    public Session(final Connection connection, String name) throws SessionCreationException
     {
         _connection = connection;
         _endpoint = connection.getEndpoint().createSession(name);
+        if(_endpoint == null)
+        {
+            throw new SessionCreationException("Cannot create session as all channels are in use");
+        }
         _sectionEncoder = new SectionEncoderImpl(connection.getEndpoint().getDescribedTypeRegistry());
         _sectionDecoder = new SectionDecoderImpl(connection.getEndpoint().getDescribedTypeRegistry());
     }
@@ -385,4 +389,14 @@ public class Session
     {
         public void configureSource(final Source source);
     }
+
+    private class SessionCreationException extends ConnectionException
+    {
+
+        private SessionCreationException(final String message)
+        {
+            super(message);
+        }
+
+    }
 }

Modified: qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java?rev=1527584&r1=1527583&r2=1527584&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java Mon Sep 30 14:16:35 2013
@@ -68,22 +68,22 @@ public class ConnectionEndpoint implemen
     private final Container _container;
     private Principal _user;
 
-    private static final short DEFAULT_CHANNEL_MAX = 255;
+    private static final short DEFAULT_CHANNEL_MAX = Integer.getInteger("amqp.channel_max", 255).shortValue();
     private static final int DEFAULT_MAX_FRAME = Integer.getInteger("amqp.max_frame_size", 1 << 15);
 
 
     private ConnectionState _state = ConnectionState.UNOPENED;
-    private short _channelMax;
+    private short _channelMax = DEFAULT_CHANNEL_MAX;
     private int _maxFrameSize = 4096;
     private String _remoteContainerId;
 
     private SocketAddress _remoteAddress;
 
     // positioned by the *outgoing* channel
-    private SessionEndpoint[] _sendingSessions = new SessionEndpoint[DEFAULT_CHANNEL_MAX + 1];
+    private SessionEndpoint[] _sendingSessions;
 
     // positioned by the *incoming* channel
-    private SessionEndpoint[] _receivingSessions = new SessionEndpoint[DEFAULT_CHANNEL_MAX + 1];
+    private SessionEndpoint[] _receivingSessions;
     private boolean _closedForInput;
     private boolean _closedForOutput;
 
@@ -165,7 +165,7 @@ public class ConnectionEndpoint implemen
         }
         if (_state == ConnectionState.UNOPENED)
         {
-            sendOpen(DEFAULT_CHANNEL_MAX, DEFAULT_MAX_FRAME);
+            sendOpen(_channelMax, DEFAULT_MAX_FRAME);
             _state = ConnectionState.AWAITING_OPEN;
         }
     }
@@ -183,10 +183,10 @@ public class ConnectionEndpoint implemen
     public synchronized SessionEndpoint createSession(String name)
     {
         // todo assert connection state
-        SessionEndpoint endpoint = new SessionEndpoint(this);
         short channel = getFirstFreeChannel();
         if (channel != -1)
         {
+            SessionEndpoint endpoint = new SessionEndpoint(this);
             _sendingSessions[channel] = endpoint;
             endpoint.setSendingChannel(channel);
             Begin begin = new Begin();
@@ -196,13 +196,14 @@ public class ConnectionEndpoint implemen
 
             begin.setHandleMax(_handleMax);
             send(channel, begin);
+            return endpoint;
 
         }
         else
         {
-            // todo error
+            // TODO - report error
+            return null;
         }
-        return endpoint;
     }
 
 
@@ -235,7 +236,16 @@ public class ConnectionEndpoint implemen
     {
         Open open = new Open();
 
-        open.setChannelMax(UnsignedShort.valueOf(DEFAULT_CHANNEL_MAX));
+        if(_receivingSessions == null)
+        {
+            _receivingSessions = new SessionEndpoint[channelMax+1];
+            _sendingSessions = new SessionEndpoint[channelMax+1];
+        }
+        if(channelMax < _channelMax)
+        {
+            _channelMax = channelMax;
+        }
+        open.setChannelMax(UnsignedShort.valueOf(channelMax));
         open.setContainerId(_container.getId());
         open.setMaxFrameSize(getDesiredMaxFrameSize());
         open.setHostname(getRemoteHostname());
@@ -268,7 +278,7 @@ public class ConnectionEndpoint implemen
 
     short getFirstFreeChannel()
     {
-        for (int i = 0; i < _sendingSessions.length; i++)
+        for (int i = 0; i <= _channelMax; i++)
         {
             if (_sendingSessions[i] == null)
             {
@@ -288,10 +298,16 @@ public class ConnectionEndpoint implemen
     public synchronized void receiveOpen(short channel, Open open)
     {
 
-        _channelMax = open.getChannelMax() == null ? DEFAULT_CHANNEL_MAX
-                : open.getChannelMax().shortValue() < DEFAULT_CHANNEL_MAX
-                        ? DEFAULT_CHANNEL_MAX
-                        : open.getChannelMax().shortValue();
+        _channelMax = open.getChannelMax() == null ? _channelMax
+                : open.getChannelMax().shortValue() < _channelMax
+                        ? open.getChannelMax().shortValue()
+                        : _channelMax;
+
+        if(_receivingSessions == null)
+        {
+            _receivingSessions = new SessionEndpoint[_channelMax+1];
+            _sendingSessions = new SessionEndpoint[_channelMax+1];
+        }
 
         UnsignedInteger remoteDesiredMaxFrameSize =
                 open.getMaxFrameSize() == null ? UnsignedInteger.valueOf(DEFAULT_MAX_FRAME) : open.getMaxFrameSize();
@@ -618,7 +634,6 @@ public class ConnectionEndpoint implemen
         }
     }
 
-
     public void invalidHeaderReceived()
     {
         // TODO
@@ -998,4 +1013,9 @@ public class ConnectionEndpoint implemen
     {
         return _remoteError;
     }
+
+    public void setChannelMax(final short channelMax)
+    {
+        _channelMax = channelMax;
+    }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org