You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/08/09 18:05:06 UTC

svn commit: r1616977 - in /qpid/trunk/qpid/java: broker-core/src/main/java/org/apache/qpid/server/configuration/ broker-core/src/main/java/org/apache/qpid/server/model/ broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_...

Author: rgodfrey
Date: Sat Aug  9 16:05:05 2014
New Revision: 1616977

URL: http://svn.apache.org/r1616977
Log:
QPID-4429 : [Java] Implement max frame size negotiation checks in 0-x protocols

Added:
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/FrameSizeObserver.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/transport/
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/transport/MaxFrameSizeTest.java
Removed:
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java
Modified:
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/BrokerProperties.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionSecureOkMethodHandler.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionStartOkMethodHandler.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionTuneOkMethodHandler.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/AMQStateManager.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Frame.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java
    qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/BrokerProperties.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/BrokerProperties.java?rev=1616977&r1=1616976&r2=1616977&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/BrokerProperties.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/BrokerProperties.java Sat Aug  9 16:05:05 2014
@@ -40,10 +40,6 @@ public class BrokerProperties
     public static final String PROPERTY_DEFAULT_SUPPORTED_PROTOCOL_REPLY = "qpid.broker_default_supported_protocol_version_reply";
     public static final String PROPERTY_DISABLED_FEATURES = "qpid.broker_disabled_features";
 
-    private static final int DEFAULT_FRAME_SIZE = 65535;
-    public static final String PROPERTY_FRAME_SIZE = "qpid.broker_frame_size";
-    public static final int FRAME_SIZE = Integer.getInteger(PROPERTY_FRAME_SIZE, DEFAULT_FRAME_SIZE);
-
     public static final String PROPERTY_BROKER_DEFAULT_AMQP_PROTOCOL_EXCLUDES = "qpid.broker_default_amqp_protocol_excludes";
     public static final String PROPERTY_BROKER_DEFAULT_AMQP_PROTOCOL_INCLUDES = "qpid.broker_default_amqp_protocol_includes";
 

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java?rev=1616977&r1=1616976&r2=1616977&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java Sat Aug  9 16:05:05 2014
@@ -78,6 +78,11 @@ public interface Broker<X extends Broker
     @ManagedContextDefault(name = BROKER_FLOW_TO_DISK_THRESHOLD)
     public static final long DEFAULT_FLOW_TO_DISK_THRESHOLD = (long)(0.4 * (double)Runtime.getRuntime().maxMemory());
 
+    String BROKER_FRAME_SIZE = "qpid.broker_frame_size";
+    @ManagedContextDefault(name = BROKER_FRAME_SIZE)
+    long DEFAULT_FRAME_SIZE = 65535;
+
+
     @DerivedAttribute
     String getBuildVersion();
 

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java?rev=1616977&r1=1616976&r2=1616977&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java Sat Aug  9 16:05:05 2014
@@ -33,6 +33,7 @@ import org.apache.qpid.protocol.ServerPr
 import org.apache.qpid.server.logging.messages.ConnectionMessages;
 import org.apache.qpid.server.model.Port;
 import org.apache.qpid.server.model.Transport;
+import org.apache.qpid.transport.Constant;
 import org.apache.qpid.transport.Sender;
 import org.apache.qpid.transport.network.Assembler;
 import org.apache.qpid.transport.network.Disassembler;
@@ -91,7 +92,9 @@ public class ProtocolEngine_0_10  extend
             _network = network;
 
             _connection.setNetworkConnection(network);
-            _connection.setSender(new Disassembler(wrapSender(sender), MAX_FRAME_SIZE));
+            Disassembler disassembler = new Disassembler(wrapSender(sender), Constant.MIN_MAX_FRAME_SIZE);
+            _connection.setSender(disassembler);
+            _connection.addFrameSizeObserver(disassembler);
             // FIXME Two log messages to maintain compatibility with earlier protocol versions
             _connection.getEventLogger().message(ConnectionMessages.OPEN(null, "0-10", null, null, false, true, false, false));
 

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java?rev=1616977&r1=1616976&r2=1616977&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java Sat Aug  9 16:05:05 2014
@@ -50,21 +50,7 @@ import org.apache.qpid.server.security.S
 import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus;
 import org.apache.qpid.server.security.auth.SubjectAuthenticationResult;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
-import org.apache.qpid.transport.Binary;
-import org.apache.qpid.transport.Connection;
-import org.apache.qpid.transport.ConnectionClose;
-import org.apache.qpid.transport.ConnectionCloseCode;
-import org.apache.qpid.transport.ConnectionOpen;
-import org.apache.qpid.transport.ConnectionOpenOk;
-import org.apache.qpid.transport.ConnectionStartOk;
-import org.apache.qpid.transport.ConnectionTuneOk;
-import org.apache.qpid.transport.ServerDelegate;
-import org.apache.qpid.transport.Session;
-import org.apache.qpid.transport.SessionAttach;
-import org.apache.qpid.transport.SessionDelegate;
-import org.apache.qpid.transport.SessionDetach;
-import org.apache.qpid.transport.SessionDetachCode;
-import org.apache.qpid.transport.SessionDetached;
+import org.apache.qpid.transport.*;
 import org.apache.qpid.transport.network.NetworkConnection;
 
 public class ServerConnectionDelegate extends ServerDelegate
@@ -76,15 +62,16 @@ public class ServerConnectionDelegate ex
     private int _maxNoOfChannels;
     private Map<String,Object> _clientProperties;
     private final SubjectCreator _subjectCreator;
+    private int _maximumFrameSize;
 
-    public ServerConnectionDelegate(Broker broker, String localFQDN, SubjectCreator subjectCreator)
+    public ServerConnectionDelegate(Broker<?> broker, String localFQDN, SubjectCreator subjectCreator)
     {
         this(createConnectionProperties(broker), Collections.singletonList((Object)"en_US"), broker, localFQDN, subjectCreator);
     }
 
     private ServerConnectionDelegate(Map<String, Object> properties,
                                      List<Object> locales,
-                                     Broker broker,
+                                     Broker<?> broker,
                                      String localFQDN,
                                      SubjectCreator subjectCreator)
     {
@@ -94,9 +81,10 @@ public class ServerConnectionDelegate ex
         _localFQDN = localFQDN;
         _maxNoOfChannels = broker.getConnection_sessionCountLimit();
         _subjectCreator = subjectCreator;
+        _maximumFrameSize = (int) Math.min(0xffffl, broker.getContextValue(Long.class, Broker.BROKER_FRAME_SIZE));
     }
 
-    private static List<String> getFeatures(Broker broker)
+    private static List<String> getFeatures(Broker<?> broker)
     {
         String brokerDisabledFeatures = System.getProperty(BrokerProperties.PROPERTY_DISABLED_FEATURES);
         final List<String> features = new ArrayList<String>();
@@ -108,7 +96,7 @@ public class ServerConnectionDelegate ex
         return Collections.unmodifiableList(features);
     }
 
-    private static Map<String, Object> createConnectionProperties(final Broker broker)
+    private static Map<String, Object> createConnectionProperties(final Broker<?> broker)
     {
         final Map<String,Object> map = new HashMap<String,Object>();
         // Federation tag is used by the client to identify the broker instance
@@ -234,6 +222,7 @@ public class ServerConnectionDelegate ex
     {
         ServerConnection sconn = (ServerConnection) conn;
         int okChannelMax = ok.getChannelMax();
+        int okMaxFrameSize = ok.getMaxFrameSize();
 
         if (okChannelMax > getChannelMax())
         {
@@ -246,6 +235,31 @@ public class ServerConnectionDelegate ex
             return;
         }
 
+        if(okMaxFrameSize > getFrameMax())
+        {
+            LOGGER.error("Connection '" + sconn.getConnectionId() + "' being severed, " +
+                         "client connectionTuneOk returned a frameMax (" + okMaxFrameSize +
+                         ") above the server's offered limit (" + getFrameMax() +")");
+
+            //Due to the error we must forcefully close the connection without negotiation
+            sconn.getSender().close();
+            return;
+        }
+        else if(okMaxFrameSize > 0 && okMaxFrameSize < Constant.MIN_MAX_FRAME_SIZE)
+        {
+            LOGGER.error("Connection '" + sconn.getConnectionId() + "' being severed, " +
+                         "client connectionTuneOk returned a frameMax (" + okMaxFrameSize +
+                         ") below the minimum permitted size (" + Constant.MIN_MAX_FRAME_SIZE +")");
+
+            //Due to the error we must forcefully close the connection without negotiation
+            sconn.getSender().close();
+            return;
+        }
+        else if(okMaxFrameSize == 0)
+        {
+            okMaxFrameSize = getFrameMax();
+        }
+
         final NetworkConnection networkConnection = sconn.getNetworkConnection();
         if(ok.hasHeartbeat())
         {
@@ -266,6 +280,8 @@ public class ServerConnectionDelegate ex
         }
 
         setConnectionTuneOkChannelMax(sconn, okChannelMax);
+
+        conn.setMaxFrameSize(okMaxFrameSize);
     }
 
     @Override
@@ -279,6 +295,12 @@ public class ServerConnectionDelegate ex
         _maxNoOfChannels = channelMax;
     }
 
+    @Override
+    protected int getFrameMax()
+    {
+        return _maximumFrameSize;
+    }
+
     @Override public void sessionDetach(Connection conn, SessionDetach dtc)
     {
         // To ensure a clean detach, we stop any remaining subscriptions. Stop ensures

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java?rev=1616977&r1=1616976&r2=1616977&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java Sat Aug  9 16:05:05 2014
@@ -44,37 +44,38 @@ import javax.security.auth.Subject;
 import javax.security.sasl.SaslServer;
 
 import org.apache.log4j.Logger;
+
 import org.apache.qpid.AMQChannelException;
 import org.apache.qpid.AMQConnectionException;
 import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.*;
-import org.apache.qpid.codec.AMQCodecFactory;
+import org.apache.qpid.codec.AMQDecoder;
 import org.apache.qpid.common.QpidProperties;
 import org.apache.qpid.common.ServerPropertyNames;
+import org.apache.qpid.framing.*;
 import org.apache.qpid.properties.ConnectionStartProperties;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.protocol.ServerProtocolEngine;
+import org.apache.qpid.server.configuration.BrokerProperties;
 import org.apache.qpid.server.connection.ConnectionPrincipal;
 import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.logging.EventLogger;
-import org.apache.qpid.server.message.InstanceProperties;
-import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.configuration.BrokerProperties;
-import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.protocol.SessionModelListener;
-import org.apache.qpid.server.protocol.v0_8.handler.ServerMethodDispatcherImpl;
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.logging.messages.ConnectionMessages;
 import org.apache.qpid.server.logging.subjects.ConnectionLogSubject;
+import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.Port;
 import org.apache.qpid.server.model.Transport;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.SessionModelListener;
+import org.apache.qpid.server.protocol.v0_8.handler.ServerMethodDispatcherImpl;
 import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
 import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverterRegistry;
-import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
 import org.apache.qpid.server.protocol.v0_8.state.AMQState;
 import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
+import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
 import org.apache.qpid.server.stats.StatisticsCounter;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
@@ -124,7 +125,7 @@ public class AMQProtocolEngine implement
 
     private final AMQStateManager _stateManager;
 
-    private AMQCodecFactory _codecFactory;
+    private AMQDecoder _decoder;
 
     private SaslServer _saslServer;
 
@@ -187,7 +188,7 @@ public class AMQProtocolEngine implement
         _maxNoOfChannels = broker.getConnection_sessionCountLimit();
         _receivedLock = new ReentrantLock();
         _stateManager = new AMQStateManager(broker, this);
-        _codecFactory = new AMQCodecFactory(true, this);
+        _decoder = new AMQDecoder(true, this);
         _connectionID = connectionId;
         _logSubject = new ConnectionLogSubject(this);
 
@@ -250,6 +251,7 @@ public class AMQProtocolEngine implement
     public void setMaxFrameSize(long frameMax)
     {
         _maxFrameSize = frameMax;
+        _decoder.setMaxFrameSize(frameMax);
     }
 
     public long getMaxFrameSize()
@@ -299,7 +301,7 @@ public class AMQProtocolEngine implement
                 _receivedLock.lock();
                 try
                 {
-                    final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
+                    final ArrayList<AMQDataBlock> dataBlocks = _decoder.decodeBuffer(msg);
                     for (AMQDataBlock dataBlock : dataBlocks)
                     {
                         try
@@ -493,7 +495,7 @@ public class AMQProtocolEngine implement
     private synchronized void protocolInitiationReceived(ProtocolInitiation pi)
     {
         // this ensures the codec never checks for a PI message again
-        (_codecFactory.getDecoder()).setExpectProtocolInitiation(false);
+        _decoder.setExpectProtocolInitiation(false);
         try
         {
             // Log incoming protocol negotiation request

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionSecureOkMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionSecureOkMethodHandler.java?rev=1616977&r1=1616976&r2=1616977&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionSecureOkMethodHandler.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionSecureOkMethodHandler.java Sat Aug  9 16:05:05 2014
@@ -33,7 +33,6 @@ import org.apache.qpid.framing.Connectio
 import org.apache.qpid.framing.ConnectionTuneBody;
 import org.apache.qpid.framing.MethodRegistry;
 import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.configuration.BrokerProperties;
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
 import org.apache.qpid.server.protocol.v0_8.state.AMQState;
@@ -59,7 +58,7 @@ public class ConnectionSecureOkMethodHan
 
     public void methodReceived(AMQStateManager stateManager, ConnectionSecureOkBody body, int channelId) throws AMQException
     {
-        Broker broker = stateManager.getBroker();
+        Broker<?> broker = stateManager.getBroker();
         AMQProtocolSession session = stateManager.getProtocolSession();
 
         SubjectCreator subjectCreator = stateManager.getSubjectCreator();
@@ -99,7 +98,7 @@ public class ConnectionSecureOkMethodHan
 
                 ConnectionTuneBody tuneBody =
                         methodRegistry.createConnectionTuneBody(broker.getConnection_sessionCountLimit(),
-                                                                BrokerProperties.FRAME_SIZE,
+                                                                broker.getContextValue(Long.class, Broker.BROKER_FRAME_SIZE),
                                                                 broker.getConnection_heartBeatDelay());
                 session.writeFrame(tuneBody.generateFrame(0));
                 session.setAuthorizedSubject(authResult.getSubject());

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionStartOkMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionStartOkMethodHandler.java?rev=1616977&r1=1616976&r2=1616977&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionStartOkMethodHandler.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionStartOkMethodHandler.java Sat Aug  9 16:05:05 2014
@@ -32,7 +32,6 @@ import org.apache.qpid.framing.Connectio
 import org.apache.qpid.framing.ConnectionTuneBody;
 import org.apache.qpid.framing.MethodRegistry;
 import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.configuration.BrokerProperties;
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
 import org.apache.qpid.server.protocol.v0_8.state.AMQState;
@@ -59,7 +58,7 @@ public class ConnectionStartOkMethodHand
 
     public void methodReceived(AMQStateManager stateManager, ConnectionStartOkBody body, int channelId) throws AMQException
     {
-        Broker broker = stateManager.getBroker();
+        Broker<?> broker = stateManager.getBroker();
         AMQProtocolSession session = stateManager.getProtocolSession();
 
         _logger.info("SASL Mechanism selected: " + body.getMechanism());
@@ -113,7 +112,7 @@ public class ConnectionStartOkMethodHand
                     stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
 
                     ConnectionTuneBody tuneBody = methodRegistry.createConnectionTuneBody(broker.getConnection_sessionCountLimit(),
-                                                                                          BrokerProperties.FRAME_SIZE,
+                                                                                          broker.getContextValue(Long.class,Broker.BROKER_FRAME_SIZE),
                                                                                           broker.getConnection_heartBeatDelay());
                     session.writeFrame(tuneBody.generateFrame(0));
                     break;

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionTuneOkMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionTuneOkMethodHandler.java?rev=1616977&r1=1616976&r2=1616977&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionTuneOkMethodHandler.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionTuneOkMethodHandler.java Sat Aug  9 16:05:05 2014
@@ -22,8 +22,11 @@ package org.apache.qpid.server.protocol.
 
 import org.apache.log4j.Logger;
 
+import org.apache.qpid.AMQConnectionException;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.ConnectionTuneOkBody;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
 import org.apache.qpid.server.protocol.v0_8.state.AMQState;
 import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
@@ -49,8 +52,29 @@ public class ConnectionTuneOkMethodHandl
             _logger.debug(body);
         }
         stateManager.changeState(AMQState.CONNECTION_NOT_OPENED);
+
         session.initHeartbeats(body.getHeartbeat());
-        session.setMaxFrameSize(body.getFrameMax());
+
+        long brokerFrameMax = stateManager.getBroker().getContextValue(Long.class,Broker.BROKER_FRAME_SIZE);
+        if(brokerFrameMax != 0 && body.getFrameMax() > brokerFrameMax)
+        {
+            throw new AMQConnectionException(AMQConstant.SYNTAX_ERROR,
+                                             "Attempt to set max frame size to " + body.getFrameMax()
+                                             + "greater than the broker will allow: "
+                                             + brokerFrameMax,
+                                             body.getClazz(), body.getMethod(),
+                                             body.getMajor(), body.getMinor(),null);
+        }
+        else if(body.getFrameMax() > 0 && body.getFrameMax() < AMQConstant.FRAME_MIN_SIZE.getCode())
+        {
+            throw new AMQConnectionException(AMQConstant.SYNTAX_ERROR,
+                                             "Attempt to set max frame size to " + body.getFrameMax()
+                                             + "which is smaller than the specification definined minimum: "
+                                             + AMQConstant.FRAME_MIN_SIZE.getCode(),
+                                             body.getClazz(), body.getMethod(),
+                                             body.getMajor(), body.getMinor(),null);
+        }
+        session.setMaxFrameSize(body.getFrameMax()== 0l ? (brokerFrameMax == 0l ? 0xFFFFFFFFl : brokerFrameMax) : body.getFrameMax());
 
         long maxChannelNumber = body.getChannelMax();
         //0 means no implied limit, except that forced by protocol limitations (0xFFFF)

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/AMQStateManager.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/AMQStateManager.java?rev=1616977&r1=1616976&r2=1616977&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/AMQStateManager.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/AMQStateManager.java Sat Aug  9 16:05:05 2014
@@ -51,12 +51,12 @@ public class AMQStateManager implements 
 {
     private static final Logger _logger = Logger.getLogger(AMQStateManager.class);
 
-    private final Broker _broker;
+    private final Broker<?> _broker;
     private final AMQProtocolSession _protocolSession;
     /** The current state */
     private AMQState _currentState;
 
-    public AMQStateManager(Broker broker, AMQProtocolSession protocolSession)
+    public AMQStateManager(Broker<?> broker, AMQProtocolSession protocolSession)
     {
         _broker = broker;
         _protocolSession = protocolSession;
@@ -69,7 +69,7 @@ public class AMQStateManager implements 
      *
      * @return the Broker
      */
-    public Broker getBroker()
+    public Broker<?> getBroker()
     {
         return _broker;
     }

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java?rev=1616977&r1=1616976&r2=1616977&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java Sat Aug  9 16:05:05 2014
@@ -72,6 +72,8 @@ public class ConnectionTuneMethodHandler
         ConnectionTuneOkBody tuneOkBody = methodRegistry.createConnectionTuneOkBody(params.getChannelMax(),
                                                                                     params.getFrameMax(),
                                                                                     params.getHeartbeat());
+
+        session.setMaxFrameSize(params.getFrameMax());
         // Be aware of possible changes to parameter order as versions change.
         session.writeFrame(tuneOkBody.generateFrame(channelId));
 

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=1616977&r1=1616976&r2=1616977&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Sat Aug  9 16:05:05 2014
@@ -47,7 +47,7 @@ import org.apache.qpid.client.state.AMQS
 import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.state.StateWaiter;
 import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
-import org.apache.qpid.codec.AMQCodecFactory;
+import org.apache.qpid.codec.AMQDecoder;
 import org.apache.qpid.configuration.ClientProperties;
 import org.apache.qpid.framing.AMQBody;
 import org.apache.qpid.framing.AMQDataBlock;
@@ -165,7 +165,7 @@ public class AMQProtocolHandler implemen
 
     /** Object to lock on when changing the latch */
     private Object _failoverLatchChange = new Object();
-    private AMQCodecFactory _codecFactory;
+    private AMQDecoder _decoder;
 
     private ProtocolVersion _suggestedProtocolVersion;
 
@@ -189,7 +189,7 @@ public class AMQProtocolHandler implemen
         _connection = con;
         _protocolSession = new AMQProtocolSession(this, _connection);
         _stateManager = new AMQStateManager(_protocolSession);
-        _codecFactory = new AMQCodecFactory(false, _protocolSession);
+        _decoder = new AMQDecoder(false, _protocolSession);
         _failoverHandler = new FailoverHandler(this);
     }
 
@@ -460,7 +460,7 @@ public class AMQProtocolHandler implemen
         _lastReadTime = System.currentTimeMillis();
         try
         {
-            final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
+            final ArrayList<AMQDataBlock> dataBlocks = _decoder.decodeBuffer(msg);
 
             // Decode buffer
             int size = dataBlocks.size();
@@ -944,4 +944,9 @@ public class AMQProtocolHandler implemen
     {
         _heartbeatListener.heartbeatReceived();
     }
+
+    public void setMaxFrameSize(final long frameMax)
+    {
+        _decoder.setMaxFrameSize(frameMax == 0l ? 0xffffffffl : frameMax);
+    }
 }

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?rev=1616977&r1=1616976&r2=1616977&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Sat Aug  9 16:05:05 2014
@@ -20,8 +20,16 @@
  */
 package org.apache.qpid.client.protocol;
 
+import java.nio.ByteBuffer;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import javax.jms.JMSException;
+import javax.security.sasl.SaslClient;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.AMQSession;
@@ -47,13 +55,6 @@ import org.apache.qpid.transport.Connect
 import org.apache.qpid.transport.Sender;
 import org.apache.qpid.transport.TransportException;
 
-import javax.jms.JMSException;
-import javax.security.sasl.SaslClient;
-
-import java.nio.ByteBuffer;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
 /**
  * Wrapper for protocol session that provides type-safe access to session attributes.
  * <p>
@@ -543,4 +544,9 @@ public class AMQProtocolSession implemen
     {
         return _connectionStartServerProperties;
     }
+
+    public void setMaxFrameSize(final long frameMax)
+    {
+        _protocolHandler.setMaxFrameSize(frameMax);
+    }
 }

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java?rev=1616977&r1=1616976&r2=1616977&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java Sat Aug  9 16:05:05 2014
@@ -20,6 +20,16 @@
  */
 package org.apache.qpid.codec;
 
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ListIterator;
+
 import org.apache.qpid.framing.AMQDataBlock;
 import org.apache.qpid.framing.AMQDataBlockDecoder;
 import org.apache.qpid.framing.AMQFrameDecodingException;
@@ -31,16 +41,6 @@ import org.apache.qpid.framing.EncodingU
 import org.apache.qpid.framing.ProtocolInitiation;
 import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
 
-import java.io.ByteArrayInputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.ListIterator;
-
 /**
  * AMQDecoder delegates the decoding of AMQP either to a data block decoder, or in the case of new connections, to a
  * protocol initiation decoder. It is a cumulative decoder, which means that it can accumulate data to decode in the
@@ -94,6 +94,11 @@ public class AMQDecoder
         _expectProtocolInitiation = expectProtocolInitiation;
     }
 
+    public void setMaxFrameSize(final long frameMax)
+    {
+        _dataBlockDecoder.setMaxFrameSize(frameMax);
+    }
+
     private class RemainingByteArrayInputStream extends InputStream
     {
         private int _currentListPos;

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java?rev=1616977&r1=1616976&r2=1616977&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java Sat Aug  9 16:05:05 2014
@@ -20,12 +20,13 @@
  */
 package org.apache.qpid.framing;
 
+import java.io.IOException;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.codec.MarkableDataInput;
-
-import java.io.IOException;
+import org.apache.qpid.protocol.AMQConstant;
 
 public class AMQDataBlockDecoder
 {
@@ -40,6 +41,7 @@ public class AMQDataBlockDecoder
     }
 
     private Logger _logger = LoggerFactory.getLogger(AMQDataBlockDecoder.class);
+    private long _maxFrameSize = AMQConstant.FRAME_MIN_SIZE.getCode();
 
     public AMQDataBlockDecoder()
     { }
@@ -59,14 +61,17 @@ public class AMQDataBlockDecoder
 
         // Get an unsigned int, lifted from MINA ByteBuffer getUnsignedInt() 
         final long bodySize = in.readInt() & 0xffffffffL;
-
+        if(bodySize > _maxFrameSize)
+        {
+            throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR, "Incoming frame size of "+bodySize+" is larger than negotiated maximum of  " + _maxFrameSize);
+        }
         in.reset();
 
         return (remainingAfterAttributes >= bodySize);
 
     }
 
-    public AMQFrame createAndPopulateFrame(AMQMethodBodyFactory methodBodyFactory, MarkableDataInput in)
+    public AMQFrame createAndPopulateFrame(BodyFactory methodBodyFactory, MarkableDataInput in)
             throws AMQFrameDecodingException, AMQProtocolVersionException, IOException
     {
         final byte type = in.readByte();
@@ -83,7 +88,7 @@ public class AMQDataBlockDecoder
 
         if (bodyFactory == null)
         {
-            throw new AMQFrameDecodingException(null, "Unsupported frame type: " + type, null);
+            throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR, "Unsupported frame type: " + type);
         }
 
         final int channel = in.readUnsignedShort();
@@ -92,8 +97,8 @@ public class AMQDataBlockDecoder
         // bodySize can be zero
         if ((channel < 0) || (bodySize < 0))
         {
-            throw new AMQFrameDecodingException(null, "Undecodable frame: type = " + type + " channel = " + channel
-                + " bodySize = " + bodySize, null);
+            throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR, "Undecodable frame: type = " + type + " channel = " + channel
+                + " bodySize = " + bodySize);
         }
 
         AMQFrame frame = new AMQFrame(in, channel, bodySize, bodyFactory);
@@ -101,11 +106,15 @@ public class AMQDataBlockDecoder
         byte marker = in.readByte();
         if ((marker & 0xFF) != 0xCE)
         {
-            throw new AMQFrameDecodingException(null, "End of frame marker not found. Read " + marker + " length=" + bodySize
-                + " type=" + type, null);
+            throw new AMQFrameDecodingException(AMQConstant.FRAME_ERROR, "End of frame marker not found. Read " + marker + " length=" + bodySize
+                + " type=" + type);
         }
 
         return frame;
     }
 
+    public void setMaxFrameSize(final long maxFrameSize)
+    {
+        _maxFrameSize = maxFrameSize;
+    }
 }

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java?rev=1616977&r1=1616976&r2=1616977&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java Sat Aug  9 16:05:05 2014
@@ -20,20 +20,21 @@
  */
 package org.apache.qpid.transport;
 
-import org.apache.qpid.common.QpidProperties;
-import org.apache.qpid.configuration.ClientProperties;
-import org.apache.qpid.properties.ConnectionStartProperties;
-import org.apache.qpid.transport.util.Logger;
-
 import static org.apache.qpid.transport.Connection.State.OPEN;
 import static org.apache.qpid.transport.Connection.State.RESUMING;
 
-import javax.security.sasl.SaslClient;
-import javax.security.sasl.SaslException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+
+import org.apache.qpid.common.QpidProperties;
+import org.apache.qpid.configuration.ClientProperties;
+import org.apache.qpid.properties.ConnectionStartProperties;
+import org.apache.qpid.transport.util.Logger;
+
 
 /**
  * ClientDelegate
@@ -138,13 +139,24 @@ public class ClientDelegate extends Conn
         int actualHeartbeatInterval = calculateHeartbeatInterval(heartbeatInterval,
                                                            tune.getHeartbeatMin(),
                                                            tune.getHeartbeatMax());
+        int maxFrameSize = tune.getMaxFrameSize();
+        int settingsMaxFrameSize = conn.getConnectionSettings().getMaxFrameSize();
+        if(maxFrameSize == 0 && settingsMaxFrameSize != 0 && settingsMaxFrameSize < 0xffff)
+        {
+            maxFrameSize = Math.max(Constant.MIN_MAX_FRAME_SIZE, settingsMaxFrameSize);
+        }
+        else if(maxFrameSize != 0 && settingsMaxFrameSize != 0)
+        {
+            maxFrameSize = Math.max(Constant.MIN_MAX_FRAME_SIZE, Math.min(maxFrameSize, settingsMaxFrameSize));
+        }
         conn.connectionTuneOk(tune.getChannelMax(),
-                              tune.getMaxFrameSize(),
+                              maxFrameSize,
                               actualHeartbeatInterval);
 
         int idleTimeout = (int)(actualHeartbeatInterval * 1000 * heartbeatTimeoutFactor);
         conn.getNetworkConnection().setMaxReadIdle((int)(actualHeartbeatInterval*heartbeatTimeoutFactor));
         conn.getNetworkConnection().setMaxWriteIdle(actualHeartbeatInterval);
+        conn.setMaxFrameSize(maxFrameSize == 0 ? 0xffff : maxFrameSize);
         conn.setIdleTimeout(idleTimeout);
 
         int channelMax = tune.getChannelMax();
@@ -183,7 +195,7 @@ public class ClientDelegate extends Conn
     /**
      * Currently the spec specified the min and max for heartbeat using secs
      */
-    private int calculateHeartbeatInterval(int heartbeat,int min, int max)
+    int calculateHeartbeatInterval(int heartbeat,int min, int max)
     {
         int i = heartbeat;
         if (i == 0)

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java?rev=1616977&r1=1616976&r2=1616977&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java Sat Aug  9 16:05:05 2014
@@ -20,23 +20,12 @@
  */
 package org.apache.qpid.transport;
 
-import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.transport.network.*;
-import org.apache.qpid.transport.network.security.SecurityLayer;
-import org.apache.qpid.transport.network.security.SecurityLayerFactory;
-import org.apache.qpid.transport.util.Logger;
-import org.apache.qpid.transport.util.Waiter;
-import org.apache.qpid.util.Strings;
-
 import static org.apache.qpid.transport.Connection.State.CLOSED;
 import static org.apache.qpid.transport.Connection.State.CLOSING;
 import static org.apache.qpid.transport.Connection.State.NEW;
 import static org.apache.qpid.transport.Connection.State.OPEN;
 import static org.apache.qpid.transport.Connection.State.OPENING;
 
-import javax.security.sasl.SaslClient;
-import javax.security.sasl.SaslServer;
-
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -48,6 +37,23 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslServer;
+
+import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.transport.network.Assembler;
+import org.apache.qpid.transport.network.Disassembler;
+import org.apache.qpid.transport.network.InputHandler;
+import org.apache.qpid.transport.network.NetworkConnection;
+import org.apache.qpid.transport.network.OutgoingNetworkTransport;
+import org.apache.qpid.transport.network.Transport;
+import org.apache.qpid.transport.network.TransportActivity;
+import org.apache.qpid.transport.network.security.SecurityLayer;
+import org.apache.qpid.transport.network.security.SecurityLayerFactory;
+import org.apache.qpid.transport.util.Logger;
+import org.apache.qpid.transport.util.Waiter;
+import org.apache.qpid.util.Strings;
+
 
 /**
  * Connection
@@ -71,7 +77,7 @@ public class Connection extends Connecti
     private long _lastSendTime;
     private long _lastReadTime;
     private NetworkConnection _networkConnection;
-
+    private FrameSizeObserver _frameSizeObserver;
 
     public enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD, RESUMING }
 
@@ -224,7 +230,9 @@ public class Connection extends Connecti
             securityLayer = SecurityLayerFactory.newInstance(getConnectionSettings());
 
             OutgoingNetworkTransport transport = Transport.getOutgoingTransportInstance(ProtocolVersion.v0_10);
-            Receiver<ByteBuffer> secureReceiver = securityLayer.receiver(new InputHandler(new Assembler(this)));
+            final InputHandler inputHandler = new InputHandler(new Assembler(this));
+            addFrameSizeObserver(inputHandler);
+            Receiver<ByteBuffer> secureReceiver = securityLayer.receiver(inputHandler);
             if(secureReceiver instanceof ConnectionListener)
             {
                 addConnectionListener((ConnectionListener)secureReceiver);
@@ -241,7 +249,9 @@ public class Connection extends Connecti
             {
                 addConnectionListener((ConnectionListener)secureSender);
             }
-            sender = new Disassembler(secureSender, settings.getMaxFrameSize());
+            Disassembler disassembler = new Disassembler(secureSender, Constant.MIN_MAX_FRAME_SIZE);
+            sender = disassembler;
+            addFrameSizeObserver(disassembler);
 
             send(new ProtocolHeader(1, 0, 10));
 
@@ -809,4 +819,33 @@ public class Connection extends Connecti
     {
         return _networkConnection;
     }
+
+    public void setMaxFrameSize(final int maxFrameSize)
+    {
+        if(_frameSizeObserver != null)
+        {
+            _frameSizeObserver.setMaxFrameSize(maxFrameSize);
+        }
+    }
+
+    public void addFrameSizeObserver(final FrameSizeObserver frameSizeObserver)
+    {
+        if(_frameSizeObserver == null)
+        {
+            _frameSizeObserver = frameSizeObserver;
+        }
+        else
+        {
+            final FrameSizeObserver currentObserver = _frameSizeObserver;
+            _frameSizeObserver = new FrameSizeObserver()
+                                    {
+                                        @Override
+                                        public void setMaxFrameSize(final int frameSize)
+                                        {
+                                            currentObserver.setMaxFrameSize(frameSize);
+                                            frameSizeObserver.setMaxFrameSize(frameSize);
+                                        }
+                                    };
+        }
+    }
 }

Added: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/FrameSizeObserver.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/FrameSizeObserver.java?rev=1616977&view=auto
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/FrameSizeObserver.java (added)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/FrameSizeObserver.java Sat Aug  9 16:05:05 2014
@@ -0,0 +1,26 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport;
+
+public interface FrameSizeObserver
+{
+    void setMaxFrameSize(int frameSize);
+}

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java?rev=1616977&r1=1616976&r2=1616977&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java Sat Aug  9 16:05:05 2014
@@ -20,18 +20,19 @@
  */
 package org.apache.qpid.transport;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import static org.apache.qpid.transport.Connection.State.OPEN;
 
-import javax.security.sasl.Sasl;
-import javax.security.sasl.SaslException;
-import javax.security.sasl.SaslServer;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * ServerDelegate
  */
@@ -136,12 +137,14 @@ public class ServerDelegate extends Conn
 
     protected void tuneAuthorizedConnection(final Connection conn)
     {
-        conn.connectionTune
-            (getChannelMax(),
-             org.apache.qpid.transport.network.ConnectionBinding.MAX_FRAME_SIZE,
-             0, getHeartbeatMax());
+        conn.connectionTune(getChannelMax(), getFrameMax(), 0, getHeartbeatMax());
     }
-    
+
+    protected int getFrameMax()
+    {
+        return org.apache.qpid.transport.network.ConnectionBinding.MAX_FRAME_SIZE;
+    }
+
     protected void secure(final Connection conn, final byte[] response)
     {
         final SaslServer ss = conn.getSaslServer();

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java?rev=1616977&r1=1616976&r2=1616977&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java Sat Aug  9 16:05:05 2014
@@ -20,17 +20,18 @@
  */
 package org.apache.qpid.transport.network;
 
+import java.nio.ByteBuffer;
+
 import org.apache.qpid.transport.Binding;
 import org.apache.qpid.transport.Connection;
 import org.apache.qpid.transport.ConnectionDelegate;
 import org.apache.qpid.transport.ConnectionListener;
+import org.apache.qpid.transport.Constant;
 import org.apache.qpid.transport.Receiver;
 import org.apache.qpid.transport.Sender;
 import org.apache.qpid.transport.network.security.sasl.SASLReceiver;
 import org.apache.qpid.transport.network.security.sasl.SASLSender;
 
-import java.nio.ByteBuffer;
-
 /**
  * ConnectionBinding
  *
@@ -80,23 +81,26 @@ public abstract class ConnectionBinding
         }
         
         // XXX: hardcoded max-frame
-        Disassembler dis = new Disassembler(sender, MAX_FRAME_SIZE);
+        Disassembler dis = new Disassembler(sender, Constant.MIN_MAX_FRAME_SIZE);
+        conn.addFrameSizeObserver(dis);
         conn.setSender(dis);
         return conn;
     }
 
     public Receiver<ByteBuffer> receiver(Connection conn)
     {
-        if (conn.getConnectionSettings() != null && 
+        final InputHandler inputHandler = new InputHandler(new Assembler(conn));
+        conn.addFrameSizeObserver(inputHandler);
+        if (conn.getConnectionSettings() != null &&
             conn.getConnectionSettings().isUseSASLEncryption())
         {
-            SASLReceiver receiver = new SASLReceiver(new InputHandler(new Assembler(conn)));
-            conn.addConnectionListener((ConnectionListener)receiver);
+            SASLReceiver receiver = new SASLReceiver(inputHandler);
+            conn.addConnectionListener(receiver);
             return receiver;
         }
         else
         {
-            return new InputHandler(new Assembler(conn));
+            return inputHandler;
         }
     }
 

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java?rev=1616977&r1=1616976&r2=1616977&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java Sat Aug  9 16:05:05 2014
@@ -20,6 +20,17 @@
  */
 package org.apache.qpid.transport.network;
 
+import static java.lang.Math.min;
+import static org.apache.qpid.transport.network.Frame.FIRST_FRAME;
+import static org.apache.qpid.transport.network.Frame.FIRST_SEG;
+import static org.apache.qpid.transport.network.Frame.HEADER_SIZE;
+import static org.apache.qpid.transport.network.Frame.LAST_FRAME;
+import static org.apache.qpid.transport.network.Frame.LAST_SEG;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import org.apache.qpid.transport.FrameSizeObserver;
 import org.apache.qpid.transport.Header;
 import org.apache.qpid.transport.Method;
 import org.apache.qpid.transport.ProtocolDelegate;
@@ -31,24 +42,13 @@ import org.apache.qpid.transport.Sender;
 import org.apache.qpid.transport.Struct;
 import org.apache.qpid.transport.codec.BBEncoder;
 
-import static org.apache.qpid.transport.network.Frame.FIRST_FRAME;
-import static org.apache.qpid.transport.network.Frame.FIRST_SEG;
-import static org.apache.qpid.transport.network.Frame.HEADER_SIZE;
-import static org.apache.qpid.transport.network.Frame.LAST_FRAME;
-import static org.apache.qpid.transport.network.Frame.LAST_SEG;
-
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-
-import static java.lang.Math.min;
-
 /**
  * Disassembler
  */
-public final class Disassembler implements Sender<ProtocolEvent>, ProtocolDelegate<Void>
+public final class Disassembler implements Sender<ProtocolEvent>, ProtocolDelegate<Void>, FrameSizeObserver
 {
     private final Sender<ByteBuffer> sender;
-    private final int maxPayload;
+    private int maxPayload;
     private final Object sendlock = new Object();
     private final static ThreadLocal<BBEncoder> _encoder = new ThreadLocal<BBEncoder>()
     {
@@ -60,11 +60,11 @@ public final class Disassembler implemen
 
     public Disassembler(Sender<ByteBuffer> sender, int maxFrame)
     {
+        this.sender = sender;
         if (maxFrame <= HEADER_SIZE || maxFrame >= 64*1024)
         {
             throw new IllegalArgumentException("maxFrame must be > HEADER_SIZE and < 64K: " + maxFrame);
         }
-        this.sender = sender;
         this.maxPayload  = maxFrame - HEADER_SIZE;
     }
 
@@ -255,4 +255,15 @@ public final class Disassembler implemen
     {
         sender.setIdleTimeout(i);
     }
+
+    @Override
+    public void setMaxFrameSize(final int maxFrame)
+    {
+        if (maxFrame <= HEADER_SIZE || maxFrame >= 64*1024)
+        {
+            throw new IllegalArgumentException("maxFrame must be > HEADER_SIZE and < 64K: " + maxFrame);
+        }
+        this.maxPayload  = maxFrame - HEADER_SIZE;
+
+    }
 }

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Frame.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Frame.java?rev=1616977&r1=1616976&r2=1616977&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Frame.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Frame.java Sat Aug  9 16:05:05 2014
@@ -20,12 +20,12 @@
  */
 package org.apache.qpid.transport.network;
 
-import org.apache.qpid.transport.SegmentType;
-
 import static org.apache.qpid.transport.util.Functions.str;
 
 import java.nio.ByteBuffer;
 
+import org.apache.qpid.transport.SegmentType;
+
 
 /**
  * Frame

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java?rev=1616977&r1=1616976&r2=1616977&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java Sat Aug  9 16:05:05 2014
@@ -20,11 +20,6 @@
  */
 package org.apache.qpid.transport.network;
 
-import org.apache.qpid.transport.ProtocolError;
-import org.apache.qpid.transport.ProtocolHeader;
-import org.apache.qpid.transport.Receiver;
-import org.apache.qpid.transport.SegmentType;
-
 import static org.apache.qpid.transport.network.InputHandler.State.ERROR;
 import static org.apache.qpid.transport.network.InputHandler.State.FRAME_BODY;
 import static org.apache.qpid.transport.network.InputHandler.State.FRAME_HDR;
@@ -34,6 +29,13 @@ import static org.apache.qpid.transport.
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 
+import org.apache.qpid.transport.Constant;
+import org.apache.qpid.transport.FrameSizeObserver;
+import org.apache.qpid.transport.ProtocolError;
+import org.apache.qpid.transport.ProtocolHeader;
+import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.SegmentType;
+
 
 /**
  * InputHandler
@@ -41,15 +43,17 @@ import java.nio.ByteOrder;
  * @author Rafael H. Schloming
  */
 
-public class InputHandler implements Receiver<ByteBuffer>
+public class InputHandler implements Receiver<ByteBuffer>, FrameSizeObserver
 {
 
+    private int _maxFrameSize = Constant.MIN_MAX_FRAME_SIZE;
+
     public enum State
     {
         PROTO_HDR,
         FRAME_HDR,
         FRAME_BODY,
-        ERROR;
+        ERROR
     }
 
     private final Receiver<NetworkEvent> receiver;
@@ -83,6 +87,11 @@ public class InputHandler implements Rec
         this(receiver, PROTO_HDR);
     }
 
+    public void setMaxFrameSize(final int maxFrameSize)
+    {
+        _maxFrameSize = maxFrameSize;
+    }
+
     private void error(String fmt, Object ... args)
     {
         receiver.received(new ProtocolError(Frame.L1, fmt, args));
@@ -158,7 +167,8 @@ public class InputHandler implements Rec
             type = SegmentType.get(input.get(pos + 1));
             int size = (0xFFFF & input.getShort(pos + 2));
             size -= Frame.HEADER_SIZE;
-            if (size < 0 || size > (64*1024 - 12))
+            _maxFrameSize = 64 * 1024;
+            if (size < 0 || size > (_maxFrameSize - 12))
             {
                 error("bad frame size: %d", size);
                 return ERROR;

Modified: qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java?rev=1616977&r1=1616976&r2=1616977&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java (original)
+++ qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java Sat Aug  9 16:05:05 2014
@@ -21,6 +21,12 @@
 package org.apache.qpid.codec;
 
 
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+
 import junit.framework.TestCase;
 
 import org.apache.qpid.framing.AMQDataBlock;
@@ -29,23 +35,15 @@ import org.apache.qpid.framing.AMQFrameD
 import org.apache.qpid.framing.AMQProtocolVersionException;
 import org.apache.qpid.framing.HeartbeatBody;
 
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-
 public class AMQDecoderTest extends TestCase
 {
 
-    private AMQCodecFactory _factory;
     private AMQDecoder _decoder;
 
 
     public void setUp()
     {
-        _factory = new AMQCodecFactory(false, null);
-        _decoder = _factory.getDecoder();
+        _decoder = new AMQDecoder(false, null);
     }
    
     

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java?rev=1616977&r1=1616976&r2=1616977&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java Sat Aug  9 16:05:05 2014
@@ -20,6 +20,8 @@
 */
 package org.apache.qpid.server.protocol;
 
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.net.InetSocketAddress;
@@ -32,6 +34,7 @@ import java.util.Set;
 
 import org.apache.qpid.protocol.ServerProtocolEngine;
 import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.Port;
 import org.apache.qpid.server.model.Protocol;
 import org.apache.qpid.server.util.BrokerTestHelper;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
@@ -52,6 +55,7 @@ public class MultiVersionProtocolEngineF
         _broker = BrokerTestHelper.createBrokerMock();
         when(_broker.getAttribute(Broker.DEFAULT_VIRTUAL_HOST)).thenReturn("default");
         when(_broker.getDefaultVirtualHost()).thenReturn("default");
+        when(_broker.getContextValue(eq(Long.class), eq(Broker.BROKER_FRAME_SIZE))).thenReturn(0xffffl);
 
     }
 
@@ -149,8 +153,10 @@ public class MultiVersionProtocolEngineF
     {
         Set<Protocol> protocols = getAllAMQPProtocols();
 
+        Port<?> port = mock(Port.class);
+        when(port.getContextValue(eq(Long.class),eq(Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY))).thenReturn(10000l);
         MultiVersionProtocolEngineFactory factory =
-            new MultiVersionProtocolEngineFactory(_broker, null, false, false, protocols, null, null,
+            new MultiVersionProtocolEngineFactory(_broker, null, false, false, protocols, null, port,
                     org.apache.qpid.server.model.Transport.TCP);
 
         //create a dummy to retrieve the 'current' ID number

Added: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/transport/MaxFrameSizeTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/transport/MaxFrameSizeTest.java?rev=1616977&view=auto
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/transport/MaxFrameSizeTest.java (added)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/transport/MaxFrameSizeTest.java Sat Aug  9 16:05:05 2014
@@ -0,0 +1,349 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import javax.naming.NamingException;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+
+import org.apache.qpid.codec.MarkableDataInput;
+import org.apache.qpid.framing.AMQBody;
+import org.apache.qpid.framing.AMQDataBlockDecoder;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQFrameDecodingException;
+import org.apache.qpid.framing.AMQProtocolVersionException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BodyFactory;
+import org.apache.qpid.framing.ByteArrayDataInput;
+import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.ConnectionStartOkBody;
+import org.apache.qpid.framing.ConnectionTuneOkBody;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.amqp_0_91.ConnectionStartOkBodyImpl;
+import org.apache.qpid.framing.amqp_0_91.ConnectionTuneOkBodyImpl;
+import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91;
+import org.apache.qpid.jms.BrokerDetails;
+import org.apache.qpid.server.model.AuthenticationProvider;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.Protocol;
+import org.apache.qpid.server.protocol.v0_8.ProtocolEngineCreator_0_8;
+import org.apache.qpid.server.protocol.v0_8.ProtocolEngineCreator_0_9;
+import org.apache.qpid.server.protocol.v0_8.ProtocolEngineCreator_0_9_1;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.test.utils.TestBrokerConfiguration;
+
+public class MaxFrameSizeTest extends QpidBrokerTestCase
+{
+
+    @Override
+    public void setUp() throws Exception
+    {
+        // don't call super.setup() as we want a change to set stuff up before the broker starts
+        // super.setUp();
+    }
+
+    public void testTooSmallFrameSize() throws Exception
+    {
+
+        getBrokerConfiguration().setObjectAttribute(AuthenticationProvider.class,
+                                                    TestBrokerConfiguration.ENTRY_NAME_AUTHENTICATION_PROVIDER,
+                                                    "secureOnlyMechanisms",
+                                                    "[]");
+        super.setUp();
+
+        if(isBroker010())
+        {
+            Connection conn = new Connection();
+            final ConnectionSettings settings = new ConnectionSettings();
+            BrokerDetails brokerDetails = getConnectionFactory().getConnectionURL().getAllBrokerDetails().get(0);
+            settings.setHost(brokerDetails.getHost());
+            settings.setPort(brokerDetails.getPort());
+            settings.setUsername(GUEST_USERNAME);
+            settings.setPassword(GUEST_PASSWORD);
+            final ConnectionDelegate clientDelegate = new TestClientDelegate(settings, 1024);
+            conn.setConnectionDelegate(clientDelegate);
+            try
+            {
+                conn.connect(settings);
+                fail("Connection should not be possible with a frame size < " + Constant.MIN_MAX_FRAME_SIZE);
+            }
+            catch(ConnectionException e)
+            {
+                // pass
+            }
+
+        }
+        else
+        {
+            doAMQP08test(1024, new ResultEvaluator()
+                                {
+
+                                    @Override
+                                    public void evaluate(final Socket socket, final List<AMQFrame> frames)
+                                    {
+                                        if(!socket.isClosed())
+                                        {
+                                            AMQFrame lastFrame = frames.get(frames.size() - 1);
+                                            assertTrue("Connection should not be possible with a frame size < " + Constant.MIN_MAX_FRAME_SIZE, lastFrame.getBodyFrame() instanceof ConnectionCloseBody);
+                                        }
+                                    }
+                                });
+        }
+    }
+
+
+    public void testTooLargeFrameSize() throws Exception
+    {
+        getBrokerConfiguration().setObjectAttribute(AuthenticationProvider.class,
+                                                    TestBrokerConfiguration.ENTRY_NAME_AUTHENTICATION_PROVIDER,
+                                                    "secureOnlyMechanisms",
+                                                    "[]");
+        setTestSystemProperty(Broker.BROKER_FRAME_SIZE, "8192");
+        super.setUp();
+        if(isBroker010())
+        {
+            Connection conn = new Connection();
+            final ConnectionSettings settings = new ConnectionSettings();
+            BrokerDetails brokerDetails = getConnectionFactory().getConnectionURL().getAllBrokerDetails().get(0);
+            settings.setHost(brokerDetails.getHost());
+            settings.setPort(brokerDetails.getPort());
+            settings.setUsername(GUEST_USERNAME);
+            settings.setPassword(GUEST_PASSWORD);
+            final ConnectionDelegate clientDelegate = new TestClientDelegate(settings, 0xffff);
+            conn.setConnectionDelegate(clientDelegate);
+            try
+            {
+                conn.connect(settings);
+                fail("Connection should not be possible with a frame size larger than the broker requested");
+            }
+            catch(ConnectionException e)
+            {
+                // pass
+            }
+
+        }
+        else
+        {
+            doAMQP08test(8192, new ResultEvaluator()
+                                {
+
+                                    @Override
+                                    public void evaluate(final Socket socket, final List<AMQFrame> frames)
+                                    {
+                                        if(!socket.isClosed())
+                                        {
+                                            AMQFrame lastFrame = frames.get(frames.size() - 1);
+                                            assertTrue("Connection should not be possible with a frame size larger than the broker requested", lastFrame.getBodyFrame() instanceof ConnectionCloseBody);
+                                        }
+                                    }
+                                });
+        }
+    }
+
+    private static interface ResultEvaluator
+    {
+        void evaluate(Socket socket, List<AMQFrame> frames);
+    }
+
+    private void doAMQP08test(int frameSize, ResultEvaluator evaluator)
+            throws NamingException, IOException, AMQFrameDecodingException, AMQProtocolVersionException
+    {
+        BrokerDetails brokerDetails = getConnectionFactory().getConnectionURL().getAllBrokerDetails().get(0);
+
+        Socket socket = new Socket(brokerDetails.getHost(), brokerDetails.getPort());
+        socket.setTcpNoDelay(true);
+        OutputStream os = socket.getOutputStream();
+
+        byte[] protocolHeader;
+        Protocol protocol = getBrokerProtocol();
+        switch(protocol)
+        {
+            case AMQP_0_8:
+                protocolHeader = (ProtocolEngineCreator_0_8.getInstance().getHeaderIdentifier());
+                break;
+            case AMQP_0_9:
+                protocolHeader = (ProtocolEngineCreator_0_9.getInstance().getHeaderIdentifier());
+                break;
+            case AMQP_0_9_1:
+                protocolHeader = (ProtocolEngineCreator_0_9_1.getInstance().getHeaderIdentifier());
+                break;
+            default:
+                throw new RuntimeException("Unexpected Protocol Version: " + protocol);
+        }
+        os.write(protocolHeader);
+        InputStream is = socket.getInputStream();
+
+        final byte[] response = new byte[2+GUEST_USERNAME.length()+GUEST_PASSWORD.length()];
+        int i = 1;
+        for(byte b : GUEST_USERNAME.getBytes(StandardCharsets.US_ASCII))
+        {
+            response[i++] = b;
+        }
+        i++;
+        for(byte b : GUEST_PASSWORD.getBytes(StandardCharsets.US_ASCII))
+        {
+            response[i++] = b;
+        }
+
+        ConnectionStartOkBody startOK = new ConnectionStartOkBodyImpl(new FieldTable(), AMQShortString.valueOf("PLAIN"), response, AMQShortString.valueOf("en_US"));
+
+        DataOutputStream dos = new DataOutputStream(os);
+        new AMQFrame(0, startOK).writePayload(dos);
+        dos.flush();
+        ConnectionTuneOkBody tuneOk = new ConnectionTuneOkBodyImpl(256, frameSize, 0);
+        new AMQFrame(0, tuneOk).writePayload(dos);
+        dos.flush();
+        socket.setSoTimeout(5000);
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        byte[] buffer = new byte[1024];
+        int size;
+        while((size = is.read(buffer)) > 0)
+        {
+            baos.write(buffer,0,size);
+        }
+
+        byte[] serverData = baos.toByteArray();
+        ByteArrayDataInput badi = new ByteArrayDataInput(serverData);
+        AMQDataBlockDecoder datablockDecoder = new AMQDataBlockDecoder();
+        final MethodRegistry_0_91 methodRegistry_0_91 = new MethodRegistry_0_91();
+        BodyFactory methodBodyFactory = new BodyFactory()
+        {
+            @Override
+            public AMQBody createBody(final MarkableDataInput in, final long bodySize)
+                    throws AMQFrameDecodingException, IOException
+            {
+                return methodRegistry_0_91.convertToBody(in, bodySize);
+            }
+        };
+
+        List<AMQFrame> frames = new ArrayList<>();
+        while (datablockDecoder.decodable(badi))
+        {
+            frames.add(datablockDecoder.createAndPopulateFrame(methodBodyFactory, badi));
+        }
+
+        evaluator.evaluate(socket, frames);
+    }
+
+    private static class TestClientDelegate extends ClientDelegate
+    {
+
+        private final int _maxFrameSize;
+
+        public TestClientDelegate(final ConnectionSettings settings, final int maxFrameSize)
+        {
+            super(settings);
+            _maxFrameSize = maxFrameSize;
+        }
+
+        @Override
+        protected SaslClient createSaslClient(final List<Object> brokerMechs) throws ConnectionException, SaslException
+        {
+            final CallbackHandler handler = new CallbackHandler()
+            {
+                @Override
+                public void handle(final Callback[] callbacks) throws IOException, UnsupportedCallbackException
+                {
+                    for (int i = 0; i < callbacks.length; i++)
+                    {
+                        Callback cb = callbacks[i];
+                        if (cb instanceof NameCallback)
+                        {
+                            ((NameCallback)cb).setName(GUEST_USERNAME);
+                        }
+                        else if (cb instanceof PasswordCallback)
+                        {
+                            ((PasswordCallback)cb).setPassword(GUEST_PASSWORD.toCharArray());
+                        }
+                        else
+                        {
+                            throw new UnsupportedCallbackException(cb);
+                        }
+                    }
+
+                }
+            };
+            String[] selectedMechs = {};
+            for(String mech : new String[] { "ANONYMOUS", "PLAIN", "CRAM-MD5", "SCRAM-SHA-1", "SCRAM-SHA-256"})
+            {
+                if(brokerMechs.contains(mech))
+                {
+                    selectedMechs = new String[] {mech};
+                    break;
+                }
+            }
+
+
+            return Sasl.createSaslClient(selectedMechs,
+                                         null,
+                                         getConnectionSettings().getSaslProtocol(),
+                                         getConnectionSettings().getSaslServerName(),
+                                         Collections.<String,Object>emptyMap(),
+                                         handler);
+
+        }
+
+        @Override
+        public void connectionTune(Connection conn, ConnectionTune tune)
+        {
+            int heartbeatInterval = getConnectionSettings().getHeartbeatInterval010();
+            float heartbeatTimeoutFactor = getConnectionSettings().getHeartbeatTimeoutFactor();
+            int actualHeartbeatInterval = calculateHeartbeatInterval(heartbeatInterval,
+                                                                     tune.getHeartbeatMin(),
+                                                                     tune.getHeartbeatMax());
+
+            conn.connectionTuneOk(tune.getChannelMax(),
+                                  _maxFrameSize,
+                                  actualHeartbeatInterval);
+
+            int idleTimeout = (int)(actualHeartbeatInterval * 1000 * heartbeatTimeoutFactor);
+            conn.getNetworkConnection().setMaxReadIdle((int)(actualHeartbeatInterval*heartbeatTimeoutFactor));
+            conn.getNetworkConnection().setMaxWriteIdle(actualHeartbeatInterval);
+            conn.setMaxFrameSize(_maxFrameSize);
+
+
+            conn.setIdleTimeout(idleTimeout);
+
+            int channelMax = tune.getChannelMax();
+            conn.setChannelMax(channelMax == 0 ? Connection.MAX_CHANNEL_MAX : channelMax);
+
+            conn.connectionOpen(getConnectionSettings().getVhost(), null, Option.INSIST);
+        }
+
+    }
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org