You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/04/15 12:10:20 UTC

svn commit: r1739270 [2/8] - in /qpid/java/trunk: broker-core/src/main/java/org/apache/qpid/server/queue/ broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/ broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/pro...

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java?rev=1739270&r1=1739269&r2=1739270&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java Fri Apr 15 10:10:16 2016
@@ -25,11 +25,18 @@ import java.nio.ByteBuffer;
 import java.security.AccessController;
 import java.security.Principal;
 import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -39,20 +46,21 @@ import javax.security.sasl.SaslServer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.qpid.amqp_1_0.codec.FrameWriter;
-import org.apache.qpid.amqp_1_0.codec.ProtocolHandler;
-import org.apache.qpid.amqp_1_0.framing.AMQFrame;
-import org.apache.qpid.amqp_1_0.framing.FrameHandler;
-import org.apache.qpid.amqp_1_0.framing.OversizeFrameException;
-import org.apache.qpid.amqp_1_0.framing.SASLFrameHandler;
-import org.apache.qpid.amqp_1_0.framing.TransportFrame;
-import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
-import org.apache.qpid.amqp_1_0.transport.Container;
-import org.apache.qpid.amqp_1_0.transport.FrameOutputHandler;
-import org.apache.qpid.amqp_1_0.transport.SaslServerProvider;
-import org.apache.qpid.amqp_1_0.type.Binary;
-import org.apache.qpid.amqp_1_0.type.FrameBody;
-import org.apache.qpid.amqp_1_0.type.Symbol;
+import org.apache.qpid.server.model.AuthenticationProvider;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.protocol.v1_0.codec.DescribedTypeConstructorRegistry;
+import org.apache.qpid.server.protocol.v1_0.codec.FrameWriter;
+import org.apache.qpid.server.protocol.v1_0.codec.ProtocolHandler;
+import org.apache.qpid.server.protocol.v1_0.codec.ValueWriter;
+import org.apache.qpid.server.protocol.v1_0.framing.AMQFrame;
+import org.apache.qpid.server.protocol.v1_0.framing.FrameHandler;
+import org.apache.qpid.server.protocol.v1_0.framing.OversizeFrameException;
+import org.apache.qpid.server.protocol.v1_0.framing.SASLFrame;
+import org.apache.qpid.server.protocol.v1_0.framing.TransportFrame;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
+import org.apache.qpid.server.protocol.v1_0.type.SaslFrameBody;
+import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.common.ServerPropertyNames;
 import org.apache.qpid.configuration.CommonProperties;
@@ -64,6 +72,27 @@ import org.apache.qpid.server.model.Tran
 import org.apache.qpid.server.model.port.AmqpPort;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.protocol.ConnectionClosingTicker;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort;
+import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslChallenge;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslCode;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslInit;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslMechanisms;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslOutcome;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslResponse;
+import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
+import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition;
+import org.apache.qpid.server.protocol.v1_0.type.transport.End;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
 import org.apache.qpid.server.security.SubjectCreator;
 import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
 import org.apache.qpid.server.security.auth.UsernamePrincipal;
@@ -74,191 +103,1048 @@ import org.apache.qpid.server.transport.
 import org.apache.qpid.server.transport.ServerNetworkConnection;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
 import org.apache.qpid.transport.ByteBufferSender;
 import org.apache.qpid.transport.network.AggregateTicker;
 
 public class AMQPConnection_1_0 extends AbstractAMQPConnection<AMQPConnection_1_0>
-        implements FrameOutputHandler
+        implements FrameOutputHandler, DescribedTypeConstructorRegistry.Source,
+                   ValueWriter.Registry.Source,
+                   ErrorHandler,
+                   SASLEndpoint,
+                   ConnectionHandler
 {
 
-    public static Logger LOGGER = LoggerFactory.getLogger(AMQPConnection_1_0.class);
+    /** Class logger. */
+    private static final Logger LOGGER = LoggerFactory.getLogger(AMQPConnection_1_0.class);
+    /** Logger named "FRM"; this class uses it to trace received frames at debug level. */
+    private static final Logger FRAME_LOGGER = LoggerFactory.getLogger("FRM");
+    /** Logger named "RAW". */
+    private static final Logger RAW_LOGGER = LoggerFactory.getLogger("RAW");
+
+    /** Close-response timeout; presumably milliseconds - TODO confirm against its use. */
+    private static final long CLOSE_RESPONSE_TIMEOUT = 10000L;
+
+    private final AtomicBoolean _stateChanged = new AtomicBoolean();
+    private final AtomicReference<Action<ProtocolEngine>> _workListener = new AtomicReference<>();
+
+
+    private static final byte[] SASL_HEADER = new byte[]
+            {
+                    (byte) 'A',
+                    (byte) 'M',
+                    (byte) 'Q',
+                    (byte) 'P',
+                    (byte) 3,
+                    (byte) 1,
+                    (byte) 0,
+                    (byte) 0
+            };
+
+    private static final byte[] AMQP_HEADER = new byte[]
+            {
+                    (byte) 'A',
+                    (byte) 'M',
+                    (byte) 'Q',
+                    (byte) 'P',
+                    (byte) 0,
+                    (byte) 1,
+                    (byte) 0,
+                    (byte) 0
+            };
+
+    private FrameWriter _frameWriter;
+    private ProtocolHandler _frameHandler;
+    private volatile boolean _transportBlockedForWriting;
+
+    private enum FrameReceivingState // presumably: which incoming header/frame kinds are currently acceptable
+    {
+        AMQP_OR_SASL_HEADER, // initial value of _frameReceivingState
+        SASL_INIT_ONLY,
+        SASL_RESPONSE_ONLY, // state asserted by receiveSaslResponse
+        AMQP_HEADER, // entered by receiveSaslResponse once the SASL server reports completion
+        OPEN_ONLY, // state asserted by receiveOpen, which then moves to ANY_FRAME
+        ANY_FRAME, // state asserted by the session-level receive* methods and by receiveClose
+        CLOSED // set by receiveClose and by closeSaslWithFailure
+    }
+
+    private volatile FrameReceivingState _frameReceivingState = FrameReceivingState.AMQP_OR_SASL_HEADER;
+
+    private static final short CONNECTION_CONTROL_CHANNEL = (short) 0;
+    private static final QpidByteBuffer EMPTY_BYTE_BUFFER = QpidByteBuffer.wrap(new byte[0]);
+
+    private static final int DEFAULT_CHANNEL_MAX = Math.min(Integer.getInteger("amqp.channel_max", 255), 0xFFFF);
+    private static final int DEFAULT_MAX_FRAME = Integer.getInteger("amqp.max_frame_size", 1 << 15);
+
+    private AmqpPort<?> _port;
+    private SubjectCreator _subjectCreator;
+    private Transport _transport;
+    private long _connectionId;
+
+    private Container _container;
+    private Principal _user;
+
+
+    private int _channelMax = DEFAULT_CHANNEL_MAX;
+    private int _maxFrameSize = 4096;
+    private String _remoteContainerId;
+
+    private SocketAddress _remoteAddress;
+
+    // positioned by the *outgoing* channel
+    private SessionEndpoint[] _sendingSessions;
+
+    // positioned by the *incoming* channel
+    private SessionEndpoint[] _receivingSessions;
+    private boolean _closedForInput;
+    private boolean _closedForOutput;
+
+    private long _idleTimeout;
+
+    private ConnectionState _connectionState = ConnectionState.UNOPENED;
+
+    private AMQPDescribedTypeRegistry _describedTypeRegistry = AMQPDescribedTypeRegistry.newInstance()
+            .registerTransportLayer()
+            .registerMessagingLayer()
+            .registerTransactionLayer()
+            .registerSecurityLayer();
+
+
+    private Map _properties;
+    private SaslServerProvider _saslServerProvider;
+    private boolean _saslComplete;
+
+    private SaslServer _saslServer;
+    private String _localHostname;
+    private long _desiredIdleTimeout;
+    private UnsignedInteger _handleMax = UnsignedInteger.MAX_VALUE;
+    private Error _remoteError;
+
+    private static final long MINIMUM_SUPPORTED_IDLE_TIMEOUT = 1000L;
+
+    private Map _remoteProperties;
+
+    private final AtomicBoolean _orderlyClose = new AtomicBoolean(false);
+
+    private final Collection<Session_1_0>
+            _sessions = Collections.synchronizedCollection(new ArrayList<Session_1_0>());
+
+    private final Object _reference = new Object();
+
+    private final Queue<Action<? super ConnectionHandler>> _asyncTaskList =
+            new ConcurrentLinkedQueue<>();
+
+    private boolean _closedOnOpen;
+
+
+
+    AMQPConnection_1_0(final Broker<?> broker,
+                       final ServerNetworkConnection network,
+                       AmqpPort<?> port, Transport transport, long id,
+                       final AggregateTicker aggregateTicker,
+                       final boolean useSASL)
+    {
+        super(broker, network, port, transport, Protocol.AMQP_1_0, id, aggregateTicker);
+        _container = new Container(broker.getId().toString());
+
+        _subjectCreator = port.getAuthenticationProvider().getSubjectCreator(transport.isSecure());
+
+        _saslServerProvider = useSASL ? asSaslServerProvider(_subjectCreator, network) : null;
+        _port = port;
+        _transport = transport;
+        _connectionId = id;
+
+        Map<Symbol,Object> serverProperties = new LinkedHashMap<>();
+        serverProperties.put(Symbol.valueOf(ServerPropertyNames.PRODUCT), CommonProperties.getProductName());
+        serverProperties.put(Symbol.valueOf(ServerPropertyNames.VERSION), CommonProperties.getReleaseVersion());
+        serverProperties.put(Symbol.valueOf(ServerPropertyNames.QPID_BUILD), CommonProperties.getBuildVersion());
+        serverProperties.put(Symbol.valueOf(ServerPropertyNames.QPID_INSTANCE_NAME), broker.getName());
+
+        setProperties(serverProperties);
+
+        setRemoteAddress(network.getRemoteAddress());
+
+        setDesiredIdleTimeout(1000L * broker.getConnection_heartBeatDelay());
+
+        _frameWriter =  new FrameWriter(getDescribedTypeRegistry());
+
+
+    }
+
+
+    /**
+     * Sets the connection subject to the one the subject creator builds (with groups) for the principal.
+     *
+     * @param user principal to build the subject for
+     */
+    private void setUserPrincipal(final Principal user)
+    {
+        setSubject(
+                _subjectCreator.createSubjectWithGroups(user)
+        );
+    }
+
+    /**
+     * @return the idle timeout stored by {@code setDesiredIdleTimeout}
+     */
+    private long getDesiredIdleTimeout()
+    {
+        return this._desiredIdleTimeout;
+    }
+
+    /**
+     * Forwards an ATTACH frame to the session found for the channel; does nothing if there is none.
+     *
+     * @param channel channel the frame arrived on
+     * @param attach  the received frame body
+     */
+    public void receiveAttach(final short channel, final Attach attach)
+    {
+        assertState(FrameReceivingState.ANY_FRAME);
+        final SessionEndpoint session = getSession(channel);
+        if (session == null)
+        {
+            return;
+        }
+        session.receiveAttach(attach);
+    }
+
+    /**
+     * Logs a received frame on the frame logger and lets the frame body invoke the matching handler
+     * method on this connection.
+     * <p>
+     * Objects that are neither a {@link FrameBody} nor a {@link SaslFrameBody} are logged and then ignored.
+     *
+     * @param channel channel the frame arrived on
+     * @param frame   decoded frame body
+     */
+    public void receive(final short channel, final Object frame)
+    {
+        FRAME_LOGGER.debug("RECV[{}|{}] : {}", _remoteAddress, channel, frame);
+        if (frame instanceof FrameBody)
+        {
+            ((FrameBody) frame).invoke(channel, this);
+        }
+        else if (frame instanceof SaslFrameBody)
+        {
+            ((SaslFrameBody) frame).invoke(channel, this);
+        }
+    }
+
+    /**
+     * Ends the SASL exchange unsuccessfully: marks SASL as complete, moves the receiving state to
+     * {@code CLOSED}, closes the connection for input and then calls {@code close()}.
+     */
+    private void closeSaslWithFailure()
+    {
+        this._saslComplete = true;
+        this._frameReceivingState = FrameReceivingState.CLOSED;
+
+        setClosedForInput(true);
+        close();
+    }
+
+    /**
+     * Handles a SASL challenge frame; receiving one always fails the SASL exchange.
+     *
+     * @param saslChallenge the received frame body (not inspected)
+     */
+    public void receiveSaslChallenge(final SaslChallenge saslChallenge)
+    {
+        // TODO - log unexpected frame
+        this.closeSaslWithFailure();
+    }
+
+    public void receiveClose(final short channel, final Close close)
+    {
+        assertState(FrameReceivingState.ANY_FRAME);
+        _frameReceivingState = FrameReceivingState.CLOSED;
+        setClosedForInput(true);
+        closeReceived();
+        switch (_connectionState)
+        {
+            case UNOPENED:
+            case AWAITING_OPEN:
+                Error error = new Error();
+                error.setCondition(ConnectionError.CONNECTION_FORCED);
+                error.setDescription("Connection close sent before connection was opened");
+                closeConnection(error);
+                break;
+            case OPEN:
+                _connectionState = ConnectionState.CLOSE_RECEIVED;
+                // TODO - we should log the error we received from the client if present
+                sendClose(new Close());
+                _connectionState = ConnectionState.CLOSED;
+                _orderlyClose.set(true);
+                break;
+            case CLOSE_SENT:
+                _connectionState = ConnectionState.CLOSED;
+                _orderlyClose.set(true);
+
+            default:
+        }
+        _remoteError = close.getError();
+
+    }
+
+    private void closeReceived()
+    {
+        Collection<Session_1_0> sessions = new ArrayList<>(_sessions);
+
+        for(Session_1_0 session : sessions)
+        {
+            session.remoteEnd(new End());
+        }
+    }
+
+    /**
+     * @param closed new value of the closed-for-input flag
+     */
+    private void setClosedForInput(final boolean closed)
+    {
+        this._closedForInput = closed;
+    }
+
+    /**
+     * Handles a SASL mechanisms frame; receiving one always fails the SASL exchange.
+     *
+     * @param saslMechanisms the received frame body (not inspected)
+     */
+    public void receiveSaslMechanisms(final SaslMechanisms saslMechanisms)
+    {
+        // TODO - log unexpected frame
+        this.closeSaslWithFailure();
+    }
+
+    /**
+     * Handles a SASL response frame by passing its payload to the SASL server.
+     * <ul>
+     * <li>If the SASL server reports completion, an {@code OK} outcome is sent, the authenticated
+     * principal is stored and the connection waits for the AMQP header.</li>
+     * <li>If it is not complete, the next challenge is sent.</li>
+     * <li>If evaluation throws a {@link SaslException}, an {@code AUTH} outcome is sent and the SASL
+     * exchange is closed with failure.</li>
+     * </ul>
+     *
+     * @param saslResponse the received frame body; an absent response is treated as empty
+     */
+    public void receiveSaslResponse(final SaslResponse saslResponse)
+    {
+        assertState(FrameReceivingState.SASL_RESPONSE_ONLY);
+
+        final Binary responseBinary = saslResponse.getResponse();
+        byte[] response = responseBinary == null ? null : responseBinary.getArray();
+        if (response == null)
+        {
+            response = new byte[0];
+        }
+
+        try
+        {
+            // Process response from the client
+            final byte[] challenge = _saslServer.evaluateResponse(response);
+
+            if (_saslServer.isComplete())
+            {
+                final SaslOutcome outcome = new SaslOutcome();
+                outcome.setCode(SaslCode.OK);
+                send(new SASLFrame(outcome), null);
+
+                _saslComplete = true;
+                _user = _saslServerProvider.getAuthenticatedPrincipal(_saslServer);
+                _frameReceivingState = FrameReceivingState.AMQP_HEADER;
+            }
+            else
+            {
+                final SaslChallenge challengeBody = new SaslChallenge();
+                challengeBody.setChallenge(new Binary(challenge));
+                send(new SASLFrame(challengeBody), null);
+            }
+        }
+        catch (SaslException e)
+        {
+            // Keep a trace of why authentication failed instead of dropping the exception.
+            LOGGER.debug("SASL response evaluation failed for {}", _remoteAddress, e);
+
+            final SaslOutcome outcome = new SaslOutcome();
+            outcome.setCode(SaslCode.AUTH);
+            send(new SASLFrame(outcome), null);
+
+            // closeSaslWithFailure() also marks the SASL exchange as complete.
+            closeSaslWithFailure();
+        }
+    }
+
+    /**
+     * @return the described-type registry held by this connection
+     */
+    public AMQPDescribedTypeRegistry getDescribedTypeRegistry()
+    {
+        return this._describedTypeRegistry;
+    }
+
+    /**
+     * Queues an asynchronous task that closes the given session.
+     *
+     * @param session session to close
+     * @param cause   cause passed on to {@code session.close}
+     * @param message message passed on to {@code session.close}
+     */
+    private void closeSessionAsync(final Session_1_0 session, final AMQConstant cause, final String message)
+    {
+        final Action<ConnectionHandler> closeTask = new Action<ConnectionHandler>()
+        {
+            @Override
+            public void performAction(final ConnectionHandler ignoredHandler)
+            {
+                session.close(cause, message);
+            }
+        };
+        addAsyncTask(closeTask);
+    }
+
+
+    /**
+     * @return the current value of the closed-for-output flag
+     */
+    private boolean closedForOutput()
+    {
+        return this._closedForOutput;
+    }
+
+    /**
+     * @return {@code true} when the connection state is {@code CLOSED} or {@code CLOSE_RECEIVED}
+     */
+    public boolean isClosed()
+    {
+        final ConnectionState current = _connectionState;
+        if (current == ConnectionState.CLOSED)
+        {
+            return true;
+        }
+        return current == ConnectionState.CLOSE_RECEIVED;
+    }
+
+    /**
+     * @return the current value of the closed-for-input flag
+     */
+    public boolean closedForInput()
+    {
+        return this._closedForInput;
+    }
+
+    /**
+     * Unregisters a session that has ended. Nothing happens when the connection was closed on open.
+     *
+     * @param session the session that ended
+     */
+    void sessionEnded(final Session_1_0 session)
+    {
+        if (_closedOnOpen)
+        {
+            return;
+        }
+        _sessions.remove(session);
+        sessionRemoved(session);
+    }
+
+    /**
+     * Sends a frame by delegating to {@code sendFrame}.
+     *
+     * @param channel channel to send on
+     * @param body    frame body
+     * @param payload payload passed on unchanged
+     * @return whatever {@code sendFrame} returns
+     */
+    public int send(final short channel, final FrameBody body, final QpidByteBuffer payload)
+    {
+        final int result = sendFrame(channel, body, payload);
+        return result;
+    }
+
+    /**
+     * Reacts to the input side of the connection closing. Only the first call has any effect.
+     * <p>
+     * The connection state is advanced according to the state it was in, and afterwards every session
+     * is told the remote end has gone.
+     */
+    private void inputClosed()
+    {
+        if (_closedForInput)
+        {
+            return;
+        }
+
+        _closedForInput = true;
+        FRAME_LOGGER.debug("RECV[{}] : {}", _remoteAddress, "Underlying connection closed");
+        switch (_connectionState)
+        {
+            case UNOPENED:
+            case AWAITING_OPEN:
+            case CLOSE_SENT:
+                _connectionState = ConnectionState.CLOSED;
+                closeSender();
+                break;
+            case OPEN:
+                _connectionState = ConnectionState.CLOSE_RECEIVED;
+                break;
+            case CLOSED:
+                // already sent our close - too late to do anything more
+                break;
+            default:
+                break;
+        }
+        closeReceived();
+    }
+
+    /**
+     * Marks the connection as closed for output and then calls {@code close()}.
+     */
+    private void closeSender()
+    {
+        this.setClosedForOutput(true);
+        this.close();
+    }
+
+    /**
+     * @return the container id taken from the peer's OPEN frame, or {@code null} if none has been stored
+     */
+    String getRemoteContainerId()
+    {
+        return this._remoteContainerId;
+    }
 
-    public static final long CLOSE_RESPONSE_TIMEOUT = 10000l;
-    private final Broker<?> _broker;
+    /**
+     * @param desiredIdleTimeout value to store as the desired idle timeout
+     */
+    private void setDesiredIdleTimeout(final long desiredIdleTimeout)
+    {
+        this._desiredIdleTimeout = desiredIdleTimeout;
+    }
+
+    /**
+     * @return {@code true} only when the connection state is {@code OPEN}
+     */
+    public boolean isOpen()
+    {
+        return ConnectionState.OPEN == _connectionState;
+    }
+
+    /**
+     * Sends an END frame and optionally frees the outgoing channel.
+     *
+     * @param channel outgoing channel to send on
+     * @param end     frame body to send
+     * @param remove  when {@code true} the sending-session slot for the channel is cleared after sending
+     */
+    void sendEnd(final short channel, final End end, final boolean remove)
+    {
+        sendFrame(channel, end);
+        if (!remove)
+        {
+            return;
+        }
+        _sendingSessions[channel] = null;
+    }
+
+    /**
+     * Handles a SASL outcome frame; receiving one always fails the SASL exchange.
+     *
+     * @param saslOutcome the received frame body (not inspected)
+     */
+    public void receiveSaslOutcome(final SaslOutcome saslOutcome)
+    {
+        // TODO - log unexpected frame
+        this.closeSaslWithFailure();
+    }
+
+    public void receiveEnd(final short channel, final End end)
+    {
+
+        assertState(FrameReceivingState.ANY_FRAME);
+        SessionEndpoint endpoint = _receivingSessions[channel];
+        if (endpoint != null)
+        {
+            _receivingSessions[channel] = null;
+
+            endpoint.receiveEnd(end);
+        }
+        else
+        {
+            // TODO error
+        }
+    }
+
+    /**
+     * Forwards a DISPOSITION frame to the session found for the channel; does nothing if there is none.
+     *
+     * @param channel     channel the frame arrived on
+     * @param disposition the received frame body
+     */
+    public void receiveDisposition(final short channel,
+                                   final Disposition disposition)
+    {
+        assertState(FrameReceivingState.ANY_FRAME);
+        final SessionEndpoint session = getSession(channel);
+        if (session == null)
+        {
+            return;
+        }
+        session.receiveDisposition(disposition);
+    }
+
+    public void receiveBegin(final short channel, final Begin begin)
+    {
+
+        assertState(FrameReceivingState.ANY_FRAME);
+        short myChannelId;
+        if (begin.getRemoteChannel() != null)
+        {
+            myChannelId = begin.getRemoteChannel().shortValue();
+            SessionEndpoint sessionEndpoint;
+            try
+            {
+                sessionEndpoint = _sendingSessions[myChannelId];
+            }
+            catch (IndexOutOfBoundsException e)
+            {
+                final Error error = new Error();
+                error.setCondition(ConnectionError.FRAMING_ERROR);
+                error.setDescription("BEGIN received on channel " + channel + " with given remote-channel "
+                                     + begin.getRemoteChannel() + " which is outside the valid range of 0 to "
+                                     + _channelMax + ".");
+                closeConnection(error);
+                return;
+            }
+            if (sessionEndpoint != null)
+            {
+                if (_receivingSessions[channel] == null)
+                {
+                    _receivingSessions[channel] = sessionEndpoint;
+                    sessionEndpoint.setReceivingChannel(channel);
+                    sessionEndpoint.setNextIncomingId(begin.getNextOutgoingId());
+                    sessionEndpoint.setOutgoingSessionCredit(begin.getIncomingWindow());
+
+                    if (sessionEndpoint.getState() == SessionState.END_SENT)
+                    {
+                        _sendingSessions[myChannelId] = null;
+                    }
+                }
+                else
+                {
+                    final Error error = new Error();
+                    error.setCondition(ConnectionError.FRAMING_ERROR);
+                    error.setDescription("BEGIN received on channel " + channel + " which is already in use.");
+                    closeConnection(error);
+                }
+            }
+            else
+            {
+                final Error error = new Error();
+                error.setCondition(ConnectionError.FRAMING_ERROR);
+                error.setDescription("BEGIN received on channel " + channel + " with given remote-channel "
+                                     + begin.getRemoteChannel() + " which is not known as a begun session.");
+                closeConnection(error);
+            }
+
+
+        }
+        else // Peer requesting session creation
+        {
+
+            myChannelId = getFirstFreeChannel();
+            if (myChannelId == -1)
+            {
+                // close any half open channel
+                myChannelId = getFirstFreeChannel();
+
+            }
+
+            if (_receivingSessions[channel] == null)
+            {
+                SessionEndpoint sessionEndpoint = new SessionEndpoint(this, begin);
+
+                _receivingSessions[channel] = sessionEndpoint;
+                _sendingSessions[myChannelId] = sessionEndpoint;
+
+                Begin beginToSend = new Begin();
+
+                sessionEndpoint.setReceivingChannel(channel);
+                sessionEndpoint.setSendingChannel(myChannelId);
+                beginToSend.setRemoteChannel(UnsignedShort.valueOf(channel));
+                beginToSend.setNextOutgoingId(sessionEndpoint.getNextOutgoingId());
+                beginToSend.setOutgoingWindow(sessionEndpoint.getOutgoingWindowSize());
+                beginToSend.setIncomingWindow(sessionEndpoint.getIncomingWindowSize());
+                sendFrame(myChannelId, beginToSend);
+
+                remoteSessionCreation(sessionEndpoint);
+            }
+            else
+            {
+                final Error error = new Error();
+                error.setCondition(ConnectionError.FRAMING_ERROR);
+                error.setDescription("BEGIN received on channel " + channel + " which is already in use.");
+                closeConnection(error);
+            }
+
+        }
+
+    }
+
+    /**
+     * Creates and registers the {@link Session_1_0} for a session endpoint the peer asked for, and
+     * installs a listener that relays link-creation and end events to that session inside the
+     * session's access-control context. Nothing happens when the connection was closed on open.
+     *
+     * @param sessionEndpoint endpoint of the newly begun session
+     */
+    private void remoteSessionCreation(final SessionEndpoint sessionEndpoint)
+    {
+        if (_closedOnOpen)
+        {
+            return;
+        }
+
+        final Session_1_0 session = new Session_1_0(this, sessionEndpoint);
+        _sessions.add(session);
+        sessionAdded(session);
+
+        final SessionEventListener relay = new SessionEventListener()
+        {
+            @Override
+            public void remoteLinkCreation(final LinkEndpoint linkEndpoint)
+            {
+                final PrivilegedAction<Object> createLink = new PrivilegedAction<Object>()
+                {
+                    @Override
+                    public Object run()
+                    {
+                        session.remoteLinkCreation(linkEndpoint);
+                        return null;
+                    }
+                };
+                AccessController.doPrivileged(createLink, session.getAccessControllerContext());
+            }
+
+            @Override
+            public void remoteEnd(final End end)
+            {
+                final PrivilegedAction<Object> endSession = new PrivilegedAction<Object>()
+                {
+                    @Override
+                    public Object run()
+                    {
+                        session.remoteEnd(end);
+                        return null;
+                    }
+                };
+                AccessController.doPrivileged(endSession, session.getAccessControllerContext());
+            }
+        };
+        sessionEndpoint.setSessionEventListener(relay);
+    }
+
+    /**
+     * Finds the lowest outgoing channel, from 0 up to and including {@code _channelMax}, that has no
+     * sending session.
+     * <p>
+     * NOTE(review): the index is narrowed to {@code short}, so an index above {@code Short.MAX_VALUE}
+     * would come back negative - confirm the channel maximum can never be that large.
+     *
+     * @return the free channel number, or {@code -1} when every channel is taken
+     */
+    private short getFirstFreeChannel()
+    {
+        int candidate = 0;
+        while (candidate <= _channelMax)
+        {
+            if (_sendingSessions[candidate] == null)
+            {
+                return (short) candidate;
+            }
+            candidate++;
+        }
+        return -1;
+    }
+
+    public void handleError(final Error error)
+    {
+        if (!closedForOutput())
+        {
+            Close close = new Close();
+            close.setError(error);
+            sendFrame((short) 0, close);
+
+            setClosedForOutput(true);
+        }
+
+    }
+
+    /**
+     * Forwards a TRANSFER frame to the session found for the channel; does nothing if there is none.
+     *
+     * @param channel  channel the frame arrived on
+     * @param transfer the received frame body
+     */
+    public void receiveTransfer(final short channel, final Transfer transfer)
+    {
+        assertState(FrameReceivingState.ANY_FRAME);
+        final SessionEndpoint session = getSession(channel);
+        if (session == null)
+        {
+            return;
+        }
+        session.receiveTransfer(transfer);
+    }
+
+    public SessionEndpoint createSession(final String name)
+    {
+        // todo assert connection state
+        short channel = getFirstFreeChannel();
+        if (channel != -1)
+        {
+            SessionEndpoint endpoint = new SessionEndpoint(this);
+            _sendingSessions[channel] = endpoint;
+            endpoint.setSendingChannel(channel);
+            Begin begin = new Begin();
+            begin.setNextOutgoingId(endpoint.getNextOutgoingId());
+            begin.setOutgoingWindow(endpoint.getOutgoingWindowSize());
+            begin.setIncomingWindow(endpoint.getIncomingWindowSize());
+
+            begin.setHandleMax(_handleMax);
+            sendFrame(channel, begin);
+            return endpoint;
+
+        }
+        else
+        {
+            // TODO - report error
+            return null;
+        }
+
+    }
+
+    /**
+     * Forwards a FLOW frame to the session found for the channel; does nothing if there is none.
+     *
+     * @param channel channel the frame arrived on
+     * @param flow    the received frame body
+     */
+    public void receiveFlow(final short channel, final Flow flow)
+    {
+        assertState(FrameReceivingState.ANY_FRAME);
+        final SessionEndpoint session = getSession(channel);
+        if (session == null)
+        {
+            return;
+        }
+        session.receiveFlow(flow);
+    }
+
+    /**
+     * Handles the peer's OPEN frame.
+     * <p>
+     * Records the negotiated limits and the peer's identity, then validates the request. The
+     * connection is closed with an error when the requested idle timeout is below the supported
+     * minimum, the hostname names no virtual host, the virtual host is not active (redirect
+     * information is added to the error when available), the connection is not authenticated, or the
+     * virtual host refuses the connection. Finally the connection state is advanced, sending our own
+     * OPEN first if that has not happened yet.
+     *
+     * @param channel channel the frame arrived on
+     * @param open    the received frame body
+     */
+    public void receiveOpen(final short channel, final Open open)
+    {
+        assertState(FrameReceivingState.OPEN_ONLY);
+        _frameReceivingState = FrameReceivingState.ANY_FRAME;
+
+        // Use the smaller of our own channel-max and the peer's, when the peer supplies one.
+        if (open.getChannelMax() != null)
+        {
+            _channelMax = Math.min(_channelMax, open.getChannelMax().intValue());
+        }
+        if (_receivingSessions == null)
+        {
+            _receivingSessions = new SessionEndpoint[_channelMax + 1];
+            _sendingSessions = new SessionEndpoint[_channelMax + 1];
+        }
+        _maxFrameSize = open.getMaxFrameSize() == null ? DEFAULT_MAX_FRAME : open.getMaxFrameSize().intValue();
+        _remoteContainerId = open.getContainerId();
+        _localHostname = open.getHostname();
+        if (open.getIdleTimeOut() != null)
+        {
+            _idleTimeout = open.getIdleTimeOut().longValue();
+        }
+        _remoteProperties = open.getProperties();
+        if (_remoteProperties != null)
+        {
+            // A key that is present but mapped to null is treated like a missing key.
+            final Object product = _remoteProperties.get(Symbol.valueOf("product"));
+            if (product != null)
+            {
+                setClientProduct(product.toString());
+            }
+            final Object version = _remoteProperties.get(Symbol.valueOf("version"));
+            if (version != null)
+            {
+                setClientVersion(version.toString());
+            }
+            setClientId(_remoteContainerId);
+        }
+
+        if (_idleTimeout != 0L && _idleTimeout < MINIMUM_SUPPORTED_IDLE_TIMEOUT)
+        {
+            closeConnection(new Error(ConnectionError.CONNECTION_FORCED,
+                                      "Requested idle timeout of "
+                                      + _idleTimeout
+                                      + " is too low. The minimum supported timeout is "
+                                      + MINIMUM_SUPPORTED_IDLE_TIMEOUT));
+            close();
+            _closedOnOpen = true;
+        }
+        else
+        {
+            long desiredIdleTimeout = getDesiredIdleTimeout();
+            initialiseHeartbeating(_idleTimeout / 2L, desiredIdleTimeout);
+            final VirtualHost vhost = ((AmqpPort) _port).getVirtualHost(_localHostname);
+            if (vhost == null)
+            {
+                closeWithError(AmqpError.NOT_FOUND, "Unknown hostname in connection open: '" + _localHostname + "'");
+            }
+            else if (vhost.getState() != org.apache.qpid.server.model.State.ACTIVE)
+            {
+                final Error err = new Error();
+                err.setCondition(AmqpError.NOT_FOUND);
+
+                _closedOnOpen = true;
+
+                // Fill in the description / redirect details before the error is handed over,
+                // so that they are part of what is reported to the peer.
+                populateConnectionRedirect(vhost, err);
+
+                closeConnection(err);
+
+                close();
+            }
+            else
+            {
+                final Principal user = _user;
+                if (user != null)
+                {
+                    setUserPrincipal(user);
+                }
+                if (AuthenticatedPrincipal.getOptionalAuthenticatedPrincipalFromSubject(getSubject()) == null)
+                {
+                    closeWithError(AmqpError.NOT_ALLOWED, "Connection has not been authenticated");
+                }
+                else
+                {
+                    try
+                    {
+                        setVirtualHost(vhost);
+                    }
+                    catch (VirtualHostUnavailableException e)
+                    {
+                        closeWithError(AmqpError.NOT_ALLOWED, e.getMessage());
+                    }
+                }
+            }
+        }
+
+        switch (_connectionState)
+        {
+            case UNOPENED:
+                // Our own OPEN has not been sent yet: send it, then become OPEN like AWAITING_OPEN.
+                sendOpen(_channelMax, _maxFrameSize);
+                // fall through
+            case AWAITING_OPEN:
+                _connectionState = ConnectionState.OPEN;
+                break;
+            default:
+                // TODO bad stuff (connection already open)
+                break;
+        }
+    }
+
+    /**
+     * Completes the error sent to a client that tried to open a connection to a virtual host
+     * which is not active.
+     * <p>
+     * When the virtual host offers no redirect host for this port, the error is given a
+     * description saying the host is not active. Otherwise the redirect host string is split
+     * into a host and an optional port, which are stored in the error's info map under the
+     * symbols {@code network-host} and {@code port} (the port only when it is positive).
+     * The error condition is not modified here.
+     * <p>
+     * Accepted redirect forms are {@code host}, {@code host:port}, {@code [ipv6]} and
+     * {@code [ipv6]:port}. A port that cannot be parsed as an int is omitted.
+     *
+     * @param vhost the virtual host that is asked for its redirect host
+     * @param err   the error to populate
+     */
+    private void populateConnectionRedirect(final VirtualHost vhost, final Error err)
+    {
+        final String redirectHost = vhost.getRedirectHost(((AmqpPort) _port));
+
+        if(redirectHost == null)
+        {
+            err.setDescription("Virtual host '" + _localHostname + "' is not active");
+        }
+        else
+        {
+            final String networkHost;
+            final int port;
+            // Hex digits are matched in either case so that an upper-case literal such as
+            // [FE80::1]:5672 is not mistaken for a plain host name (brackets included).
+            if(redirectHost.matches("\\[[0-9a-fA-F:]+\\](:[0-9]+)?"))
+            {
+                // IPv6 case - the pattern guarantees exactly one ']' which is either the last
+                // character or is followed by ":<digits>"
+                final int closingBracket = redirectHost.indexOf(']');
+                networkHost = redirectHost.substring(1, closingBracket);
+                port = closingBracket == redirectHost.length() - 1
+                        ? -1
+                        : parseRedirectPort(redirectHost.substring(closingBracket + 2));
+            }
+            else
+            {
+                final int lastColon = redirectHost.lastIndexOf(':');
+                if(lastColon >= 0)
+                {
+                    networkHost = redirectHost.substring(0, lastColon);
+                    port = parseRedirectPort(redirectHost.substring(lastColon + 1));
+                }
+                else
+                {
+                    networkHost = redirectHost;
+                    port = -1;
+                }
+            }
+            final Map<Symbol, Object> infoMap = new HashMap<>();
+            infoMap.put(Symbol.valueOf("network-host"), networkHost);
+            if(port > 0)
+            {
+                infoMap.put(Symbol.valueOf("port"), UnsignedInteger.valueOf(port));
+            }
+            err.setInfo(infoMap);
+        }
+    }
+
+    /**
+     * Parses the port part of a redirect host.
+     *
+     * @param portString the text following the host/port separator
+     * @return the parsed port, or -1 when the text is not a valid int (for example when it is
+     *         non-numeric or has too many digits)
+     */
+    private static int parseRedirectPort(final String portString)
+    {
+        try
+        {
+            return Integer.parseInt(portString);
+        }
+        catch (NumberFormatException e)
+        {
+            // same fallback for every redirect form: treat the port as unspecified
+            return -1;
+        }
+    }
 
-    private ConnectionEndpoint _endpoint;
-    private final AtomicBoolean _stateChanged = new AtomicBoolean();
-    private final AtomicReference<Action<ProtocolEngine>> _workListener = new AtomicReference<>();
+    /**
+     * Handles a Detach frame by passing it to the session endpoint registered for the channel.
+     * Only legal while the connection is in the {@code ANY_FRAME} receiving state.
+     *
+     * @param channel the channel the frame arrived on
+     * @param detach  the received frame body
+     */
+    public void receiveDetach(final short channel, final Detach detach)
+    {
+        assertState(FrameReceivingState.ANY_FRAME);
+        final SessionEndpoint session = getSession(channel);
+        if (session == null)
+        {
+            // no session for this channel - nothing to deliver the frame to
+            return;
+        }
+        session.receiveDetach(detach);
+    }
 
+    /**
+     * Tells every session of this connection that the transport state has changed.
+     */
+    private void transportStateChanged()
+    {
+        final Iterator<? extends Session_1_0> sessions = _sessions.iterator();
+        while (sessions.hasNext())
+        {
+            sessions.next().transportStateChanged();
+        }
+    }
 
-    private static final ByteBuffer SASL_LAYER_HEADER =
-           ByteBuffer.wrap(new byte[]
-                   {
-                       (byte)'A',
-                       (byte)'M',
-                       (byte)'Q',
-                       (byte)'P',
-                       (byte) 3,
-                       (byte) 1,
-                       (byte) 0,
-                       (byte) 0
-                   });
-
-    private static final ByteBuffer AMQP_LAYER_HEADER =
-        ByteBuffer.wrap(new byte[]
-                {
-                    (byte)'A',
-                    (byte)'M',
-                    (byte)'Q',
-                    (byte)'P',
-                    (byte) 0,
-                    (byte) 1,
-                    (byte) 0,
-                    (byte) 0
-                });
+    /**
+     * Closes the connection with the given error by delegating to
+     * {@code closeConnection(Error)}.
+     *
+     * @param error the error passed on to {@code closeConnection}
+     */
+    public void close(final Error error)
+    {
+        closeConnection(error);
+    }
 
+    /**
+     * Stores the remote address in {@code _remoteAddress}.
+     *
+     * @param remoteAddress the address to record
+     */
+    private void setRemoteAddress(final SocketAddress remoteAddress)
+    {
+        _remoteAddress = remoteAddress;
+    }
 
-    private FrameWriter _frameWriter;
-    private ProtocolHandler _frameHandler;
-    private Object _sendLock = new Object();
-    private byte _major;
-    private byte _minor;
-    private byte _revision;
-    private final Connection_1_0 _connection;
-    private volatile boolean _transportBlockedForWriting;
+    /**
+     * Returns the principal currently held in {@code _user}.
+     *
+     * @return the stored principal; presumably {@code null} until one has been set - TODO confirm
+     */
+    public Principal getUser()
+    {
+        return _user;
+    }
 
+    /**
+     * Stores the connection properties in {@code _properties}; the map is kept by reference,
+     * not copied.
+     *
+     * @param properties the property map to store
+     */
+    public void setProperties(final Map<Symbol, Object> properties)
+    {
+        _properties = properties;
+    }
 
-    static enum State {
-           A,
-           M,
-           Q,
-           P,
-           PROTOCOL,
-           MAJOR,
-           MINOR,
-           REVISION,
-           FRAME
-       }
-
-    private State _state = State.A;
-
-    public AMQPConnection_1_0(final Broker<?> broker, final ServerNetworkConnection network,
-                              AmqpPort<?> port, Transport transport, long id,
-                              final AggregateTicker aggregateTicker,
-                              final boolean useSASL)
+    private void setClosedForOutput(final boolean closed)
     {
-        super(broker, network, port, transport, Protocol.AMQP_1_0, id, aggregateTicker);
-        _broker = broker;
-        _connection = createConnection(broker, network, port, transport, id, useSASL);
-        _endpoint = _connection.getConnectionEndpoint();
-        _endpoint.setConnectionEventListener(_connection);
-        _endpoint.setDesiredIdleTimeout(1000L * broker.getConnection_heartBeatDelay());
-        _endpoint.setFrameOutputHandler(this);
-        final List<String> mechanisms = port.getAuthenticationProvider().getSubjectCreator(transport.isSecure()).getMechanisms();
-        ByteBuffer headerResponse = useSASL ? initiateSasl() : initiateNonSasl(mechanisms);
+        _closedForOutput = closed;
+    }
 
-        _frameWriter =  new FrameWriter(_endpoint.getDescribedTypeRegistry());
+    public void receiveSaslInit(final SaslInit saslInit)
+    {
+        assertState(FrameReceivingState.SASL_INIT_ONLY);
+        String mechanism = saslInit.getMechanism() == null ? null : saslInit.getMechanism().toString();
+        final Binary initialResponse = saslInit.getInitialResponse();
+        byte[] response = initialResponse == null ? new byte[0] : initialResponse.getArray();
 
-        getSender().send(QpidByteBuffer.wrap(headerResponse.duplicate()));
-        getSender().flush();
 
-        if(useSASL)
+        try
         {
-            _endpoint.initiateSASL(mechanisms.toArray(new String[mechanisms.size()]));
-        }
+            _saslServer = _saslServerProvider.getSaslServer(mechanism, "localhost");
 
+            // Process response from the client
+            byte[] challenge = _saslServer.evaluateResponse(response != null ? response : new byte[0]);
 
-    }
+            if (_saslServer.isComplete())
+            {
+                SaslOutcome outcome = new SaslOutcome();
 
-    private Connection_1_0 createConnection(final Broker<?> broker,
-                                            final ServerNetworkConnection network,
-                                            final AmqpPort<?> port,
-                                            final Transport transport,
-                                            final long id,
-                                            final boolean useSASL)
-    {
-        Container container = new Container(broker.getId().toString());
+                outcome.setCode(SaslCode.OK);
+                send(new SASLFrame(outcome), null);
+                _saslComplete = true;
+                _user = _saslServerProvider.getAuthenticatedPrincipal(_saslServer);
 
-        SubjectCreator subjectCreator = port.getAuthenticationProvider().getSubjectCreator(transport.isSecure());
-        final ConnectionEndpoint endpoint =
-                new ConnectionEndpoint(container, useSASL ? asSaslServerProvider(subjectCreator, network) : null);
-        endpoint.setLogger(new ConnectionEndpoint.FrameReceiptLogger()
-        {
-            @Override
-            public boolean isEnabled()
-            {
-                return FRAME_LOGGER.isDebugEnabled();
-            }
+                _frameReceivingState = FrameReceivingState.AMQP_HEADER;
 
-            @Override
-            public void received(final SocketAddress remoteAddress, final short channel, final Object frame)
+            }
+            else
             {
-                FRAME_LOGGER.debug("RECV[" + remoteAddress + "|" + channel + "] : " + frame);
+                SaslChallenge challengeBody = new SaslChallenge();
+                challengeBody.setChallenge(new Binary(challenge));
+                send(new SASLFrame(challengeBody), null);
+
+                _frameReceivingState = FrameReceivingState.SASL_RESPONSE_ONLY;
             }
-        });
-        Map<Symbol,Object> serverProperties = new LinkedHashMap<>();
-        serverProperties.put(Symbol.valueOf(ServerPropertyNames.PRODUCT), CommonProperties.getProductName());
-        serverProperties.put(Symbol.valueOf(ServerPropertyNames.VERSION), CommonProperties.getReleaseVersion());
-        serverProperties.put(Symbol.valueOf(ServerPropertyNames.QPID_BUILD), CommonProperties.getBuildVersion());
-        serverProperties.put(Symbol.valueOf(ServerPropertyNames.QPID_INSTANCE_NAME), broker.getName());
+        }
+        catch (SaslException e)
+        {
+            SaslOutcome outcome = new SaslOutcome();
 
-        endpoint.setProperties(serverProperties);
+            outcome.setCode(SaslCode.AUTH);
+            send(new SASLFrame(outcome), null);
+            _saslComplete = true;
 
-        endpoint.setRemoteAddress(network.getRemoteAddress());
-        return new Connection_1_0(endpoint, id, port, transport, subjectCreator, this);
+            closeSaslWithFailure();
+
+        }
     }
 
+    /**
+     * Returns the value of {@code _maxFrameSize}.
+     *
+     * @return the current maximum frame size
+     */
+    public int getMaxFrameSize()
+    {
+        return _maxFrameSize;
+    }
 
-    public ByteBufferSender getSender()
+    Object getReference()
     {
-        return getNetwork().getSender();
+        return _reference;
     }
 
-    public ByteBuffer initiateNonSasl(final List<String> mechanisms)
+    private void endpointClosed()
     {
-        final ByteBuffer headerResponse;
-        if(mechanisms.contains(ExternalAuthenticationManagerImpl.MECHANISM_NAME)
-           && getNetwork().getPeerPrincipal() != null)
+        try
         {
-            _connection.setUserPrincipal(new AuthenticatedPrincipal(getNetwork().getPeerPrincipal()));
+            performDeleteTasks();
+            closeReceived();
         }
-        else if(mechanisms.contains(AnonymousAuthenticationManager.MECHANISM_NAME))
+        finally
         {
-            _connection.setUserPrincipal(new AuthenticatedPrincipal(AnonymousAuthenticationManager.ANONYMOUS_PRINCIPAL));
+            VirtualHost<?> virtualHost = getVirtualHost();
+            if (virtualHost != null)
+            {
+                virtualHost.deregisterConnection(this);
+            }
         }
-        else
+    }
+
+    private void closeConnection()
+    {
+        switch (_connectionState)
         {
-            getNetwork().close();
+            case AWAITING_OPEN:
+            case OPEN:
+                Close closeToSend = new Close();
+                sendClose(closeToSend);
+                _connectionState = ConnectionState.CLOSE_SENT;
+                break;
+            case CLOSE_SENT:
+            default:
         }
-
-        _frameHandler = new FrameHandler(_endpoint);
-        headerResponse = AMQP_LAYER_HEADER;
-        return headerResponse;
     }
 
-    public ByteBuffer initiateSasl()
+    private void closeConnection(final Error error)
     {
-        final ByteBuffer headerResponse;
-        _endpoint.setSaslFrameOutput(this);
+        Close close = new Close();
+        close.setError(error);
+        switch (_connectionState)
+        {
+            case UNOPENED:
+                sendOpen(0, 0);
+                sendClose(close);
+                _connectionState = ConnectionState.CLOSED;
+                break;
+            case AWAITING_OPEN:
+            case OPEN:
+                sendClose(close);
+                _connectionState = ConnectionState.CLOSE_SENT;
+            case CLOSE_SENT:
+            case CLOSED:
+                // already sent our close - too late to do anything more
+                break;
+            default:
+                // TODO Unknown state
+        }
+    }
 
-        _endpoint.setOnSaslComplete(new Runnable()
+    int sendFrame(final short channel, final FrameBody body, final QpidByteBuffer payload)
+    {
+        if (!_closedForOutput)
         {
-            public void run()
+            ValueWriter<FrameBody> writer = _describedTypeRegistry.getValueWriter(body);
+            int size = writer.writeToBuffer(EMPTY_BYTE_BUFFER);
+            QpidByteBuffer payloadDup = payload == null ? null : payload.duplicate();
+            int payloadSent = _maxFrameSize - (size + 9);
+            try
             {
-                if (_endpoint.isAuthenticated())
+                if (payloadSent < (payload == null ? 0 : payload.remaining()))
                 {
-                    getSender().send(QpidByteBuffer.wrap(AMQP_LAYER_HEADER.duplicate()));
-                    getSender().flush();
+
+                    if (body instanceof Transfer)
+                    {
+                        ((Transfer) body).setMore(Boolean.TRUE);
+                    }
+
+                    writer = _describedTypeRegistry.getValueWriter(body);
+                    size = writer.writeToBuffer(EMPTY_BYTE_BUFFER);
+                    payloadSent = _maxFrameSize - (size + 9);
+
+                    payloadDup.limit(payloadDup.position() + payloadSent);
                 }
                 else
                 {
-                    getNetwork().close();
+                    payloadSent = payload == null ? 0 : payload.remaining();
                 }
+                send(AMQFrame.createAMQFrame(channel, body, payloadDup));
             }
-        });
-        _frameHandler = new SASLFrameHandler(_endpoint);
-        headerResponse = SASL_LAYER_HEADER;
-        return headerResponse;
+            finally
+            {
+                if (payloadDup != null)
+                {
+                    payloadDup.dispose();
+                }
+            }
+            return payloadSent;
+        }
+        else
+        {
+            return -1;
+        }
+    }
+
+    /**
+     * Sends a frame that carries no payload by calling the three-argument
+     * {@code sendFrame} with a {@code null} payload; the int it returns is discarded.
+     *
+     * @param channel the channel to send the frame on
+     * @param body    the frame body
+     */
+    void sendFrame(final short channel, final FrameBody body)
+    {
+        sendFrame(channel, body, null);
+    }
+
+    public ByteBufferSender getSender()
+    {
+        return getNetwork().getSender();
     }
 
     @Override
@@ -311,15 +1197,14 @@ public class AMQPConnection_1_0 extends
         return getNetwork().getRemoteAddress().toString();
     }
 
-    private final Logger RAW_LOGGER = LoggerFactory.getLogger("RAW");
 
 
-    public synchronized void received(final QpidByteBuffer msg)
+    public void received(final QpidByteBuffer msg)
     {
         try
         {
             updateLastReadTime();
-            if(RAW_LOGGER.isDebugEnabled())
+            if (RAW_LOGGER.isDebugEnabled())
             {
                 QpidByteBuffer dup = msg.duplicate();
                 byte[] data = new byte[dup.remaining()];
@@ -328,137 +1213,122 @@ public class AMQPConnection_1_0 extends
                 Binary bin = new Binary(data);
                 RAW_LOGGER.debug("RECV[" + getNetwork().getRemoteAddress() + "] : " + bin.toString());
             }
-            ProtocolHandler frameHandler;
+
             int remaining;
 
             do
             {
-                frameHandler = _frameHandler;
                 remaining = msg.remaining();
 
-                switch (_state)
+                switch (_frameReceivingState)
                 {
-                    case A:
-                        if (msg.hasRemaining())
-                        {
-                            msg.get();
-                        }
-                        else
-                        {
-                            break;
-                        }
-                    case M:
-                        if (msg.hasRemaining())
-                        {
-                            msg.get();
-                        }
-                        else
-                        {
-                            _state = State.M;
-                            break;
-                        }
-
-                    case Q:
-                        if (msg.hasRemaining())
-                        {
-                            msg.get();
-                        }
-                        else
-                        {
-                            _state = State.Q;
-                            break;
-                        }
-                    case P:
-                        if (msg.hasRemaining())
-                        {
-                            msg.get();
-                        }
-                        else
+                    case AMQP_OR_SASL_HEADER:
+                    case AMQP_HEADER:
+                        if (remaining < 8)
                         {
-                            _state = State.P;
-                            break;
+                            return;
                         }
-                    case PROTOCOL:
-                        if (msg.hasRemaining())
-                        {
-                            msg.get();
-                        }
-                        else
-                        {
-                            _state = State.PROTOCOL;
-                            break;
-                        }
-                    case MAJOR:
-                        if (msg.hasRemaining())
-                        {
-                            _major = msg.get();
-                        }
-                        else
-                        {
-                            _state = State.MAJOR;
-                            break;
-                        }
-                    case MINOR:
-                        if (msg.hasRemaining())
-                        {
-                            _minor = msg.get();
-                        }
-                        else
-                        {
-                            _state = State.MINOR;
-                            break;
-                        }
-                    case REVISION:
-                        if (msg.hasRemaining())
-                        {
-                            _revision = msg.get();
+                        processProtocolHeader(msg);
+                        break;
+                    case OPEN_ONLY:
+                    case ANY_FRAME:
+                    case SASL_INIT_ONLY:
+                    case SASL_RESPONSE_ONLY:
+                        _frameHandler.parse(msg);
+                        break;
+                    case CLOSED:
+                        // ignore;
+                        break;
+                }
 
-                            _state = State.FRAME;
-                        }
-                        else
-                        {
-                            _state = State.REVISION;
-                            break;
-                        }
-                    case FRAME:
-                        if (msg.hasRemaining())
-                        {
-                            AccessController.doPrivileged(new PrivilegedAction<Void>()
-                            {
-                                @Override
-                                public Void run()
-                                {
-                                    _frameHandler = _frameHandler.parse(msg);
-                                    return null;
-                                }
-                            }, getAccessControllerContext());
 
-                        }
-                }
             }
-            while(_frameHandler != frameHandler || msg.remaining() != remaining);
+            while (msg.remaining() != remaining);
         }
-        catch(ConnectionScopedRuntimeException e)
+        catch (ConnectionScopedRuntimeException e)
         {
             throw e;
         }
-        catch(RuntimeException e)
+        catch (RuntimeException e)
         {
             LOGGER.error("Unexpected exception while processing incoming data", e);
             throw new ConnectionScopedRuntimeException("Unexpected exception while processing incoming data", e);
         }
-        finally
+
+    }
+
+    private void processProtocolHeader(final QpidByteBuffer msg)
+    {
+        if(msg.remaining() >= 8)
         {
-            msg.position(msg.limit());
+            byte[] header = new byte[8];
+            msg.get(header);
+
+            final AuthenticationProvider authenticationProvider = getPort().getAuthenticationProvider();
+            final SubjectCreator subjectCreator = authenticationProvider.getSubjectCreator(getTransport().isSecure());
+
+            if(Arrays.equals(header, SASL_HEADER))
+            {
+                if(_saslComplete)
+                {
+                    throw new ConnectionScopedRuntimeException("SASL Layer header received after SASL already established");
+                }
+
+                getSender().send(QpidByteBuffer.wrap(SASL_HEADER));
+
+                SaslMechanisms mechanisms = new SaslMechanisms();
+                ArrayList<Symbol> mechanismsList = new ArrayList<Symbol>();
+                for (String name :  subjectCreator.getMechanisms())
+                {
+                    mechanismsList.add(Symbol.valueOf(name));
+                }
+                mechanisms.setSaslServerMechanisms(mechanismsList.toArray(new Symbol[mechanismsList.size()]));
+                send(new SASLFrame(mechanisms), null);
+
+                _frameReceivingState = FrameReceivingState.SASL_INIT_ONLY;
+                _frameHandler = new FrameHandler(this, true);
+            }
+            else if(Arrays.equals(header, AMQP_HEADER))
+            {
+                if(!_saslComplete)
+                {
+                    final List<String> mechanisms = subjectCreator.getMechanisms();
+
+                    if(mechanisms.contains(ExternalAuthenticationManagerImpl.MECHANISM_NAME) && getNetwork().getPeerPrincipal() != null)
+                    {
+                        setUserPrincipal(new AuthenticatedPrincipal(getNetwork().getPeerPrincipal()));
+                    }
+                    else if(mechanisms.contains(AnonymousAuthenticationManager.MECHANISM_NAME))
+                    {
+                        setUserPrincipal(new AuthenticatedPrincipal(AnonymousAuthenticationManager.ANONYMOUS_PRINCIPAL));
+                    }
+                    else
+                    {
+                        // TODO - log auth failure / close
+                        getNetwork().close();
+                    }
+
+                }
+                getSender().send(QpidByteBuffer.wrap(AMQP_HEADER));
+                _frameReceivingState = FrameReceivingState.OPEN_ONLY;
+                _frameHandler = new FrameHandler(this, false);
+
+            }
+            else
+            {
+                throw new ConnectionScopedRuntimeException("Unknown protocol header");
+            }
+
         }
-     }
+
+    }
 
 
     public void closed()
     {
         try
         {
-            _endpoint.inputClosed();
+            inputClosed();
         }
         catch(RuntimeException e)
         {
@@ -468,7 +1338,7 @@ public class AMQPConnection_1_0 extends
         {
             try
             {
-                _connection.closed();
+                endpointClosed();
             }
             finally
             {
@@ -487,57 +1357,49 @@ public class AMQPConnection_1_0 extends
         send(amqFrame, null);
     }
 
-    private static final Logger FRAME_LOGGER = LoggerFactory.getLogger("FRM");
 
 
     public void send(final AMQFrame amqFrame, ByteBuffer buf)
     {
 
-        synchronized (_sendLock)
-        {
-            updateLastWriteTime();
-            if (FRAME_LOGGER.isDebugEnabled())
-            {
-                FRAME_LOGGER.debug("SEND["
-                                   + getNetwork().getRemoteAddress()
-                                   + "|"
-                                   + amqFrame.getChannel()
-                                   + "] : "
-                                   + amqFrame.getFrameBody());
-            }
+        updateLastWriteTime();
+        FRAME_LOGGER.debug("SEND[{}|{}] : {}",
+                           getNetwork().getRemoteAddress(),
+                           amqFrame.getChannel(),
+                           amqFrame.getFrameBody());
 
-            _frameWriter.setValue(amqFrame);
+        _frameWriter.setValue(amqFrame);
 
-            QpidByteBuffer buffer = QpidByteBuffer.allocateDirect(_frameWriter.getSize());
+        QpidByteBuffer buffer = QpidByteBuffer.allocateDirect(_frameWriter.getSize());
 
-            try
+        try
+        {
+            int size = _frameWriter.writeToBuffer(buffer);
+            if (size > getMaxFrameSize())
             {
-                int size = _frameWriter.writeToBuffer(buffer);
-                if (size > _endpoint.getMaxFrameSize())
-                {
-                    throw new OversizeFrameException(amqFrame, size);
-                }
-
-                buffer.flip();
+                throw new OversizeFrameException(amqFrame, size);
+            }
 
-                if (RAW_LOGGER.isDebugEnabled())
-                {
-                    QpidByteBuffer dup = buffer.duplicate();
-                    byte[] data = new byte[dup.remaining()];
-                    dup.get(data);
-                    dup.dispose();
-                    Binary bin = new Binary(data);
-                    RAW_LOGGER.debug("SEND[" + getNetwork().getRemoteAddress() + "] : " + bin.toString());
-                }
+            buffer.flip();
 
-                getSender().send(buffer);
-                getSender().flush();
-            }
-            finally
+            if (RAW_LOGGER.isDebugEnabled())
             {
-                buffer.dispose();
+                QpidByteBuffer dup = buffer.duplicate();
+                byte[] data = new byte[dup.remaining()];
+                dup.get(data);
+                dup.dispose();
+                Binary bin = new Binary(data);
+                RAW_LOGGER.debug("SEND[" + getNetwork().getRemoteAddress() + "] : " + bin.toString());
             }
+
+            getSender().send(buffer);
+
+        }
+        finally
+        {
+            buffer.dispose();
         }
+
     }
 
     public void send(short channel, FrameBody body)
@@ -565,7 +1427,7 @@ public class AMQPConnection_1_0 extends
         if(_transportBlockedForWriting != blocked)
         {
             _transportBlockedForWriting = blocked;
-            _connection.transportStateChanged();
+            transportStateChanged();
         }
 
     }
@@ -575,7 +1437,7 @@ public class AMQPConnection_1_0 extends
     {
         if (isIOThread())
         {
-            return _connection.processPendingIterator();
+            return new ProcessPendingIterator();
         }
         else
         {
@@ -620,44 +1482,202 @@ public class AMQPConnection_1_0 extends
 
     public void sendConnectionCloseAsync(final AMQConstant cause, final String message)
     {
-        _connection.sendConnectionCloseAsync(cause, message);
+        Action<ConnectionHandler> action = new Action<ConnectionHandler>()
+        {
+            @Override
+            public void performAction(final ConnectionHandler object)
+            {
+                closeConnection();
+
+            }
+        };
+        addAsyncTask(action);
     }
 
     public void closeSessionAsync(final AMQSessionModel<?> session,
                                   final AMQConstant cause, final String message)
     {
-        _connection.closeSessionAsync((Session_1_0) session, cause, message);
+        closeSessionAsync((Session_1_0) session, cause, message);
     }
 
     public void block()
     {
-        _connection.block();
+        // TODO
     }
 
     public String getRemoteContainerName()
     {
-        return _connection.getRemoteContainerName();
+        return _remoteContainerId;
     }
 
     public List<Session_1_0> getSessionModels()
     {
-        return _connection.getSessionModels();
+        return new ArrayList<>(_sessions);
     }
 
     public void unblock()
     {
-        _connection.unblock();
+        // TODO
     }
 
     public long getSessionCountLimit()
     {
-        return _connection.getSessionCountLimit();
+        return 0;  // TODO
     }
 
     @Override
-    protected boolean isOrderlyClose()
+    public boolean isOrderlyClose()
     {
-        return _connection.getConnectionEndpoint().isOrderlyClose();
+        return _orderlyClose.get();
+    }
+
+    /**
+     * Adds the action to {@code _asyncTaskList} and then calls {@code notifyWork()}.
+     *
+     * @param action the action to queue
+     */
+    private void addAsyncTask(final Action<ConnectionHandler> action)
+    {
+        _asyncTaskList.add(action);
+        notifyWork();
+    }
+
+
+    /**
+     * Builds an Open frame and sends it on the connection control channel.
+     * <p>
+     * On first use the receiving and sending session tables are allocated with
+     * {@code channelMax + 1} slots, and {@code _channelMax} is lowered to {@code channelMax}
+     * if it is currently larger.
+     *
+     * @param channelMax   the channel-max value to put in the Open frame
+     * @param maxFrameSize the max-frame-size value to put in the Open frame
+     */
+    private void sendOpen(final int channelMax, final int maxFrameSize)
+    {
+        if (_receivingSessions == null)
+        {
+            final int tableSize = channelMax + 1;
+            _receivingSessions = new SessionEndpoint[tableSize];
+            _sendingSessions = new SessionEndpoint[tableSize];
+        }
+        if (_channelMax > channelMax)
+        {
+            _channelMax = channelMax;
+        }
+
+        final Open openFrame = new Open();
+        openFrame.setContainerId(_container.getId());
+        openFrame.setChannelMax(UnsignedShort.valueOf((short) channelMax));
+        openFrame.setMaxFrameSize(UnsignedInteger.valueOf(maxFrameSize));
+        openFrame.setIdleTimeOut(UnsignedInteger.valueOf(_desiredIdleTimeout));
+        // TODO - should we try to set the hostname based on the connection information?
+        // openFrame.setHostname();
+        if (_properties != null)
+        {
+            openFrame.setProperties(_properties);
+        }
+
+        sendFrame(CONNECTION_CONTROL_CHANNEL, openFrame);
+    }
+
+    /**
+     * Closes the connection with an error built from the given condition and description,
+     * then calls {@code close()} and sets {@code _closedOnOpen}.
+     *
+     * @param amqpError        the error condition to report
+     * @param errorDescription the description to attach to the error
+     */
+    private void closeWithError(final AmqpError amqpError, final String errorDescription)
+    {
+        final Error closeError = new Error();
+        closeError.setDescription(errorDescription);
+        closeError.setCondition(amqpError);
+
+        closeConnection(closeError);
+        close();
+        _closedOnOpen = true;
+    }
+
+    /**
+     * Looks up the session endpoint registered for an incoming channel.
+     * <p>
+     * The channel is read as an unsigned 16-bit number, so a value that is negative as a Java
+     * {@code short} is never used as a negative array index. If no session is known for the
+     * channel - including when the number lies outside the receiving-session table or the table
+     * has not been allocated - a framing error is raised through {@code handleError} and
+     * {@code null} is returned.
+     *
+     * @param channel the channel a frame was received on
+     * @return the session endpoint for the channel, or {@code null} if there is none
+     */
+    private SessionEndpoint getSession(final short channel)
+    {
+        // presumably channels are unsigned on the wire (AMQP 1.0 frame header) - TODO confirm
+        final int channelNumber = channel & 0xFFFF;
+        final SessionEndpoint[] receivingSessions = _receivingSessions;
+        final SessionEndpoint session =
+                receivingSessions != null && channelNumber < receivingSessions.length
+                        ? receivingSessions[channelNumber]
+                        : null;
+        if (session == null)
+        {
+            Error error = new Error();
+            error.setCondition(ConnectionError.FRAMING_ERROR);
+            error.setDescription("Frame received on channel " + channelNumber + " which is not known as a begun session.");
+            handleError(error);
+        }
+
+        return session;
+    }
+
+    /**
+     * Sends the Close frame on the connection control channel and then calls
+     * {@code closeSender()}.
+     *
+     * @param closeToSend the Close frame body to send
+     */
+    private void sendClose(Close closeToSend)
+    {
+        sendFrame(CONNECTION_CONTROL_CHANNEL, closeToSend);
+        closeSender();
+    }
+
+    /**
+     * Describes the connection by its id and address, followed by the virtual host name when a
+     * virtual host is set.
+     */
+    @Override
+    public String toString()
+    {
+        final VirtualHost<?> vhost = getVirtualHost();
+        final StringBuilder description = new StringBuilder("Connection_1_0[");
+        description.append(_connectionId).append(" ").append(getAddress());
+        if (vhost != null)
+        {
+            description.append(" vh : ").append(vhost.getName());
+        }
+        return description.append(']').toString();
+    }
+
+
+    /**
+     * Verifies that the connection is in the frame receiving state required by the frame being
+     * processed.
+     *
+     * @param state the only state in which the current frame is acceptable
+     * @throws ConnectionScopedRuntimeException if the current state differs from {@code state}
+     */
+    private void assertState(final FrameReceivingState state)
+    {
+        if(_frameReceivingState == state)
+        {
+            return;
+        }
+        throw new ConnectionScopedRuntimeException("Unexpected state, client has sent frame in an illegal order.  Required state: " + state + ", actual state: " + _frameReceivingState);
+    }
+
+
+    /**
+     * Iterator over the units of pending work for this connection: first the sessions that
+     * still have pending work, then the queued asynchronous connection actions.
+     */
+    private class ProcessPendingIterator implements Iterator<Runnable>
+    {
+        // Sessions still considered to have pending work; entries are removed by the
+        // runnables returned from next().
+        private final List<? extends AMQSessionModel<?>> _sessionsWithPending;
+        // Cursor over _sessionsWithPending; restarted whenever it reaches the end.
+        private Iterator<? extends AMQSessionModel<?>> _sessionIterator;
+        private ProcessPendingIterator()
+        {
+            // work on a copy of the session list taken at construction time
+            _sessionsWithPending = new ArrayList<>(getSessionModels());
+            _sessionIterator = _sessionsWithPending.iterator();
+        }
+
+        /**
+         * @return true while any session remains in the pending list or the async task list
+         *         is not empty
+         */
+        @Override
+        public boolean hasNext()
+        {
+            return !(_sessionsWithPending.isEmpty() && _asyncTaskList.isEmpty());
+        }
+
+        /**
+         * Returns the next unit of work. Sessions are visited in rotation; async tasks are
+         * only handed out once no session remains in the pending list.
+         *
+         * @throws NoSuchElementException if there are neither pending sessions nor async tasks
+         */
+        @Override
+        public Runnable next()
+        {
+            if(!_sessionsWithPending.isEmpty())
+            {
+                if(!_sessionIterator.hasNext())
+                {
+                    // end of the list reached - start another pass over the remaining sessions
+                    _sessionIterator = _sessionsWithPending.iterator();
+                }
+                final AMQSessionModel<?> session = _sessionIterator.next();
+                return new Runnable()
+                {
+                    @Override
+                    public void run()
+                    {
+                        // a false result from processPending() drops the session from the
+                        // pending list; the removal goes through the shared iterator field,
+                        // so it acts on whatever element that iterator returned last
+                        if(!session.processPending())
+                        {
+                            _sessionIterator.remove();
+                        }
+                    }
+                };
+            }
+            else if(!_asyncTaskList.isEmpty())
+            {
+                // the task is taken off the queue here, when next() is called, not when the
+                // returned runnable is run
+                final Action<? super ConnectionHandler> asyncAction = _asyncTaskList.poll();
+                return new Runnable()
+                {
+                    @Override
+                    public void run()
+                    {
+                        asyncAction.performAction(AMQPConnection_1_0.this);
+                    }
+                };
+            }
+            else
+            {
+                throw new NoSuchElementException();
+            }
+        }
+
+        /**
+         * Not supported.
+         *
+         * @throws UnsupportedOperationException always
+         */
+        @Override
+        public void remove()
+        {
+            throw new UnsupportedOperationException();
+        }
     }
 
     @Override

Added: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConnectionHandler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConnectionHandler.java?rev=1739270&view=auto
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConnectionHandler.java (added)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConnectionHandler.java Fri Apr 15 10:10:16 2016
@@ -0,0 +1,81 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0;
+
+import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition;
+import org.apache.qpid.server.protocol.v1_0.type.transport.End;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
+
+/**
+ * Callbacks for the transport-level frame bodies (Open, Begin, Attach, ...) and related
+ * connection-level events of an AMQP 1.0 connection.
+ * <p>
+ * Every {@code receiveXxx} method is passed a channel number together with the frame body
+ * itself. NOTE(review): the caller is presumably the frame decoding layer - confirm.
+ */
+public interface ConnectionHandler extends SASLEndpoint
+{
+    /** Handles an {@link Open} received on {@code channel}. */
+    void receiveOpen(short channel, Open open);
+
+    /** Handles a {@link Close} received on {@code channel}. */
+    void receiveClose(short channel, Close close);
+
+    /** Handles a {@link Begin} received on {@code channel}. */
+    void receiveBegin(short channel, Begin begin);
+
+    /** Handles an {@link End} received on {@code channel}. */
+    void receiveEnd(short channel, End end);
+
+    /** Handles an {@link Attach} received on {@code channel}. */
+    void receiveAttach(short channel, Attach attach);
+
+    /** Handles a {@link Detach} received on {@code channel}. */
+    void receiveDetach(short channel, Detach detach);
+
+    /** Handles a {@link Transfer} received on {@code channel}. */
+    void receiveTransfer(short channel, Transfer transfer);
+
+    /** Handles a {@link Disposition} received on {@code channel}. */
+    void receiveDisposition(short channel, Disposition disposition);
+
+    /** Handles a {@link Flow} received on {@code channel}. */
+    void receiveFlow(short channel, Flow flow);
+
+    /** Returns the maximum frame size. NOTE(review): units and whose limit this is are not visible here. */
+    int getMaxFrameSize();
+
+    /** Handles an {@link Error}; the parameter name suggests it comes from parsing (TODO confirm). */
+    void handleError(Error parsingError);
+
+    /** Reports whether this handler is closed for input; exact semantics are left to the implementation. */
+    boolean closedForInput();
+
+    /** Untyped variant taking any value for {@code channel}. NOTE(review): presumably dispatches by type - confirm. */
+    void receive(short channel, Object val);
+}

Propchange: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConnectionHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Copied: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConnectionState.java (from r1739261, qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionState.java)
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConnectionState.java?p2=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConnectionState.java&p1=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionState.java&r1=1739261&r2=1739270&rev=1739270&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionState.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConnectionState.java Fri Apr 15 10:10:16 2016
@@ -19,7 +19,7 @@
  *
  */
 
-package org.apache.qpid.amqp_1_0.transport;
+package org.apache.qpid.server.protocol.v1_0;
 
 public enum ConnectionState
 {

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1739270&r1=1739269&r2=1739270&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java Fri Apr 15 10:10:16 2016
@@ -26,23 +26,22 @@ import java.util.Collection;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.qpid.amqp_1_0.codec.ValueHandler;
-import org.apache.qpid.amqp_1_0.messaging.SectionEncoder;
-import org.apache.qpid.amqp_1_0.messaging.SectionEncoderImpl;
-import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
-import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
-import org.apache.qpid.amqp_1_0.type.Binary;
-import org.apache.qpid.amqp_1_0.type.DeliveryState;
-import org.apache.qpid.amqp_1_0.type.Outcome;
-import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
-import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry;
-import org.apache.qpid.amqp_1_0.type.messaging.Accepted;
-import org.apache.qpid.amqp_1_0.type.messaging.Header;
-import org.apache.qpid.amqp_1_0.type.messaging.Modified;
-import org.apache.qpid.amqp_1_0.type.messaging.Released;
-import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState;
-import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode;
-import org.apache.qpid.amqp_1_0.type.transport.Transfer;
+import org.apache.qpid.server.protocol.v1_0.codec.ValueHandler;
+import org.apache.qpid.server.protocol.v1_0.messaging.SectionEncoder;
+import org.apache.qpid.server.protocol.v1_0.messaging.SectionEncoderImpl;
+import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
+import org.apache.qpid.server.protocol.v1_0.type.Outcome;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Header;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Modified;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Released;
+import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionalState;
+import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
 import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.transport.ProtocolEngine;
 import org.apache.qpid.server.consumer.AbstractConsumerTarget;
@@ -130,21 +129,21 @@ class ConsumerTarget_1_0 extends Abstrac
             QpidByteBuffer payload = null;
             //TODO
             Collection<QpidByteBuffer> fragments = message.getFragments();
-            if(fragments.size() == 1)
+            if (fragments.size() == 1)
             {
                 payload = fragments.iterator().next();
             }
             else
             {
                 int size = 0;
-                for(QpidByteBuffer fragment : fragments)
+                for (QpidByteBuffer fragment : fragments)
                 {
                     size += fragment.remaining();
                 }
 
                 payload = QpidByteBuffer.allocateDirect(size);
 
-                for(QpidByteBuffer fragment : fragments)
+                for (QpidByteBuffer fragment : fragments)
                 {
                     payload.put(fragment);
                     fragment.dispose();
@@ -153,7 +152,7 @@ class ConsumerTarget_1_0 extends Abstrac
                 payload.flip();
             }
 
-            if(entry.getDeliveryCount() != 0)
+            if (entry.getDeliveryCount() != 0)
             {
                 ValueHandler valueHandler = new ValueHandler(_typeRegistry);
 
@@ -161,7 +160,7 @@ class ConsumerTarget_1_0 extends Abstrac
                 try
                 {
                     Object value = valueHandler.parse(payload);
-                    if(value instanceof Header)
+                    if (value instanceof Header)
                     {
                         oldHeader = (Header) value;
                     }
@@ -177,7 +176,7 @@ class ConsumerTarget_1_0 extends Abstrac
                 }
 
                 Header header = new Header();
-                if(oldHeader != null)
+                if (oldHeader != null)
                 {
                     header.setDurable(oldHeader.getDurable());
                     header.setPriority(oldHeader.getPriority());
@@ -190,7 +189,7 @@ class ConsumerTarget_1_0 extends Abstrac
 
                 QpidByteBuffer oldPayload = payload;
                 payload = QpidByteBuffer.allocateDirect(oldPayload.remaining() + encodedHeader.getLength());
-                payload.put(encodedHeader.getArray(),encodedHeader.getArrayOffset(),encodedHeader.getLength());
+                payload.put(encodedHeader.getArray(), encodedHeader.getArrayOffset(), encodedHeader.getLength());
                 payload.put(oldPayload);
                 oldPayload.dispose();
                 payload.flip();
@@ -203,58 +202,57 @@ class ConsumerTarget_1_0 extends Abstrac
 
             transfer.setDeliveryTag(tag);
 
-            synchronized(_link.getLock())
+            if (_link.isAttached())
             {
-                if(_link.isAttached())
+                if (SenderSettleMode.SETTLED.equals(getEndpoint().getSendingSettlementMode()))
                 {
-                    if(SenderSettleMode.SETTLED.equals(getEndpoint().getSendingSettlementMode()))
-                    {
-                        transfer.setSettled(true);
-                    }
-                    else
-                    {
-                        UnsettledAction action = _acquires
-                                                 ? new DispositionAction(tag, entry)
-                                                 : new DoNothingAction(tag, entry);
+                    transfer.setSettled(true);
+                }
+                else
+                {
+                    UnsettledAction action = _acquires
+                            ? new DispositionAction(tag, entry)
+                            : new DoNothingAction(tag, entry);
 
-                        _link.addUnsettled(tag, action, entry);
-                    }
+                    _link.addUnsettled(tag, action, entry);
+                }
 
-                    if(_transactionId != null)
-                    {
-                        TransactionalState state = new TransactionalState();
-                        state.setTxnId(_transactionId);
-                        transfer.setState(state);
-                    }
-                    // TODO - need to deal with failure here
-                    if(_acquires && _transactionId != null)
+                if (_transactionId != null)
+                {
+                    TransactionalState state = new TransactionalState();
+                    state.setTxnId(_transactionId);
+                    transfer.setState(state);
+                }
+                // TODO - need to deal with failure here
+                if (_acquires && _transactionId != null)
+                {
+                    ServerTransaction txn = _link.getTransaction(_transactionId);
+                    if (txn != null)
                     {
-                        ServerTransaction txn = _link.getTransaction(_transactionId);
-                        if(txn != null)
+                        txn.addPostTransactionAction(new ServerTransaction.Action()
                         {
-                            txn.addPostTransactionAction(new ServerTransaction.Action(){
 
-                                public void postCommit()
-                                {
-                                }
-
-                                public void onRollback()
-                                {
-                                    entry.release(getConsumer());
-                                    _link.getEndpoint().updateDisposition(tag, (DeliveryState)null, true);
-                                }
-                            });
-                        }
+                            public void postCommit()
+                            {
+                            }
 
+                            public void onRollback()
+                            {
+                                entry.release(getConsumer());
+                                _link.getEndpoint().updateDisposition(tag, (DeliveryState) null, true);
+                            }
+                        });
                     }
-                    getSession().getAMQPConnection().registerMessageDelivered(message.getSize());
-                    getEndpoint().transfer(transfer, false);
-                }
-                else
-                {
-                    entry.release(getConsumer());
+
                 }
+                getSession().getAMQPConnection().registerMessageDelivered(message.getSize());
+                getEndpoint().transfer(transfer, false);
             }
+            else
+            {
+                entry.release(getConsumer());
+            }
+
         }
         finally
         {
@@ -281,63 +279,48 @@ class ConsumerTarget_1_0 extends Abstrac
 
     public boolean allocateCredit(final ServerMessage msg)
     {
-        synchronized (_link.getLock())
+        ProtocolEngine protocolEngine = getSession().getConnection();
+        final boolean hasCredit =
+                _link.isAttached() && getEndpoint().hasCreditToSend() && !protocolEngine.isTransportBlockedForWriting();
+        if (!hasCredit && getState() == State.ACTIVE)
         {
+            suspend();
+        }
 
-            ProtocolEngine protocolEngine = getSession().getConnection().getAmqpConnection();
-            final boolean hasCredit = _link.isAttached() && getEndpoint().hasCreditToSend() && !protocolEngine.isTransportBlockedForWriting();
-            if (!hasCredit && getState() == State.ACTIVE)
-            {
-                suspend();
-            }
-
-            if (hasCredit)
-            {
-                SendingLinkEndpoint linkEndpoint = _link.getEndpoint();
-                linkEndpoint.setLinkCredit(linkEndpoint.getLinkCredit().subtract(UnsignedInteger.ONE));
-            }
-
-            return hasCredit;
+        if (hasCredit)
+        {
+            SendingLinkEndpoint linkEndpoint = _link.getEndpoint();
+            linkEndpoint.setLinkCredit(linkEndpoint.getLinkCredit().subtract(UnsignedInteger.ONE));
         }
+
+        return hasCredit;
     }
 
 
     public void suspend()
     {
-        synchronized(_link.getLock())
-        {
-            updateState(State.ACTIVE, State.SUSPENDED);
-        }
+        updateState(State.ACTIVE, State.SUSPENDED);
     }
 
 
     public void restoreCredit(final ServerMessage message)
     {
-        synchronized (_link.getLock())
-        {
-            final SendingLinkEndpoint endpoint = _link.getEndpoint();
-            endpoint.setLinkCredit(endpoint.getLinkCredit().add(UnsignedInteger.ONE));
-        }
+        final SendingLinkEndpoint endpoint = _link.getEndpoint();
+        endpoint.setLinkCredit(endpoint.getLinkCredit().add(UnsignedInteger.ONE));
     }
 
     public void queueEmpty()
     {
-        synchronized(_link.getLock())
-        {
-            _queueEmpty = true;
-        }
+        _queueEmpty = true;
     }
 
     public void flowStateChanged()
     {
-        synchronized(_link.getLock())
+        ProtocolEngine protocolEngine = getSession().getConnection();
+        if (isFlowSuspended() && getEndpoint() != null && !protocolEngine.isTransportBlockedForWriting())
         {
-            ProtocolEngine protocolEngine = getSession().getConnection().getAmqpConnection();
-            if(isFlowSuspended() && getEndpoint() != null && !protocolEngine.isTransportBlockedForWriting())
-            {
-                updateState(State.SUSPENDED, State.ACTIVE);
-                _transactionId = _link.getTransactionId();
-            }
+            updateState(State.SUSPENDED, State.ACTIVE);
+            _transactionId = _link.getTransactionId();
         }
     }
 
@@ -552,16 +535,13 @@ class ConsumerTarget_1_0 extends Abstrac
     @Override
     protected void processStateChanged()
     {
-        synchronized (_link.getLock())
+        if(_queueEmpty)
         {
-            if(_queueEmpty)
-            {
-                _queueEmpty = false;
+            _queueEmpty = false;
 
-                if(_link.drained())
-                {
-                    updateState(State.ACTIVE, State.SUSPENDED);
-                }
+            if(_link.drained())
+            {
+                updateState(State.ACTIVE, State.SUSPENDED);
             }
         }
     }
@@ -569,10 +549,7 @@ class ConsumerTarget_1_0 extends Abstrac
     @Override
     protected boolean hasStateChanged()
     {
-        synchronized (_link.getLock())
-        {
-            return _queueEmpty;
-        }
+        return _queueEmpty;
     }
 
     @Override

Copied: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Container.java (from r1739261, qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/Container.java)
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Container.java?p2=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Container.java&p1=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/Container.java&r1=1739261&r2=1739270&rev=1739270&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/Container.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Container.java Fri Apr 15 10:10:16 2016
@@ -19,7 +19,7 @@
  *
  */
 
-package org.apache.qpid.amqp_1_0.transport;
+package org.apache.qpid.server.protocol.v1_0;
 
 import java.lang.management.ManagementFactory;
 import java.net.InetAddress;

Copied: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Delivery.java (from r1739261, qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/Delivery.java)
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Delivery.java?p2=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Delivery.java&p1=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/Delivery.java&r1=1739261&r2=1739270&rev=1739270&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/Delivery.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Delivery.java Fri Apr 15 10:10:16 2016
@@ -17,14 +17,11 @@
  * under the License.
  */
 
-package org.apache.qpid.amqp_1_0.transport;
+package org.apache.qpid.server.protocol.v1_0;
 
-import org.apache.qpid.amqp_1_0.type.Binary;
-import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
-import org.apache.qpid.amqp_1_0.type.transport.Transfer;
-
-import java.util.ArrayList;
-import java.util.List;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
 
 public class Delivery
 {

Copied: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/DeliveryStateHandler.java (from r1739261, qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/DeliveryStateHandler.java)
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/DeliveryStateHandler.java?p2=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/DeliveryStateHandler.java&p1=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/DeliveryStateHandler.java&r1=1739261&r2=1739270&rev=1739270&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/DeliveryStateHandler.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/DeliveryStateHandler.java Fri Apr 15 10:10:16 2016
@@ -17,10 +17,10 @@
  * under the License.
  */
 
-package org.apache.qpid.amqp_1_0.transport;
+package org.apache.qpid.server.protocol.v1_0;
 
-import org.apache.qpid.amqp_1_0.type.Binary;
-import org.apache.qpid.amqp_1_0.type.DeliveryState;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
 
 public interface DeliveryStateHandler
 {

Copied: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ErrorHandler.java (from r1739261, qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/ErrorHandler.java)
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ErrorHandler.java?p2=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ErrorHandler.java&p1=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/ErrorHandler.java&r1=1739261&r2=1739270&rev=1739270&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/ErrorHandler.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ErrorHandler.java Fri Apr 15 10:10:16 2016
@@ -18,10 +18,10 @@
  * under the License.
  *
  */
-package org.apache.qpid.amqp_1_0.transport;
+package org.apache.qpid.server.protocol.v1_0;
 
 
 public interface ErrorHandler
 {
-    void handleError(org.apache.qpid.amqp_1_0.type.transport.Error error);
+    void handleError(org.apache.qpid.server.protocol.v1_0.type.transport.Error error);
 }




---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org