You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/10/17 16:23:22 UTC

svn commit: r1632583 [3/5] - in /qpid/trunk/qpid/java: ./ bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/ bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/ broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/...

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java?rev=1632583&r1=1632582&r2=1632583&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java Fri Oct 17 14:23:19 2014
@@ -24,7 +24,7 @@ import org.apache.log4j.Logger;
 
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.MessagePublishInfo;
 import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.server.message.AbstractServerMessageImpl;
 import org.apache.qpid.server.store.StoredMessage;

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java?rev=1632583&r1=1632582&r2=1632583&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java Fri Oct 17 14:23:19 2014
@@ -21,9 +21,13 @@
 package org.apache.qpid.server.protocol.v0_8;
 
 import java.io.IOException;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
+import java.security.AccessControlException;
 import java.security.AccessController;
 import java.security.Principal;
 import java.security.PrivilegedAction;
@@ -41,11 +45,11 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
 import javax.security.auth.Subject;
+import javax.security.sasl.SaslException;
 import javax.security.sasl.SaslServer;
 
 import org.apache.log4j.Logger;
 
-import org.apache.qpid.AMQChannelException;
 import org.apache.qpid.AMQConnectionException;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.codec.AMQDecoder;
@@ -54,7 +58,6 @@ import org.apache.qpid.common.ServerProp
 import org.apache.qpid.framing.*;
 import org.apache.qpid.properties.ConnectionStartProperties;
 import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.protocol.ServerProtocolEngine;
 import org.apache.qpid.server.configuration.BrokerProperties;
 import org.apache.qpid.server.connection.ConnectionPrincipal;
@@ -67,15 +70,15 @@ import org.apache.qpid.server.message.In
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.Port;
+import org.apache.qpid.server.model.State;
 import org.apache.qpid.server.model.Transport;
+import org.apache.qpid.server.model.port.AmqpPort;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.protocol.SessionModelListener;
-import org.apache.qpid.server.protocol.v0_8.handler.ServerMethodDispatcherImpl;
-import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
-import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverterRegistry;
-import org.apache.qpid.server.protocol.v0_8.state.AMQState;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
+import org.apache.qpid.server.security.SubjectCreator;
 import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
+import org.apache.qpid.server.security.auth.SubjectAuthenticationResult;
 import org.apache.qpid.server.stats.StatisticsCounter;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
@@ -86,7 +89,9 @@ import org.apache.qpid.transport.Transpo
 import org.apache.qpid.transport.network.NetworkConnection;
 import org.apache.qpid.util.BytesDataOutput;
 
-public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSession<AMQProtocolEngine>
+public class AMQProtocolEngine implements ServerProtocolEngine,
+                                          AMQConnectionModel<AMQProtocolEngine, AMQChannel>,
+                                          ServerMethodProcessor<ServerChannelMethodProcessor>
 {
     private static final Logger _logger = Logger.getLogger(AMQProtocolEngine.class);
 
@@ -94,6 +99,8 @@ public class AMQProtocolEngine implement
     // channels.  This value must be of the form 2^x - 1.
     private static final int CHANNEL_CACHE_SIZE = 0xff;
     private static final int REUSABLE_BYTE_BUFFER_CAPACITY = 65 * 1024;
+    public static final String BROKER_DEBUG_BINARY_DATA_LENGTH = "broker.debug.binaryDataLength";
+    public static final int DEFAULT_DEBUG_BINARY_DATA_LENGTH = 80;
     private final Port<?> _port;
     private final long _creationTime;
 
@@ -105,13 +112,12 @@ public class AMQProtocolEngine implement
 
     private VirtualHostImpl<?,?,?> _virtualHost;
 
-    private final Map<Integer, AMQChannel<AMQProtocolEngine>> _channelMap =
-            new HashMap<Integer, AMQChannel<AMQProtocolEngine>>();
+    private final Map<Integer, AMQChannel> _channelMap =
+            new HashMap<>();
     private final CopyOnWriteArrayList<SessionModelListener> _sessionListeners =
-            new CopyOnWriteArrayList<SessionModelListener>();
+            new CopyOnWriteArrayList<>();
 
-    @SuppressWarnings("unchecked")
-    private final AMQChannel<AMQProtocolEngine>[] _cachedChannels = new AMQChannel[CHANNEL_CACHE_SIZE + 1];
+    private final AMQChannel[] _cachedChannels = new AMQChannel[CHANNEL_CACHE_SIZE + 1];
 
     /**
      * The channels that the latest call to {@link #received(ByteBuffer)} applied to.
@@ -120,10 +126,8 @@ public class AMQProtocolEngine implement
      *
      * Thread-safety: guarded by {@link #_receivedLock}.
      */
-    private final Set<AMQChannel<AMQProtocolEngine>> _channelsForCurrentMessage =
-            new HashSet<AMQChannel<AMQProtocolEngine>>();
-
-    private final AMQStateManager _stateManager;
+    private final Set<AMQChannel> _channelsForCurrentMessage =
+            new HashSet<>();
 
     private AMQDecoder _decoder;
 
@@ -136,14 +140,13 @@ public class AMQProtocolEngine implement
 
     /* AMQP Version for this session */
     private ProtocolVersion _protocolVersion = ProtocolVersion.getLatestSupportedVersion();
-    private MethodRegistry _methodRegistry = MethodRegistry.getMethodRegistry(_protocolVersion);
+    private final MethodRegistry _methodRegistry = new MethodRegistry(_protocolVersion);
     private final List<Action<? super AMQProtocolEngine>> _taskList =
-            new CopyOnWriteArrayList<Action<? super AMQProtocolEngine>>();
+            new CopyOnWriteArrayList<>();
 
-    private Map<Integer, Long> _closingChannelsList = new ConcurrentHashMap<Integer, Long>();
+    private Map<Integer, Long> _closingChannelsList = new ConcurrentHashMap<>();
     private ProtocolOutputConverter _protocolOutputConverter;
     private final Subject _authorizedSubject = new Subject();
-    private MethodDispatcher _dispatcher;
 
     private final long _connectionID;
     private Object _reference = new Object();
@@ -177,6 +180,9 @@ public class AMQProtocolEngine implement
     private boolean _authenticated;
     private boolean _compressionSupported;
     private int _messageCompressionThreshold;
+    private int _currentClassId;
+    private int _currentMethodId;
+    private int _binaryDataLimit;
 
     public AMQProtocolEngine(Broker broker,
                              final NetworkConnection network,
@@ -189,11 +195,12 @@ public class AMQProtocolEngine implement
         _transport = transport;
         _maxNoOfChannels = broker.getConnection_sessionCountLimit();
         _receivedLock = new ReentrantLock();
-        _stateManager = new AMQStateManager(broker, this);
-        _decoder = new AMQDecoder(true, this);
+        _decoder = new BrokerDecoder(this);
         _connectionID = connectionId;
         _logSubject = new ConnectionLogSubject(this);
-
+        _binaryDataLimit = _broker.getContextKeys(false).contains(BROKER_DEBUG_BINARY_DATA_LENGTH)
+                ? _broker.getContextValue(Integer.class, BROKER_DEBUG_BINARY_DATA_LENGTH)
+                : DEFAULT_DEBUG_BINARY_DATA_LENGTH;
         _authorizedSubject.getPrincipals().add(new ConnectionPrincipal(this));
         runAsSubject(new PrivilegedAction<Void>()
         {
@@ -303,28 +310,7 @@ public class AMQProtocolEngine implement
                 _receivedLock.lock();
                 try
                 {
-                    final ArrayList<AMQDataBlock> dataBlocks = _decoder.decodeBuffer(msg);
-                    for (AMQDataBlock dataBlock : dataBlocks)
-                    {
-                        try
-                        {
-                            dataBlockReceived(dataBlock);
-                        }
-                        catch(AMQConnectionException e)
-                        {
-                            if(_logger.isDebugEnabled())
-                            {
-                                _logger.debug("Caught AMQConnectionException but will simply stop processing data blocks - the connection should already be closed.", e);
-                            }
-                            break;
-                        }
-                        catch (AMQException e)
-                        {
-                            _logger.error("Unexpected exception when processing datablock", e);
-                            closeProtocolSession();
-                            break;
-                        }
-                    }
+                    _decoder.decodeBuffer(msg);
                     receivedComplete();
                 }
                 catch (ConnectionScopedRuntimeException e)
@@ -366,7 +352,7 @@ public class AMQProtocolEngine implement
     {
         RuntimeException exception = null;
 
-        for (AMQChannel<AMQProtocolEngine> channel : _channelsForCurrentMessage)
+        for (AMQChannel channel : _channelsForCurrentMessage)
         {
             try
             {
@@ -391,112 +377,10 @@ public class AMQProtocolEngine implement
         }
     }
 
-    /**
-     * Process the data block.
-     * If the message is for a channel it is added to {@link #_channelsForCurrentMessage}.
-     *
-     * @throws AMQConnectionException if unable to process the data block. In this case,
-     * the connection is already closed by the time the exception is thrown. If any other
-     * type of exception is thrown, the connection is not already closed.
-     */
-    private void dataBlockReceived(AMQDataBlock message) throws AMQException
-    {
-        if (message instanceof ProtocolInitiation)
-        {
-            protocolInitiationReceived((ProtocolInitiation) message);
-
-        }
-        else if (message instanceof AMQFrame)
-        {
-            AMQFrame frame = (AMQFrame) message;
-            frameReceived(frame);
-
-        }
-        else
-        {
-            throw new AMQException("Unknown message type: " + message.getClass().getName() + ": " + message);
-        }
-    }
 
-    /**
-     * Handle the supplied frame.
-     * Adds this frame's channel to {@link #_channelsForCurrentMessage}.
-     *
-     * @throws AMQConnectionException if unable to process the data block. In this case,
-     * the connection is already closed by the time the exception is thrown. If any other
-     * type of exception is thrown, the connection is not already closed.
-     */
-    private void frameReceived(AMQFrame frame) throws AMQException
+    void channelRequiresSync(final AMQChannel amqChannel)
     {
-        int channelId = frame.getChannel();
-        AMQChannel<AMQProtocolEngine> amqChannel = _channelMap.get(channelId);
-        if(amqChannel != null)
-        {
-            // The _receivedLock is already acquired in the caller
-            // It is safe to add channel
-            _channelsForCurrentMessage.add(amqChannel);
-        }
-        else
-        {
-            // Not an error. The frame is probably a channel Open for this channel id, which
-            // does not require asynchronous work therefore its absence from
-            // _channelsForCurrentMessage is ok.
-        }
-
-        AMQBody body = frame.getBodyFrame();
-
-        long startTime = 0;
-        String frameToString = null;
-        if (_logger.isDebugEnabled())
-        {
-            startTime = System.currentTimeMillis();
-            frameToString = frame.toString();
-            _logger.debug("RECV: " + frame);
-        }
-
-        // Check that this channel is not closing
-        if (channelAwaitingClosure(channelId))
-        {
-            if ((frame.getBodyFrame() instanceof ChannelCloseOkBody))
-            {
-                if (_logger.isInfoEnabled())
-                {
-                    _logger.info("Channel[" + channelId + "] awaiting closure - processing close-ok");
-                }
-            }
-            else
-            {
-                // The channel has been told to close, we don't process any more frames until
-                // it's closed.
-                return;
-            }
-        }
-
-        try
-        {
-            body.handle(channelId, this);
-        }
-        catch(AMQConnectionException e)
-        {
-            _logger.info(e.getMessage() + " whilst processing frame: " + body);
-            closeConnection(channelId, e);
-            throw e;
-        }
-        catch (AMQException e)
-        {
-            closeChannel(channelId, e.getErrorCode() == null ? AMQConstant.INTERNAL_ERROR : e.getErrorCode(), e.getMessage());
-            throw e;
-        }
-        catch (TransportException e)
-        {
-            closeChannel(channelId, AMQConstant.CHANNEL_ERROR, e.getMessage());
-            throw e;
-        }
-
-        if(_logger.isDebugEnabled())
-        {
-            _logger.debug("Frame handled in " + (System.currentTimeMillis() - startTime) + " ms. Frame: " + frameToString);
-        }
+        _channelsForCurrentMessage.add(amqChannel);
     }
 
     private synchronized void protocolInitiationReceived(ProtocolInitiation pi)
@@ -615,87 +499,6 @@ public class AMQProtocolEngine implement
         return buf;
     }
 
-    public void methodFrameReceived(int channelId, AMQMethodBody methodBody)
-    {
-        final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(channelId, methodBody);
-
-        try
-        {
-            try
-            {
-                boolean wasAnyoneInterested = _stateManager.methodReceived(evt);
-
-                if (!wasAnyoneInterested)
-                {
-                    throw new AMQNoMethodHandlerException(evt);
-                }
-            }
-            catch (AMQChannelException e)
-            {
-                if (getChannel(channelId) != null)
-                {
-                    if (_logger.isInfoEnabled())
-                    {
-                        _logger.info("Closing channel due to: " + e.getMessage());
-                    }
-
-                    writeFrame(e.getCloseFrame(channelId));
-                    closeChannel(channelId, e.getErrorCode() == null ? AMQConstant.INTERNAL_ERROR : e.getErrorCode(), e.getMessage());
-                }
-                else
-                {
-                    if (_logger.isDebugEnabled())
-                    {
-                        _logger.debug("ChannelException occurred on non-existent channel:" + e.getMessage());
-                    }
-
-                    if (_logger.isInfoEnabled())
-                    {
-                        _logger.info("Closing connection due to: " + e.getMessage());
-                    }
-
-                    AMQConnectionException ce =
-                            evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR,
-                                                                   AMQConstant.CHANNEL_ERROR.getName().toString());
-
-                    _logger.info(e.getMessage() + " whilst processing:" + methodBody);
-                    closeConnection(channelId, ce);
-                }
-            }
-            catch (AMQConnectionException e)
-            {
-                _logger.info(e.getMessage() + " whilst processing:" + methodBody);
-                closeConnection(channelId, e);
-            }
-        }
-        catch (Exception e)
-        {
-            _logger.error("Unexpected exception while processing frame.  Closing connection.", e);
-
-            closeProtocolSession();
-        }
-    }
-
-    public void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException
-    {
-
-        AMQChannel<AMQProtocolEngine> channel = getAndAssertChannel(channelId);
-
-        channel.publishContentHeader(body);
-
-    }
-
-    public void contentBodyReceived(int channelId, ContentBody body) throws AMQException
-    {
-        AMQChannel<AMQProtocolEngine> channel = getAndAssertChannel(channelId);
-
-        channel.publishContentBody(body);
-    }
-
-    public void heartbeatBodyReceived(int channelId, HeartbeatBody body)
-    {
-        // NO - OP
-    }
 
     /**
      * Convenience method that writes a frame to the protocol session. Equivalent to calling
@@ -735,28 +538,17 @@ public class AMQProtocolEngine implement
         _contextKey = contextKey;
     }
 
-    public List<AMQChannel<AMQProtocolEngine>> getChannels()
+    public List<AMQChannel> getChannels()
     {
         synchronized (_channelMap)
         {
-            return new ArrayList<AMQChannel<AMQProtocolEngine>>(_channelMap.values());
-        }
-    }
-
-    public AMQChannel<AMQProtocolEngine> getAndAssertChannel(int channelId) throws AMQException
-    {
-        AMQChannel<AMQProtocolEngine> channel = getChannel(channelId);
-        if (channel == null)
-        {
-            throw new AMQException(AMQConstant.NOT_FOUND, "Channel not found with id:" + channelId);
+            return new ArrayList<>(_channelMap.values());
         }
-
-        return channel;
     }
 
-    public AMQChannel<AMQProtocolEngine> getChannel(int channelId)
+    public AMQChannel getChannel(int channelId)
     {
-        final AMQChannel<AMQProtocolEngine> channel =
+        final AMQChannel channel =
                 ((channelId & CHANNEL_CACHE_SIZE) == channelId) ? _cachedChannels[channelId] : _channelMap.get(channelId);
         if ((channel == null) || channel.isClosing())
         {
@@ -773,38 +565,17 @@ public class AMQProtocolEngine implement
         return !_closingChannelsList.isEmpty() && _closingChannelsList.containsKey(channelId);
     }
 
-    public void addChannel(AMQChannel<AMQProtocolEngine> channel) throws AMQException
+    public void addChannel(AMQChannel channel)
     {
-        if (_closed)
-        {
-            throw new AMQException("Session is closed");
-        }
-
         final int channelId = channel.getChannelId();
 
-        if (_closingChannelsList.containsKey(channelId))
-        {
-            throw new AMQException("Session is marked awaiting channel close");
-        }
-
-        if (_channelMap.size() == _maxNoOfChannels)
-        {
-            String errorMessage =
-                    toString() + ": maximum number of channels has been reached (" + _maxNoOfChannels
-                    + "); can't create channel";
-            _logger.error(errorMessage);
-            throw new AMQException(AMQConstant.NOT_ALLOWED, errorMessage);
-        }
-        else
+        synchronized (_channelMap)
         {
-            synchronized (_channelMap)
+            _channelMap.put(channel.getChannelId(), channel);
+            sessionAdded(channel);
+            if(_blocking)
             {
-                _channelMap.put(channel.getChannelId(), channel);
-                sessionAdded(channel);
-                if(_blocking)
-                {
-                    channel.block();
-                }
+                channel.block();
             }
         }
 
@@ -830,7 +601,7 @@ public class AMQProtocolEngine implement
         }
     }
 
-    public Long getMaximumNumberOfChannels()
+    public long getMaximumNumberOfChannels()
     {
         return _maxNoOfChannels;
     }
@@ -840,52 +611,52 @@ public class AMQProtocolEngine implement
         _maxNoOfChannels = value;
     }
 
-    /**
-     * Close a specific channel. This will remove any resources used by the channel, including: <ul><li>any queue
-     * subscriptions (this may in turn remove queues if they are auto delete</li> </ul>
-     *
-     * @param channelId id of the channel to close
-     *
-     * @throws IllegalArgumentException if the channel id is not valid
-     */
-    @Override
-    public void closeChannel(int channelId)
+
+    void closeChannel(AMQChannel channel)
+    {
+        closeChannel(channel, null, null, false);
+    }
+
+    public void closeChannelAndWriteFrame(AMQChannel channel, AMQConstant cause, String message)
     {
-        closeChannel(channelId, null, null);
+        writeFrame(new AMQFrame(channel.getChannelId(),
+                                getMethodRegistry().createChannelCloseBody(cause.getCode(),
+                                                                           AMQShortString.validValueOf(message),
+                                                                           _currentClassId,
+                                                                           _currentMethodId)));
+        closeChannel(channel, cause, message, true);
     }
 
     public void closeChannel(int channelId, AMQConstant cause, String message)
     {
-        final AMQChannel<AMQProtocolEngine> channel = getChannel(channelId);
+        final AMQChannel channel = getChannel(channelId);
         if (channel == null)
         {
             throw new IllegalArgumentException("Unknown channel id");
         }
-        else
+        closeChannel(channel, cause, message, true);
+    }
+
+    void closeChannel(AMQChannel channel, AMQConstant cause, String message, boolean mark)
+    {
+        int channelId = channel.getChannelId();
+        try
         {
-            try
+            channel.close(cause, message);
+            if(mark)
             {
-                channel.close(cause, message);
                 markChannelAwaitingCloseOk(channelId);
             }
-            finally
-            {
-                removeChannel(channelId);
-            }
+        }
+        finally
+        {
+            removeChannel(channelId);
         }
     }
 
+
     public void closeChannelOk(int channelId)
     {
-        // todo QPID-847 - This is called from two locations ChannelCloseHandler and ChannelCloseOkHandler.
-        // When it is the CC_OK_Handler then it makes sense to remove the channel else we will leak memory.
-        // We do it from the Close Handler as we are sending the OK back to the client.
-        // While this is AMQP spec compliant. The Java client in the event of an IllegalArgumentException
-        // will send a close-ok.. Where we should call removeChannel.
-        // However, due to the poor exception handling on the client. The client-user will be notified of the
-        // InvalidArgument and if they then decide to close the session/connection then the there will be time
-        // for that to occur i.e. a new close method be sent before the exception handling can mark the session closed.
-
         _closingChannelsList.remove(channelId);
     }
 
@@ -901,7 +672,7 @@ public class AMQProtocolEngine implement
      */
     public void removeChannel(int channelId)
     {
-        AMQChannel<AMQProtocolEngine> session;
+        AMQChannel session;
         synchronized (_channelMap)
         {
             session = _channelMap.remove(channelId);
@@ -937,7 +708,7 @@ public class AMQProtocolEngine implement
      */
     private void closeAllChannels()
     {
-        for (AMQChannel<AMQProtocolEngine> channel : getChannels())
+        for (AMQChannel channel : getChannels())
         {
             channel.close();
         }
@@ -952,7 +723,6 @@ public class AMQProtocolEngine implement
     }
 
     /** This must be called when the session is _closed in order to free up any resources managed by the session. */
-    @Override
     public void closeSession()
     {
 
@@ -1042,13 +812,30 @@ public class AMQProtocolEngine implement
 
     private void closeConnection(int channelId, AMQConnectionException e)
     {
-        try
+
+        if (_logger.isInfoEnabled())
         {
-            if (_logger.isInfoEnabled())
-            {
-                _logger.info("Closing connection due to: " + e);
-            }
+            _logger.info("Closing connection due to: " + e);
+        }
+        closeConnection(channelId, e.getCloseFrame());
+    }
+
 
+    void closeConnection(AMQConstant errorCode,
+                         String message, int channelId)
+    {
+
+        if (_logger.isInfoEnabled())
+        {
+            _logger.info("Closing connection due to: " + message);
+        }
+        closeConnection(channelId, new AMQFrame(0, new ConnectionCloseBody(getProtocolVersion(), errorCode.getCode(), AMQShortString.validValueOf(message), _currentClassId, _currentMethodId)));
+    }
+
+    private void closeConnection(int channelId, AMQFrame frame)
+    {
+        try
+        {
             markChannelAwaitingCloseOk(channelId);
             closeSession();
         }
@@ -1056,8 +843,7 @@ public class AMQProtocolEngine implement
         {
             try
             {
-                _stateManager.changeState(AMQState.CONNECTION_CLOSING);
-                writeFrame(e.getCloseFrame(channelId));
+                writeFrame(frame);
             }
             finally
             {
@@ -1068,23 +854,9 @@ public class AMQProtocolEngine implement
 
     }
 
-    @Override
     public void closeProtocolSession()
     {
         _network.close();
-
-        try
-        {
-            _stateManager.changeState(AMQState.CONNECTION_CLOSED);
-        }
-        catch (ConnectionScopedRuntimeException e)
-        {
-            _logger.info(e.getMessage());
-        }
-        catch (TransportException e)
-        {
-            _logger.info(e.getMessage());
-        }
     }
 
     public String toString()
@@ -1174,9 +946,8 @@ public class AMQProtocolEngine implement
     private void setProtocolVersion(ProtocolVersion pv)
     {
         _protocolVersion = pv;
-        _methodRegistry = MethodRegistry.getMethodRegistry(_protocolVersion);
-        _protocolOutputConverter = ProtocolOutputConverterRegistry.getConverter(this);
-        _dispatcher = ServerMethodDispatcherImpl.createMethodDispatcher(_stateManager, _protocolVersion);
+        _methodRegistry.setProtocolVersion(_protocolVersion);
+        _protocolOutputConverter = new ProtocolOutputConverterImpl(this);
     }
 
     public byte getProtocolMajorVersion()
@@ -1204,7 +975,7 @@ public class AMQProtocolEngine implement
         return _virtualHost;
     }
 
-    public void setVirtualHost(VirtualHostImpl<?,?,?> virtualHost) throws AMQException
+    public void setVirtualHost(VirtualHostImpl<?,?,?> virtualHost)
     {
         _virtualHost = virtualHost;
 
@@ -1285,11 +1056,6 @@ public class AMQProtocolEngine implement
         return _methodRegistry;
     }
 
-    public MethodDispatcher getMethodDispatcher()
-    {
-        return _dispatcher;
-    }
-
     public void closed()
     {
         try
@@ -1303,14 +1069,10 @@ public class AMQProtocolEngine implement
                 closeProtocolSession();
             }
         }
-        catch (ConnectionScopedRuntimeException e)
+        catch (ConnectionScopedRuntimeException | TransportException e)
         {
             _logger.error("Could not close protocol engine", e);
         }
-        catch (TransportException e)
-        {
-           _logger.error("Could not close protocol engine", e);
-        }
     }
 
     public void readerIdle()
@@ -1351,9 +1113,11 @@ public class AMQProtocolEngine implement
             {
                 _logger.error("Exception caught in " + this + ", closing connection explicitly: " + throwable, throwable);
 
-
-                MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(getProtocolVersion());
-                ConnectionCloseBody closeBody = methodRegistry.createConnectionCloseBody(200, AMQShortString.validValueOf(throwable.getMessage()),0,0);
+                ConnectionCloseBody closeBody = _methodRegistry.createConnectionCloseBody(AMQConstant.INTERNAL_ERROR.getCode(),
+                                                                                             AMQShortString.validValueOf(
+                                                                                                     throwable.getMessage()),
+                                                                                             _currentClassId,
+                                                                                             _currentMethodId);
 
                 writeFrame(closeBody.generateFrame(0));
 
@@ -1374,11 +1138,6 @@ public class AMQProtocolEngine implement
         }
     }
 
-    public void setSender(Sender<ByteBuffer> sender)
-    {
-        // Do nothing
-    }
-
     public long getReadBytes()
     {
         return _readBytes;
@@ -1460,7 +1219,7 @@ public class AMQProtocolEngine implement
         return String.valueOf(getRemoteAddress());
     }
 
-    public void closeSession(AMQChannel<AMQProtocolEngine> session, AMQConstant cause, String message)
+    public void closeSession(AMQChannel session, AMQConstant cause, String message)
     {
         int channelId = session.getChannelId();
         closeChannel(channelId, cause, message);
@@ -1470,7 +1229,7 @@ public class AMQProtocolEngine implement
                 methodRegistry.createChannelCloseBody(
                         cause.getCode(),
                         AMQShortString.validValueOf(message),
-                        0,0);
+                        0, 0);
 
         writeFrame(responseBody.generateFrame(channelId));
     }
@@ -1478,9 +1237,8 @@ public class AMQProtocolEngine implement
     public void close(AMQConstant cause, String message)
     {
         closeConnection(0, new AMQConnectionException(cause, message, 0, 0,
-		                getProtocolOutputConverter().getProtocolMajorVersion(),
-		                getProtocolOutputConverter().getProtocolMinorVersion(),
-		                null));
+                                                      getMethodRegistry(),
+		                                              null));
     }
 
     public void block()
@@ -1490,7 +1248,7 @@ public class AMQProtocolEngine implement
             if(!_blocking)
             {
                 _blocking = true;
-                for(AMQChannel<AMQProtocolEngine> channel : _channelMap.values())
+                for(AMQChannel channel : _channelMap.values())
                 {
                     channel.block();
                 }
@@ -1505,7 +1263,7 @@ public class AMQProtocolEngine implement
             if(_blocking)
             {
                 _blocking = false;
-                for(AMQChannel<AMQProtocolEngine> channel : _channelMap.values())
+                for(AMQChannel channel : _channelMap.values())
                 {
                     channel.unblock();
                 }
@@ -1518,9 +1276,9 @@ public class AMQProtocolEngine implement
         return _closed;
     }
 
-    public List<AMQChannel<AMQProtocolEngine>> getSessionModels()
+    public List<AMQChannel> getSessionModels()
     {
-		return new ArrayList<AMQChannel<AMQProtocolEngine>>(getChannels());
+		return new ArrayList<>(getChannels());
     }
 
     public LogSubject getLogSubject()
@@ -1609,6 +1367,382 @@ public class AMQProtocolEngine implement
         _deferFlush = deferFlush;
     }
 
+    /**
+     * Processes a ChannelOpen method. The connection is closed if no virtual host has been set,
+     * if the channel id is already in use or awaiting closure, or if it is greater than
+     * getMaximumNumberOfChannels(); otherwise a new channel is added and ChannelOpenOk is sent.
+     *
+     * @param channelId the id of the channel the peer asked to open
+     */
+    @Override
+    public void receiveChannelOpen(final int channelId)
+    {
+        if(_logger.isDebugEnabled())
+        {
+            _logger.debug("RECV[" + channelId + "] ChannelOpen");
+        }
+
+        // Protect the broker against out of order frame request.
+        if (_virtualHost == null)
+        {
+            closeConnection(AMQConstant.COMMAND_INVALID,
+                            "Virtualhost has not yet been set. ConnectionOpen has not been called.", channelId);
+        }
+        else if(getChannel(channelId) != null || channelAwaitingClosure(channelId))
+        {
+            closeConnection(AMQConstant.CHANNEL_ERROR, "Channel " + channelId + " already exists", channelId);
+        }
+        else if(channelId > getMaximumNumberOfChannels())
+        {
+            final String tooHigh = "Channel " + channelId + " cannot be created as the max allowed channel id is "
+                                   + getMaximumNumberOfChannels();
+            closeConnection(AMQConstant.CHANNEL_ERROR, tooHigh, channelId);
+        }
+        else
+        {
+            _logger.info("Connecting to: " + _virtualHost.getName());
+
+            // Register the channel before acknowledging it to the peer.
+            addChannel(new AMQChannel(this, channelId, _virtualHost.getMessageStore()));
+
+            final ChannelOpenOkBody openOk = getMethodRegistry().createChannelOpenOkBody();
+            writeFrame(openOk.generateFrame(channelId));
+        }
+    }
+
+    /**
+     * Processes a ConnectionOpen method: resolves the requested virtual host on this port,
+     * checks that it is active and that the security manager authorises the connection, then
+     * replies with ConnectionOpenOk. Any failure closes the connection instead. The
+     * capabilities and insist arguments are only logged.
+     */
+    @Override
+    public void receiveConnectionOpen(AMQShortString virtualHostName,
+                                      AMQShortString capabilities,
+                                      boolean insist)
+    {
+        if(_logger.isDebugEnabled())
+        {
+            _logger.debug("RECV ConnectionOpen[" +" virtualHost: " + virtualHostName + " capabilities: " + capabilities + " insist: " + insist + " ]");
+        }
+
+        // A single leading '/' is stripped. String.startsWith is used instead of charAt(0) so
+        // that an empty name sent by the peer cannot index past the end of the name.
+        String virtualHostStr = virtualHostName == null ? null : virtualHostName.toString();
+        if (virtualHostStr != null && virtualHostStr.startsWith("/"))
+        {
+            virtualHostStr = virtualHostStr.substring(1);
+        }
+
+        VirtualHostImpl virtualHost = ((AmqpPort)getPort()).getVirtualHost(virtualHostStr);
+
+        if (virtualHost == null)
+        {
+            closeConnection(AMQConstant.NOT_FOUND,
+                            "Unknown virtual host: '" + virtualHostName + "'",0);
+        }
+        else
+        {
+            // Check virtualhost access
+            if (virtualHost.getState() != State.ACTIVE)
+            {
+                closeConnection(AMQConstant.CONNECTION_FORCED,
+                                "Virtual host '" + virtualHost.getName() + "' is not active",0);
+            }
+            else
+            {
+                setVirtualHost(virtualHost);
+                try
+                {
+                    virtualHost.getSecurityManager().authoriseCreateConnection(this);
+                    if (getContextKey() == null)
+                    {
+                        setContextKey(new AMQShortString(Long.toString(System.currentTimeMillis())));
+                    }
+                    MethodRegistry methodRegistry = getMethodRegistry();
+                    AMQMethodBody responseBody = methodRegistry.createConnectionOpenOkBody(virtualHostName);
+                    writeFrame(responseBody.generateFrame(0));
+                }
+                catch (AccessControlException e)
+                {
+                    closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage(),0);
+                }
+            }
+        }
+    }
+
+    /**
+     * Processes a ConnectionClose sent by the peer: closes the session (logging, not
+     * propagating, any failure), replies with ConnectionCloseOk and closes the protocol session.
+     */
+    @Override
+    public void receiveConnectionClose(final int replyCode,
+                                       final AMQShortString replyText,
+                                       final int classId,
+                                       final int methodId)
+    {
+        if(_logger.isDebugEnabled())
+        {
+            _logger.debug("RECV ConnectionClose[" +" replyCode: " + replyCode + " replyText: " + replyText + " classId: " + classId + " methodId: " + methodId + " ]");
+        }
+
+        if (_logger.isInfoEnabled())
+        {
+            _logger.info("ConnectionClose received with reply code/reply text " + replyCode + "/" +
+                         replyText + " for " + this);
+        }
+        try
+        {
+            closeSession();
+        }
+        catch (Exception closeFailure)
+        {
+            _logger.error("Error closing protocol session: " + closeFailure, closeFailure);
+        }
+
+        writeFrame(getMethodRegistry().createConnectionCloseOkBody().generateFrame(0));
+        closeProtocolSession();
+    }
+
+    /** Processes the peer's ConnectionCloseOk by closing the session; a failure is only logged. */
+    @Override
+    public void receiveConnectionCloseOk()
+    {
+        if(_logger.isDebugEnabled())
+        {
+            _logger.debug("RECV ConnectionCloseOk");
+        }
+
+        _logger.info("Received Connection-close-ok");
+        try
+        {
+            closeSession();
+        }
+        catch (Exception e)
+        {
+            _logger.error("Error closing protocol session: " + e, e);
+        }
+    }
+
+    /**
+     * Processes a ConnectionSecureOk carrying the peer's response to a SASL challenge. On ERROR
+     * the connection is closed; on SUCCESS a ConnectionTune is sent and the authorised subject
+     * recorded; on CONTINUE a further ConnectionSecure challenge is sent.
+     */
+    @Override
+    public void receiveConnectionSecureOk(final byte[] response)
+    {
+        if(_logger.isDebugEnabled())
+        {
+            _logger.debug("RECV ConnectionSecureOk[ response: ******** ] ");
+        }
+
+        Broker<?> broker = getBroker();
+        SubjectCreator subjectCreator = getSubjectCreator();
+        SaslServer ss = getSaslServer();
+        if (ss == null)
+        {
+            // Nothing to authenticate against; do not fall through with a null SaslServer.
+            closeConnection(AMQConstant.INTERNAL_ERROR, "No SASL context set up in session",0 );
+            return;
+        }
+
+        MethodRegistry methodRegistry = getMethodRegistry();
+        SubjectAuthenticationResult authResult = subjectCreator.authenticate(ss, response);
+        switch (authResult.getStatus())
+        {
+            case ERROR:
+                Exception cause = authResult.getCause();
+                _logger.info("Authentication failed:" + (cause == null ? "" : cause.getMessage()));
+                closeConnection(AMQConstant.NOT_ALLOWED, "Authentication failed",0);
+                disposeSaslServer();
+                break;
+            case SUCCESS:
+                if (_logger.isInfoEnabled())
+                {
+                    _logger.info("Connected as: " + authResult.getSubject());
+                }
+
+                int frameMax = broker.getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE);
+                if (frameMax <= 0)
+                {
+                    frameMax = Integer.MAX_VALUE;
+                }
+                ConnectionTuneBody tuneBody =
+                        methodRegistry.createConnectionTuneBody(broker.getConnection_sessionCountLimit(),
+                                                                frameMax,
+                                                                broker.getConnection_heartBeatDelay());
+                writeFrame(tuneBody.generateFrame(0));
+                setAuthorizedSubject(authResult.getSubject());
+                disposeSaslServer();
+                break;
+            case CONTINUE:
+                ConnectionSecureBody
+                        secureBody = methodRegistry.createConnectionSecureBody(authResult.getChallenge());
+                writeFrame(secureBody.generateFrame(0));
+        }
+    }
+
+    // Clears the stored SaslServer (if any) and disposes of it; a disposal failure is only logged.
+    private void disposeSaslServer()
+    {
+        SaslServer ss = getSaslServer();
+        if (ss != null)
+        {
+            setSaslServer(null);
+            try
+            {
+                ss.dispose();
+            }
+            catch (SaslException e)
+            {
+                _logger.error("Error disposing of Sasl server: " + e, e);
+            }
+        }
+    }
+
+    /**
+     * Processes a ConnectionStartOk: creates a SASL server for the mechanism chosen by the peer
+     * and runs the first authentication step with the supplied initial response. ERROR closes
+     * the connection, SUCCESS records the authorised subject and sends ConnectionTune, and
+     * CONTINUE sends a ConnectionSecure challenge. The response bytes are masked in the debug log.
+     */
+    @Override
+    public void receiveConnectionStartOk(final FieldTable clientProperties,
+                                         final AMQShortString mechanism,
+                                         final byte[] response,
+                                         final AMQShortString locale)
+    {
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug("RECV ConnectionStartOk["
+                          + " clientProperties: "
+                          + clientProperties
+                          + " mechanism: "
+                          + mechanism
+                          + " response: ********"
+                          + " locale: "
+                          + locale
+                          + " ]");
+        }
+
+        Broker<?> broker = getBroker();
+        _logger.info("SASL Mechanism selected: " + mechanism);
+        _logger.info("Locale selected: " + locale);
+
+        SubjectCreator subjectCreator = getSubjectCreator();
+        SaslServer ss = null;
+        try
+        {
+            ss = subjectCreator.createSaslServer(String.valueOf(mechanism),
+                                                 getLocalFQDN(),
+                                                 getPeerPrincipal());
+
+            if (ss == null)
+            {
+                closeConnection(AMQConstant.RESOURCE_ERROR, "Unable to create SASL Server:" + mechanism, 0);
+            }
+            else
+            {
+                //save clientProperties
+                setClientProperties(clientProperties);
+
+                setSaslServer(ss);
+
+                final SubjectAuthenticationResult authResult = subjectCreator.authenticate(ss, response);
+                MethodRegistry methodRegistry = getMethodRegistry();
+
+                switch (authResult.getStatus())
+                {
+                    case ERROR:
+                        Exception cause = authResult.getCause();
+                        _logger.info("Authentication failed:" + (cause == null ? "" : cause.getMessage()));
+                        closeConnection(AMQConstant.NOT_ALLOWED, "Authentication failed", 0);
+                        disposeSaslServer();
+                        break;
+                    case SUCCESS:
+                        if (_logger.isInfoEnabled())
+                        {
+                            _logger.info("Connected as: " + authResult.getSubject());
+                        }
+                        setAuthorizedSubject(authResult.getSubject());
+
+                        int frameMax = broker.getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE);
+
+                        if (frameMax <= 0)
+                        {
+                            frameMax = Integer.MAX_VALUE;
+                        }
+
+                        ConnectionTuneBody
+                                tuneBody =
+                                methodRegistry.createConnectionTuneBody(broker.getConnection_sessionCountLimit(),
+                                                                        frameMax,
+                                                                        broker.getConnection_heartBeatDelay());
+                        writeFrame(tuneBody.generateFrame(0));
+                        break;
+                    case CONTINUE:
+                        // Authentication is not finished yet: send the next challenge to the peer.
+                        ConnectionSecureBody
+                                secureBody = methodRegistry.createConnectionSecureBody(authResult.getChallenge());
+                        writeFrame(secureBody.generateFrame(0));
+                }
+            }
+        }
+        catch (SaslException e)
+        {
+            disposeSaslServer();
+            closeConnection(AMQConstant.INTERNAL_ERROR, "SASL error: " + e, 0);
+        }
+    }
+
+    /**
+     * Processes a ConnectionTuneOk: initialises heartbeating, checks the peer's requested maximum
+     * frame size against the broker limit and the protocol minimum (closing the connection if
+     * it is out of range), then records the frame size and the maximum number of channels.
+     */
+    @Override
+    public void receiveConnectionTuneOk(final int channelMax, final long frameMax, final int heartbeat)
+    {
+        if(_logger.isDebugEnabled())
+        {
+            _logger.debug("RECV ConnectionTuneOk[" +" channelMax: " + channelMax + " frameMax: " + frameMax + " heartbeat: " + heartbeat + " ]");
+        }
+
+        initHeartbeats(heartbeat);
+
+        final int configuredFrameMax = getBroker().getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE);
+        final int brokerFrameMax = configuredFrameMax <= 0 ? Integer.MAX_VALUE : configuredFrameMax;
+
+        if (frameMax > (long) brokerFrameMax)
+        {
+            closeConnection(AMQConstant.SYNTAX_ERROR,
+                            "Attempt to set max frame size to " + frameMax
+                            + " greater than the broker will allow: "
+                            + brokerFrameMax, 0);
+            return;
+        }
+        if (frameMax > 0 && frameMax < AMQConstant.FRAME_MIN_SIZE.getCode())
+        {
+            closeConnection(AMQConstant.SYNTAX_ERROR,
+                            "Attempt to set max frame size to " + frameMax
+                            + " which is smaller than the specification defined minimum: "
+                            + AMQConstant.FRAME_MIN_SIZE.getCode(), 0);
+            return;
+        }
+
+        setMaxFrameSize(frameMax == 0 ? brokerFrameMax : (int) frameMax);
+
+        //0 means no implied limit, except that forced by protocol limitations (0xFFFF)
+        final boolean useProtocolLimit = (channelMax == 0) || (channelMax > 0xFFFFL);
+        setMaximumNumberOfChannels(useProtocolLimit ? 0xFFFFL : channelMax);
+    }
+
+    public int getBinaryDataLimit()
+    {
+        return _binaryDataLimit;
+    }
+
     public final class WriteDeliverMethod
             implements ClientDeliveryMethod
     {
@@ -1656,24 +1790,31 @@ public class AMQProtocolEngine implement
         return _lastWriteTime.get();
     }
 
-    @Override
     public boolean isCloseWhenNoRoute()
     {
         return _closeWhenNoRoute;
     }
 
-    @Override
     public boolean isCompressionSupported()
     {
         return _compressionSupported && _broker.isMessageCompressionEnabled();
     }
 
-    @Override
     public int getMessageCompressionThreshold()
     {
         return _messageCompressionThreshold;
     }
 
+    public Broker<?> getBroker()
+    {
+        return _broker;
+    }
+
+    public SubjectCreator getSubjectCreator()
+    {
+        return _broker.getSubjectCreator(getLocalAddress(), getTransport().isSecure());
+    }
+
     public EventLogger getEventLogger()
     {
         if(_virtualHost != null)
@@ -1685,4 +1826,72 @@ public class AMQProtocolEngine implement
             return _broker.getEventLogger();
         }
     }
+
+    /**
+     * Returns the channel registered under channelId, or, when there is none, a proxy whose
+     * receive* methods close the connection with CHANNEL_ERROR and whose ignoreAllButCloseOk()
+     * returns false.
+     */
+    @Override
+    public ServerChannelMethodProcessor getChannelMethodProcessor(final int channelId)
+    {
+        final ServerChannelMethodProcessor existing = getChannel(channelId);
+        if(existing != null)
+        {
+            return existing;
+        }
+        final InvocationHandler unknownChannelHandler = new InvocationHandler()
+        {
+            @Override
+            public Object invoke(final Object proxy, final Method method, final Object[] args)
+            {
+                final String name = method.getName();
+                if(name.startsWith("receive"))
+                {
+                    closeConnection(AMQConstant.CHANNEL_ERROR, "Unknown channel id: " + channelId, channelId);
+                    return null;
+                }
+                return name.equals("ignoreAllButCloseOk") ? Boolean.FALSE : null;
+            }
+        };
+        return (ServerChannelMethodProcessor) Proxy.newProxyInstance(ServerMethodDispatcher.class.getClassLoader(),
+                                                            new Class[] { ServerChannelMethodProcessor.class }, unknownChannelHandler);
+    }
+
+    /** Processes a heartbeat frame: it is logged at debug level and otherwise ignored here. */
+    @Override
+    public void receiveHeartbeat()
+    {
+        if(_logger.isDebugEnabled())
+        {
+            _logger.debug("RECV Heartbeat");
+        }
+        // No op
+    }
+
+    /** Logs the received protocol header at debug level and passes it to protocolInitiationReceived. */
+    @Override
+    public void receiveProtocolHeader(final ProtocolInitiation protocolInitiation)
+    {
+        if(_logger.isDebugEnabled())
+        {
+            _logger.debug("RECV ProtocolHeader [" + protocolInitiation + " ]");
+        }
+
+        protocolInitiationReceived(protocolInitiation);
+    }
+
+    @Override
+    public void setCurrentMethod(final int classId, final int methodId)
+    {
+        _currentClassId = classId;
+        _currentMethodId = methodId;
+    }
+
+    /** Returns the current value of the _closing flag. */
+    @Override
+    public boolean ignoreAllButCloseOk()
+    {
+        return _closing.get();
+    }
 }

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java?rev=1632583&r1=1632582&r2=1632583&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java Fri Oct 17 14:23:19 2014
@@ -39,7 +39,6 @@ import org.apache.qpid.server.message.Me
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
@@ -80,7 +79,7 @@ public abstract class ConsumerTarget_0_8
 
     public static ConsumerTarget_0_8 createBrowserTarget(AMQChannel channel,
                                                          AMQShortString consumerTag, FieldTable filters,
-                                                         FlowCreditManager creditManager) throws AMQException
+                                                         FlowCreditManager creditManager)
     {
         return new BrowserConsumer(channel, consumerTag, filters, creditManager, channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod());
     }
@@ -90,7 +89,7 @@ public abstract class ConsumerTarget_0_8
                                                           final FieldTable filters,
                                                           final FlowCreditManager creditManager,
                                                           final ClientDeliveryMethod deliveryMethod,
-                                                          final RecordDeliveryMethod recordMethod) throws AMQException
+                                                          final RecordDeliveryMethod recordMethod)
     {
         return new GetNoAckConsumer(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod);
     }
@@ -107,7 +106,6 @@ public abstract class ConsumerTarget_0_8
                                FlowCreditManager creditManager,
                                ClientDeliveryMethod deliveryMethod,
                                RecordDeliveryMethod recordMethod)
-            throws AMQException
         {
             super(channel, consumerTag,
                   filters, creditManager, deliveryMethod, recordMethod);
@@ -148,7 +146,7 @@ public abstract class ConsumerTarget_0_8
 
     public static ConsumerTarget_0_8 createNoAckTarget(AMQChannel channel,
                                                            AMQShortString consumerTag, FieldTable filters,
-                                                           FlowCreditManager creditManager) throws AMQException
+                                                           FlowCreditManager creditManager)
     {
         return new NoAckConsumer(channel, consumerTag, filters, creditManager, channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod());
     }
@@ -171,7 +169,6 @@ public abstract class ConsumerTarget_0_8
                              FlowCreditManager creditManager,
                              ClientDeliveryMethod deliveryMethod,
                              RecordDeliveryMethod recordMethod)
-            throws AMQException
         {
             super(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod);
 
@@ -207,7 +204,7 @@ public abstract class ConsumerTarget_0_8
             long size;
             synchronized (getChannel())
             {
-                getChannel().getProtocolSession().setDeferFlush(batch);
+                getChannel().getConnection().setDeferFlush(batch);
                 long deliveryTag = getChannel().getNextDeliveryTag();
 
                 size = sendToClient(consumer, message, props, deliveryTag);
@@ -249,7 +246,6 @@ public abstract class ConsumerTarget_0_8
                                 FlowCreditManager creditManager,
                                 ClientDeliveryMethod deliveryMethod,
                                 RecordDeliveryMethod recordMethod)
-            throws AMQException
         {
             super(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod);
         }
@@ -265,7 +261,6 @@ public abstract class ConsumerTarget_0_8
     public static ConsumerTarget_0_8 createAckTarget(AMQChannel channel,
                                                          AMQShortString consumerTag, FieldTable filters,
                                                          FlowCreditManager creditManager)
-            throws AMQException
     {
         return new AckConsumer(channel,consumerTag,filters,creditManager, channel.getClientDeliveryMethod(), channel.getRecordDeliveryMethod());
     }
@@ -276,7 +271,6 @@ public abstract class ConsumerTarget_0_8
                                                          FlowCreditManager creditManager,
                                                          ClientDeliveryMethod deliveryMethod,
                                                          RecordDeliveryMethod recordMethod)
-            throws AMQException
     {
         return new AckConsumer(channel,consumerTag,filters,creditManager, deliveryMethod, recordMethod);
     }
@@ -288,7 +282,6 @@ public abstract class ConsumerTarget_0_8
                            FlowCreditManager creditManager,
                            ClientDeliveryMethod deliveryMethod,
                            RecordDeliveryMethod recordMethod)
-            throws AMQException
         {
             super(channel, consumerTag, filters, creditManager, deliveryMethod, recordMethod);
         }
@@ -308,7 +301,7 @@ public abstract class ConsumerTarget_0_8
 
             synchronized (getChannel())
             {
-                getChannel().getProtocolSession().setDeferFlush(batch);
+                getChannel().getConnection().setDeferFlush(batch);
                 long deliveryTag = getChannel().getNextDeliveryTag();
 
                 addUnacknowledgedMessage(entry);
@@ -346,7 +339,6 @@ public abstract class ConsumerTarget_0_8
                               FlowCreditManager creditManager,
                               ClientDeliveryMethod deliveryMethod,
                               RecordDeliveryMethod recordMethod)
-            throws AMQException
     {
         super(State.ACTIVE);
 
@@ -474,9 +466,9 @@ public abstract class ConsumerTarget_0_8
         return _consumerTag;
     }
 
-    public AMQProtocolSession getProtocolSession()
+    public AMQProtocolEngine getProtocolSession()
     {
-        return _channel.getProtocolSession();
+        return _channel.getConnection();
     }
 
     public void restoreCredit(final ServerMessage message)
@@ -525,7 +517,7 @@ public abstract class ConsumerTarget_0_8
 
     public void confirmAutoClose()
     {
-        ProtocolOutputConverter converter = getChannel().getProtocolSession().getProtocolOutputConverter();
+        ProtocolOutputConverter converter = getChannel().getConnection().getProtocolOutputConverter();
         converter.confirmConsumerAutoClose(getChannel().getChannelId(), getConsumerTag());
     }
 
@@ -540,9 +532,9 @@ public abstract class ConsumerTarget_0_8
 
     public void flushBatched()
     {
-        _channel.getProtocolSession().setDeferFlush(false);
+        _channel.getConnection().setDeferFlush(false);
 
-        _channel.getProtocolSession().flushBatched();
+        _channel.getConnection().flushBatched();
     }
 
     protected void addUnacknowledgedMessage(MessageInstance entry)

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java?rev=1632583&r1=1632582&r2=1632583&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java Fri Oct 17 14:23:19 2014
@@ -20,14 +20,13 @@
  */
 package org.apache.qpid.server.protocol.v0_8;
 
+import java.util.Map;
+
 import org.apache.log4j.Logger;
 
-import org.apache.qpid.AMQException;
 import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.message.MessageInstance;
 
-import java.util.Map;
-
 public class ExtractResendAndRequeue implements UnacknowledgedMessageMap.Visitor
 {
     private static final Logger _log = Logger.getLogger(ExtractResendAndRequeue.class);
@@ -45,7 +44,7 @@ public class ExtractResendAndRequeue imp
         _msgToResend = msgToResend;
     }
 
-    public boolean callback(final long deliveryTag, MessageInstance message) throws AMQException
+    public boolean callback(final long deliveryTag, MessageInstance message)
     {
 
         message.setRedelivered();

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java?rev=1632583&r1=1632582&r2=1632583&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java Fri Oct 17 14:23:19 2014
@@ -20,16 +20,15 @@
  */
 package org.apache.qpid.server.protocol.v0_8;
 
-import org.apache.qpid.AMQException;
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.ContentBody;
 import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.MessagePublishInfo;
 import org.apache.qpid.server.message.MessageDestination;
 
-import java.util.ArrayList;
-import java.util.List;
-
 public class IncomingMessage
 {
 
@@ -58,7 +57,7 @@ public class IncomingMessage
         return _messagePublishInfo;
     }
 
-    public void addContentBodyFrame(final ContentBody contentChunk) throws AMQException
+    public void addContentBodyFrame(final ContentBody contentChunk)
     {
         _bodyLengthReceived += contentChunk.getSize();
         _contentChunks.add(contentChunk);
@@ -94,7 +93,7 @@ public class IncomingMessage
         _messageDestination = e;
     }
 
-    public int getBodyCount() throws AMQException
+    public int getBodyCount()
     {
         return _contentChunks.size();
     }

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java?rev=1632583&r1=1632582&r2=1632583&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java Fri Oct 17 14:23:19 2014
@@ -33,7 +33,7 @@ import org.apache.qpid.framing.AMQShortS
 import org.apache.qpid.framing.BasicContentHeaderProperties;
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.MessagePublishInfo;
 import org.apache.qpid.server.message.internal.InternalMessage;
 import org.apache.qpid.server.plugin.MessageConverter;
 import org.apache.qpid.server.plugin.PluggableService;
@@ -135,38 +135,10 @@ public class MessageConverter_Internal_t
     private MessageMetaData convertMetaData(final InternalMessage serverMsg, final String bodyMimeType, final int size)
     {
 
-        MessagePublishInfo publishInfo = new MessagePublishInfo()
-                                            {
-                                                @Override
-                                                public AMQShortString getExchange()
-                                                {
-                                                    return AMQShortString.EMPTY_STRING;
-                                                }
-
-                                                @Override
-                                                public void setExchange(final AMQShortString amqShortString)
-                                                {
-                                                    throw new UnsupportedOperationException();
-                                                }
-
-                                                @Override
-                                                public boolean isImmediate()
-                                                {
-                                                    return false;
-                                                }
-
-                                                @Override
-                                                public boolean isMandatory()
-                                                {
-                                                    return false;
-                                                }
-
-                                                @Override
-                                                public AMQShortString getRoutingKey()
-                                                {
-                                                    return AMQShortString.valueOf(serverMsg.getInitialRoutingAddress());
-                                                }
-                                            };
+        MessagePublishInfo publishInfo = new MessagePublishInfo(AMQShortString.EMPTY_STRING,
+                                                                false,
+                                                                false,
+                                                                AMQShortString.valueOf(serverMsg.getInitialRoutingAddress()));
 
 
         final BasicContentHeaderProperties props = new BasicContentHeaderProperties();
@@ -191,7 +163,7 @@ public class MessageConverter_Internal_t
 
         props.setHeaders(FieldTable.convertToFieldTable(headerProps));
 
-        final ContentHeaderBody chb = new ContentHeaderBody(props, BASIC_CLASS_ID);
+        final ContentHeaderBody chb = new ContentHeaderBody(props);
         chb.setBodySize(size);
         return new MessageMetaData(publishInfo, chb, serverMsg.getArrivalTime());
     }

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java?rev=1632583&r1=1632582&r2=1632583&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java Fri Oct 17 14:23:19 2014
@@ -20,8 +20,13 @@
  */
 package org.apache.qpid.server.protocol.v0_8;
 
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Collection;
-import org.apache.qpid.AMQException;
+import java.util.Set;
+
 import org.apache.qpid.framing.AMQFrameDecodingException;
 import org.apache.qpid.framing.AMQProtocolVersionException;
 import org.apache.qpid.framing.AMQShortString;
@@ -29,7 +34,7 @@ import org.apache.qpid.framing.BasicCont
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.EncodingUtils;
 import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.MessagePublishInfo;
 import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.server.plugin.MessageMetaDataType;
 import org.apache.qpid.server.store.StorableMessageMetaData;
@@ -37,12 +42,6 @@ import org.apache.qpid.server.util.ByteB
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 import org.apache.qpid.util.ByteBufferInputStream;
 
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Set;
-
 /**
  * Encapsulates a publish body and a content header. In the context of the message store these are treated as a
  * single unit.
@@ -170,33 +169,11 @@ public class MessageMetaData implements 
                 long arrivalTime = EncodingUtils.readLong(dais);
 
                 MessagePublishInfo publishBody =
-                        new MessagePublishInfo()
-                        {
+                        new MessagePublishInfo(exchange,
+                                               (flags & IMMEDIATE_FLAG) != 0,
+                                               (flags & MANDATORY_FLAG) != 0,
+                                               routingKey);
 
-                            public AMQShortString getExchange()
-                            {
-                                return exchange;
-                            }
-
-                            public void setExchange(AMQShortString exchange)
-                            {
-                            }
-
-                            public boolean isImmediate()
-                            {
-                                return (flags & IMMEDIATE_FLAG) != 0;
-                            }
-
-                            public boolean isMandatory()
-                            {
-                                return (flags & MANDATORY_FLAG) != 0;
-                            }
-
-                            public AMQShortString getRoutingKey()
-                            {
-                                return routingKey;
-                            }
-                        };
                 return new MessageMetaData(publishBody, chb, arrivalTime);
             }
             catch (IOException e)

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java?rev=1632583&r1=1632582&r2=1632583&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java Fri Oct 17 14:23:19 2014
@@ -20,13 +20,12 @@
  */
 package org.apache.qpid.server.protocol.v0_8;
 
-import org.apache.qpid.AMQException;
-import org.apache.qpid.server.message.MessageInstance;
-import org.apache.qpid.server.queue.QueueEntry;
-
 import java.util.Collection;
 import java.util.Set;
 
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.MessageInstance;
+
 
 public interface UnacknowledgedMessageMap
 {
@@ -37,12 +36,12 @@ public interface UnacknowledgedMessageMa
          *@param message the message being iterated over @return true to stop iteration, false to continue
          * @throws AMQException
          */
-        boolean callback(final long deliveryTag, MessageInstance message) throws AMQException;
+        boolean callback(final long deliveryTag, MessageInstance message);
 
         void visitComplete();
     }
 
-    void visit(Visitor visitor) throws AMQException;
+    void visit(Visitor visitor);
 
     void add(long deliveryTag, MessageInstance message);
 

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java?rev=1632583&r1=1632582&r2=1632583&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java Fri Oct 17 14:23:19 2014
@@ -27,7 +27,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.qpid.AMQException;
 import org.apache.qpid.server.message.MessageInstance;
 
 public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
@@ -82,7 +81,7 @@ public class UnacknowledgedMessageMapImp
         }
     }
 
-    public void visit(Visitor visitor) throws AMQException
+    public void visit(Visitor visitor)
     {
         synchronized (_lock)
         {

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java?rev=1632583&r1=1632582&r2=1632583&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java Fri Oct 17 14:23:19 2014
@@ -29,7 +29,7 @@ import java.util.Map;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
 import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.MessagePublishInfo;
 import org.apache.qpid.server.configuration.BrokerProperties;
 import org.apache.qpid.server.exchange.ExchangeImpl;
 import org.apache.qpid.server.message.MessageContentSource;
@@ -41,7 +41,7 @@ import org.apache.qpid.test.utils.QpidTe
 public class AMQChannelTest extends QpidTestCase
 {
     private VirtualHostImpl _virtualHost;
-    private AMQProtocolSession _protocolSession;
+    private AMQProtocolEngine _protocolSession;
     private Map<Integer,String> _replies;
     private Broker _broker;
 
@@ -98,18 +98,17 @@ public class AMQChannelTest extends Qpid
         AMQChannel channel = new AMQChannel(_protocolSession, 1, _virtualHost.getMessageStore());
         channel.setLocalTransactional();
 
-        MessagePublishInfo info = mock(MessagePublishInfo.class);
+        MessagePublishInfo info = new MessagePublishInfo(new AMQShortString("test"), false, false, null);
         ExchangeImpl e = mock(ExchangeImpl.class);
         ContentHeaderBody contentHeaderBody= mock(ContentHeaderBody.class);
         BasicContentHeaderProperties properties = mock(BasicContentHeaderProperties.class);
 
         when(contentHeaderBody.getProperties()).thenReturn(properties);
-        when(info.getExchange()).thenReturn(new AMQShortString("test"));
         when(properties.getUserId()).thenReturn(new AMQShortString(_protocolSession.getAuthorizedPrincipal().getName() + "_incorrect"));
 
         channel.setPublishFrame(info, e);
         channel.publishContentHeader(contentHeaderBody);
-        channel.commit();
+        channel.commit(null, false);
 
         assertEquals("Unexpected number of replies", 1, _replies.size());
         assertEquals("Message authorization passed", "Access Refused", _replies.get(403));
@@ -121,18 +120,17 @@ public class AMQChannelTest extends Qpid
         AMQChannel channel = new AMQChannel(_protocolSession, 1, _virtualHost.getMessageStore());
         channel.setLocalTransactional();
 
-        MessagePublishInfo info = mock(MessagePublishInfo.class);
+        MessagePublishInfo info = new MessagePublishInfo(new AMQShortString("test"), false, false, null);
         ExchangeImpl e = mock(ExchangeImpl.class);
         ContentHeaderBody contentHeaderBody= mock(ContentHeaderBody.class);
         BasicContentHeaderProperties properties = mock(BasicContentHeaderProperties.class);
 
         when(contentHeaderBody.getProperties()).thenReturn(properties);
-        when(info.getExchange()).thenReturn(new AMQShortString("test"));
         when(properties.getUserId()).thenReturn(new AMQShortString(_protocolSession.getAuthorizedPrincipal().getName()));
 
         channel.setPublishFrame(info, e);
         channel.publishContentHeader(contentHeaderBody);
-        channel.commit();
+        channel.commit(null, false);
 
         assertEquals("Unexpected number of replies", 0, _replies.size());
     }

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java?rev=1632583&r1=1632582&r2=1632583&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java Fri Oct 17 14:23:19 2014
@@ -29,7 +29,7 @@ import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
 import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.MessagePublishInfo;
 import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.flow.LimitlessCreditManager;
 import org.apache.qpid.server.flow.Pre0_10CreditManager;
@@ -51,7 +51,7 @@ public class AckTest extends QpidTestCas
     private ConsumerTarget_0_8 _subscriptionTarget;
     private ConsumerImpl _consumer;
 
-    private AMQProtocolSession _protocolSession;
+    private AMQProtocolEngine _protocolEngine;
 
     private TestMemoryMessageStore _messageStore;
 
@@ -68,8 +68,8 @@ public class AckTest extends QpidTestCas
         super.setUp();
         BrokerTestHelper.setUp();
         _channel = BrokerTestHelper_0_8.createChannel(5);
-        _protocolSession = _channel.getProtocolSession();
-        _virtualHost = _protocolSession.getVirtualHost();
+        _protocolEngine = _channel.getConnection();
+        _virtualHost = _protocolEngine.getVirtualHost();
         _queue = BrokerTestHelper.createQueue(getTestName(), _virtualHost);
         _messageStore = (TestMemoryMessageStore)_virtualHost.getMessageStore();
     }
@@ -90,37 +90,10 @@ public class AckTest extends QpidTestCas
     {
         for (int i = 1; i <= count; i++)
         {
-            MessagePublishInfo publishBody = new MessagePublishInfo()
-            {
-
-                public AMQShortString getExchange()
-                {
-                    return new AMQShortString("someExchange");
-                }
-
-                public void setExchange(AMQShortString exchange)
-                {
-                    //To change body of implemented methods use File | Settings | File Templates.
-                }
-
-                public boolean isImmediate()
-                {
-                    return false;
-                }
-
-                public boolean isMandatory()
-                {
-                    return false;
-                }
-
-                public AMQShortString getRoutingKey()
-                {
-                    return new AMQShortString("rk");
-                }
-            };
+            MessagePublishInfo publishBody = new MessagePublishInfo(new AMQShortString("someExchange"), false, false,
+                                                                    new AMQShortString("rk"));
             BasicContentHeaderProperties b = new BasicContentHeaderProperties();
-            ContentHeaderBody cb = new ContentHeaderBody();
-            cb.setProperties(b);
+            ContentHeaderBody cb = new ContentHeaderBody(b);
 
             if (persistent)
             {

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java?rev=1632583&r1=1632582&r2=1632583&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java Fri Oct 17 14:23:19 2014
@@ -25,6 +25,7 @@ import java.util.List;
 
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.store.MessageCounter;
 import org.apache.qpid.server.store.MessageStore;
@@ -75,7 +76,7 @@ public class AcknowledgeTest extends Qpi
 
     private InternalTestProtocolSession getSession()
     {
-        return (InternalTestProtocolSession)_channel.getProtocolSession();
+        return (InternalTestProtocolSession)_channel.getConnection();
     }
 
     private AMQQueue getQueue()
@@ -129,7 +130,7 @@ public class AcknowledgeTest extends Qpi
 
         if (getChannel().isTransactional())
         {
-            getChannel().commit();
+            getChannel().commit(null, false);
         }
 
         //Ensure they are stored
@@ -140,7 +141,7 @@ public class AcknowledgeTest extends Qpi
 
         //Subscribe to the queue
         AMQShortString subscriber = _channel.consumeFromSource(null,
-                                                               Collections.singleton(_queue),
+                                                               Collections.<MessageSource>singleton(_queue),
                                                                true, null, true, false);
 
         getQueue().deliverAsync();
@@ -164,7 +165,7 @@ public class AcknowledgeTest extends Qpi
 
         if (getChannel().isTransactional())
         {
-            getChannel().commit();
+            getChannel().commit(null, false);
         }
 
         // Check Remaining Acknowledgements

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java?rev=1632583&r1=1632582&r2=1632583&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java Fri Oct 17 14:23:19 2014
@@ -24,18 +24,15 @@ import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
 import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.MessagePublishInfo;
 import org.apache.qpid.server.message.MessageDestination;
 import org.apache.qpid.server.util.BrokerTestHelper;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
 
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
 public class BrokerTestHelper_0_8 extends BrokerTestHelper
 {
 
-    public static AMQChannel createChannel(int channelId, AMQProtocolSession session) throws AMQException
+    public static AMQChannel createChannel(int channelId, AMQProtocolEngine session) throws AMQException
     {
         AMQChannel channel = new AMQChannel(session, channelId, session.getVirtualHost().getMessageStore());
         session.addChannel(channel);
@@ -69,9 +66,7 @@ public class BrokerTestHelper_0_8 extend
     {
         AMQShortString routingKey = new AMQShortString(queueName);
         AMQShortString exchangeNameAsShortString = new AMQShortString(exchangeName);
-        MessagePublishInfo info = mock(MessagePublishInfo.class);
-        when(info.getExchange()).thenReturn(exchangeNameAsShortString);
-        when(info.getRoutingKey()).thenReturn(routingKey);
+        MessagePublishInfo info = new MessagePublishInfo(exchangeNameAsShortString, false, false, routingKey);
 
         MessageDestination destination;
         if(exchangeName == null || "".equals(exchangeName))
@@ -86,22 +81,20 @@ public class BrokerTestHelper_0_8 extend
         {
             channel.setPublishFrame(info, destination);
 
-            // Set the body size
-            ContentHeaderBody _headerBody = new ContentHeaderBody();
-            _headerBody.setBodySize(0);
 
             // Set Minimum properties
             BasicContentHeaderProperties properties = new BasicContentHeaderProperties();
 
+
             properties.setExpiration(0L);
             properties.setTimestamp(System.currentTimeMillis());
 
             // Make Message Persistent
             properties.setDeliveryMode((byte) 2);
 
-            _headerBody.setProperties(properties);
+            ContentHeaderBody headerBody = new ContentHeaderBody(properties, 0);
 
-            channel.publishContentHeader(_headerBody);
+            channel.publishContentHeader(headerBody);
         }
         channel.sync();
     }

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java?rev=1632583&r1=1632582&r2=1632583&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java Fri Oct 17 14:23:19 2014
@@ -39,14 +39,13 @@ import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.MessagePublishInfo;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.MessageContentSource;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
 import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
 import org.apache.qpid.server.security.auth.UsernamePrincipal;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
@@ -231,7 +230,7 @@ public class InternalTestProtocolSession
         //Simulate the Client responding with a CloseOK
         // should really update the StateManger but we don't have access here
         // changeState(AMQState.CONNECTION_CLOSED);
-        ((AMQChannel)session).getProtocolSession().closeSession();
+        ((AMQChannel)session).getConnection().closeSession();
 
     }
 

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MaxChannelsTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MaxChannelsTest.java?rev=1632583&r1=1632582&r2=1632583&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MaxChannelsTest.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MaxChannelsTest.java Fri Oct 17 14:23:19 2014
@@ -20,8 +20,6 @@
  */
 package org.apache.qpid.server.protocol.v0_8;
 
-import org.apache.qpid.AMQException;
-import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.util.BrokerTestHelper;
 import org.apache.qpid.test.utils.QpidTestCase;
 
@@ -46,23 +44,16 @@ public class MaxChannelsTest extends Qpi
 
         long maxChannels = 10L;
         _session.setMaximumNumberOfChannels(maxChannels);
-        assertEquals("Number of channels not correctly set.", new Long(maxChannels), _session.getMaximumNumberOfChannels());
+        assertEquals("Number of channels not correctly set.", maxChannels, _session.getMaximumNumberOfChannels());
 
-        for (long currentChannel = 0L; currentChannel < maxChannels; currentChannel++)
+        for (long currentChannel = 1L; currentChannel <= maxChannels; currentChannel++)
         {
-            _session.addChannel(new AMQChannel(_session, (int) currentChannel, null));
+            _session.receiveChannelOpen( (int) currentChannel);
         }
-
-        try
-        {
-            _session.addChannel(new AMQChannel(_session, (int) maxChannels, null));
-            fail("Cannot create more channels then maximum");
-        }
-        catch (AMQException e)
-        {
-            assertEquals("Wrong exception received.", e.getErrorCode(), AMQConstant.NOT_ALLOWED);
-        }
-        assertEquals("Maximum number of channels not set.", new Long(maxChannels), new Long(_session.getChannels().size()));
+        assertFalse("Connection should not be closed after opening " + maxChannels + " channels",_session.isClosed());
+        assertEquals("Maximum number of channels not set.", maxChannels, _session.getChannels().size());
+        _session.receiveChannelOpen((int) maxChannels+1);
+        assertTrue("Connection should be closed after opening " + (maxChannels + 1) + " channels",_session.isClosed());
     }
 
     @Override



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org