You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/04/15 12:10:20 UTC
svn commit: r1739270 [2/8] - in /qpid/java/trunk:
broker-core/src/main/java/org/apache/qpid/server/queue/
broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/
broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/pro...
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java?rev=1739270&r1=1739269&r2=1739270&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java Fri Apr 15 10:10:16 2016
@@ -25,11 +25,18 @@ import java.nio.ByteBuffer;
import java.security.AccessController;
import java.security.Principal;
import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -39,20 +46,21 @@ import javax.security.sasl.SaslServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.qpid.amqp_1_0.codec.FrameWriter;
-import org.apache.qpid.amqp_1_0.codec.ProtocolHandler;
-import org.apache.qpid.amqp_1_0.framing.AMQFrame;
-import org.apache.qpid.amqp_1_0.framing.FrameHandler;
-import org.apache.qpid.amqp_1_0.framing.OversizeFrameException;
-import org.apache.qpid.amqp_1_0.framing.SASLFrameHandler;
-import org.apache.qpid.amqp_1_0.framing.TransportFrame;
-import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
-import org.apache.qpid.amqp_1_0.transport.Container;
-import org.apache.qpid.amqp_1_0.transport.FrameOutputHandler;
-import org.apache.qpid.amqp_1_0.transport.SaslServerProvider;
-import org.apache.qpid.amqp_1_0.type.Binary;
-import org.apache.qpid.amqp_1_0.type.FrameBody;
-import org.apache.qpid.amqp_1_0.type.Symbol;
+import org.apache.qpid.server.model.AuthenticationProvider;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.protocol.v1_0.codec.DescribedTypeConstructorRegistry;
+import org.apache.qpid.server.protocol.v1_0.codec.FrameWriter;
+import org.apache.qpid.server.protocol.v1_0.codec.ProtocolHandler;
+import org.apache.qpid.server.protocol.v1_0.codec.ValueWriter;
+import org.apache.qpid.server.protocol.v1_0.framing.AMQFrame;
+import org.apache.qpid.server.protocol.v1_0.framing.FrameHandler;
+import org.apache.qpid.server.protocol.v1_0.framing.OversizeFrameException;
+import org.apache.qpid.server.protocol.v1_0.framing.SASLFrame;
+import org.apache.qpid.server.protocol.v1_0.framing.TransportFrame;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
+import org.apache.qpid.server.protocol.v1_0.type.SaslFrameBody;
+import org.apache.qpid.server.protocol.v1_0.type.Symbol;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.common.ServerPropertyNames;
import org.apache.qpid.configuration.CommonProperties;
@@ -64,6 +72,27 @@ import org.apache.qpid.server.model.Tran
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.ConnectionClosingTicker;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort;
+import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslChallenge;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslCode;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslInit;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslMechanisms;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslOutcome;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslResponse;
+import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
+import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition;
+import org.apache.qpid.server.protocol.v1_0.type.transport.End;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
import org.apache.qpid.server.security.SubjectCreator;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.security.auth.UsernamePrincipal;
@@ -74,191 +103,1048 @@ import org.apache.qpid.server.transport.
import org.apache.qpid.server.transport.ServerNetworkConnection;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.network.AggregateTicker;
public class AMQPConnection_1_0 extends AbstractAMQPConnection<AMQPConnection_1_0>
- implements FrameOutputHandler
+ implements FrameOutputHandler, DescribedTypeConstructorRegistry.Source,
+ ValueWriter.Registry.Source,
+ ErrorHandler,
+ SASLEndpoint,
+ ConnectionHandler
{
- public static Logger LOGGER = LoggerFactory.getLogger(AMQPConnection_1_0.class);
+ /** Class logger; final so the shared reference cannot be reassigned. */
+ private static final Logger LOGGER = LoggerFactory.getLogger(AMQPConnection_1_0.class);
+ /** Logger named "FRM"; this class uses it for debug traces of received frames. */
+ private static final Logger FRAME_LOGGER = LoggerFactory.getLogger("FRM");
+ /** Logger named "RAW". */
+ private static final Logger RAW_LOGGER = LoggerFactory.getLogger("RAW");
+
+
+
+ // presumably milliseconds (i.e. 10 seconds) - TODO confirm against the code that consumes it
+ private static final long CLOSE_RESPONSE_TIMEOUT = 10000L;
+
+ private final AtomicBoolean _stateChanged = new AtomicBoolean();
+ private final AtomicReference<Action<ProtocolEngine>> _workListener = new AtomicReference<>();
+
+
+ // Eight-byte protocol header for the SASL layer: the literal "AMQP" followed by the bytes 3, 1, 0, 0.
+ private static final byte[] SASL_HEADER = new byte[]
+ {
+ (byte) 'A',
+ (byte) 'M',
+ (byte) 'Q',
+ (byte) 'P',
+ (byte) 3,
+ (byte) 1,
+ (byte) 0,
+ (byte) 0
+ };
+
+ // Eight-byte protocol header for the plain AMQP layer: the literal "AMQP" followed by the bytes 0, 1, 0, 0.
+ private static final byte[] AMQP_HEADER = new byte[]
+ {
+ (byte) 'A',
+ (byte) 'M',
+ (byte) 'Q',
+ (byte) 'P',
+ (byte) 0,
+ (byte) 1,
+ (byte) 0,
+ (byte) 0
+ };
+
+ // Created in the constructor from this connection's described type registry.
+ private FrameWriter _frameWriter;
+ private ProtocolHandler _frameHandler;
+ private volatile boolean _transportBlockedForWriting;
+
+ /**
+  * What the connection is currently prepared to receive. The receive* callbacks check the
+  * expected value through assertState(...) before acting on a frame.
+  */
+ private enum FrameReceivingState
+ {
+ // initial state of a new connection
+ AMQP_OR_SASL_HEADER,
+ // state required by receiveSaslInit
+ SASL_INIT_ONLY,
+ // entered after a SASL challenge has been sent; required by receiveSaslResponse
+ SASL_RESPONSE_ONLY,
+ // entered once SASL negotiation has completed with an OK outcome
+ AMQP_HEADER,
+ // state required by receiveOpen
+ OPEN_ONLY,
+ // entered by receiveOpen; required by the session-level receive* callbacks
+ ANY_FRAME,
+ // entered by receiveClose and by closeSaslWithFailure
+ CLOSED
+ }
+
+ private volatile FrameReceivingState _frameReceivingState = FrameReceivingState.AMQP_OR_SASL_HEADER;
+
+ private static final short CONNECTION_CONTROL_CHANNEL = (short) 0;
+ private static final QpidByteBuffer EMPTY_BYTE_BUFFER = QpidByteBuffer.wrap(new byte[0]);
+
+ // System property "amqp.channel_max" (default 255), capped at 0xFFFF.
+ private static final int DEFAULT_CHANNEL_MAX = Math.min(Integer.getInteger("amqp.channel_max", 255), 0xFFFF);
+ // System property "amqp.max_frame_size" (default 1 << 15); receiveOpen uses it when the peer's Open carries no max frame size.
+ private static final int DEFAULT_MAX_FRAME = Integer.getInteger("amqp.max_frame_size", 1 << 15);
+
+ private AmqpPort<?> _port;
+ private SubjectCreator _subjectCreator;
+ private Transport _transport;
+ private long _connectionId;
+
+ private Container _container;
+ // Principal obtained from the SASL server once negotiation completes; null until then.
+ private Principal _user;
+
+
+ // receiveOpen lowers this to the peer's channel-max when that is smaller.
+ private int _channelMax = DEFAULT_CHANNEL_MAX;
+ // Value in force until receiveOpen replaces it.
+ private int _maxFrameSize = 4096;
+ private String _remoteContainerId;
+
+ private SocketAddress _remoteAddress;
+
+ // positioned by the *outgoing* channel
+ private SessionEndpoint[] _sendingSessions;
+
+ // positioned by the *incoming* channel
+ private SessionEndpoint[] _receivingSessions;
+ private boolean _closedForInput;
+ private boolean _closedForOutput;
+
+ // Idle timeout taken from the peer's Open frame; 0 when the peer supplied none.
+ private long _idleTimeout;
+
+ private ConnectionState _connectionState = ConnectionState.UNOPENED;
+
+ private AMQPDescribedTypeRegistry _describedTypeRegistry = AMQPDescribedTypeRegistry.newInstance()
+ .registerTransportLayer()
+ .registerMessagingLayer()
+ .registerTransactionLayer()
+ .registerSecurityLayer();
+
+
+ // Set through setProperties(); the constructor passes the server property map.
+ private Map _properties;
+ // null when the connection was created with useSASL == false
+ private SaslServerProvider _saslServerProvider;
+ private boolean _saslComplete;
+
+ private SaslServer _saslServer;
+ // Hostname field of the peer's Open frame; receiveOpen uses it to look up the virtual host.
+ private String _localHostname;
+ private long _desiredIdleTimeout;
+ private UnsignedInteger _handleMax = UnsignedInteger.MAX_VALUE;
+ // Error carried by the peer's Close frame, if any.
+ private Error _remoteError;
+
+ // Lower bound for a non-zero idle timeout requested by the peer; receiveOpen closes the connection below it.
+ private static final long MINIMUM_SUPPORTED_IDLE_TIMEOUT = 1000L;
+
+ // Properties map of the peer's Open frame.
+ private Map _remoteProperties;
+
+ private final AtomicBoolean _orderlyClose = new AtomicBoolean(false);
+
+ // Synchronized wrapper: individual calls are thread-safe, iteration needs a snapshot or an external lock.
+ private final Collection<Session_1_0>
+ _sessions = Collections.synchronizedCollection(new ArrayList<Session_1_0>());
+
+ private final Object _reference = new Object();
+
+ private final Queue<Action<? super ConnectionHandler>> _asyncTaskList =
+ new ConcurrentLinkedQueue<>();
+
+ // Set by receiveOpen when it rejects the Open; sessions are then neither registered nor removed.
+ private boolean _closedOnOpen;
+
+
+
+ /**
+  * Builds the protocol engine for a single network connection.
+  * Besides delegating to the superclass it creates the container, resolves the subject creator
+  * from the port's authentication provider, optionally prepares SASL support, records the
+  * server properties advertised to the peer and creates the frame writer.
+  *
+  * @param useSASL whether a SASL server provider should be prepared for this connection
+  */
+ AMQPConnection_1_0(final Broker<?> broker,
+                    final ServerNetworkConnection network,
+                    AmqpPort<?> port, Transport transport, long id,
+                    final AggregateTicker aggregateTicker,
+                    final boolean useSASL)
+ {
+     super(broker, network, port, transport, Protocol.AMQP_1_0, id, aggregateTicker);
+     _container = new Container(broker.getId().toString());
+
+     _subjectCreator = port.getAuthenticationProvider().getSubjectCreator(transport.isSecure());
+
+     if (useSASL)
+     {
+         _saslServerProvider = asSaslServerProvider(_subjectCreator, network);
+     }
+     else
+     {
+         _saslServerProvider = null;
+     }
+     _port = port;
+     _transport = transport;
+     _connectionId = id;
+
+     // insertion order is kept, so the properties are advertised in the order written here
+     final Map<Symbol, Object> advertised = new LinkedHashMap<>();
+     advertised.put(Symbol.valueOf(ServerPropertyNames.PRODUCT), CommonProperties.getProductName());
+     advertised.put(Symbol.valueOf(ServerPropertyNames.VERSION), CommonProperties.getReleaseVersion());
+     advertised.put(Symbol.valueOf(ServerPropertyNames.QPID_BUILD), CommonProperties.getBuildVersion());
+     advertised.put(Symbol.valueOf(ServerPropertyNames.QPID_INSTANCE_NAME), broker.getName());
+     setProperties(advertised);
+
+     setRemoteAddress(network.getRemoteAddress());
+
+     // the broker setting is multiplied by 1000 here
+     setDesiredIdleTimeout(1000L * broker.getConnection_heartBeatDelay());
+
+     _frameWriter = new FrameWriter(getDescribedTypeRegistry());
+ }
+
+
+ /**
+  * Installs the subject for this connection, built by the subject creator from the given principal.
+  *
+  * @param user the principal to create the subject (with groups) for
+  */
+ private void setUserPrincipal(final Principal user)
+ {
+ setSubject(_subjectCreator.createSubjectWithGroups(user));
+ }
+
+ /** @return the idle timeout this side wants, as stored by setDesiredIdleTimeout */
+ private long getDesiredIdleTimeout()
+ {
+ return _desiredIdleTimeout;
+ }
+
+ /**
+  * Forwards an Attach frame to the session endpoint registered for the channel.
+  * Nothing happens when no endpoint is found for the channel.
+  *
+  * @param channel the channel the frame arrived on
+  * @param attach  the received frame
+  */
+ public void receiveAttach(final short channel, final Attach attach)
+ {
+     assertState(FrameReceivingState.ANY_FRAME);
+     final SessionEndpoint target = getSession(channel);
+     if (target == null)
+     {
+         return;
+     }
+     target.receiveAttach(attach);
+ }
+
+ /**
+  * Logs a decoded frame body and hands it this connection through its invoke(channel, this) method.
+  * Transport frame bodies and SASL frame bodies are handled; any other object is ignored.
+  *
+  * @param channel the channel the frame arrived on
+  * @param frame   the decoded frame body
+  */
+ public void receive(final short channel, final Object frame)
+ {
+     FRAME_LOGGER.debug("RECV[{}|{}] : {}", _remoteAddress, channel, frame);
+     if (frame instanceof FrameBody)
+     {
+         ((FrameBody) frame).invoke(channel, this);
+     }
+     else if (frame instanceof SaslFrameBody)
+     {
+         ((SaslFrameBody) frame).invoke(channel, this);
+     }
+ }
+
+ /**
+  * Abandons the SASL exchange: marks it complete, stops accepting further frames
+  * (receiving state CLOSED, input closed) and closes the connection.
+  */
+ private void closeSaslWithFailure()
+ {
+ _saslComplete = true;
+ _frameReceivingState = FrameReceivingState.CLOSED;
+ setClosedForInput(true);
+ close();
+ }
+
+ /**
+  * Treats a SASL challenge received from the peer as a failure and closes the SASL exchange.
+  *
+  * @param saslChallenge the received frame; its content is not inspected
+  */
+ public void receiveSaslChallenge(final SaslChallenge saslChallenge)
+ {
+ // TODO - log unexpected frame
+ closeSaslWithFailure();
+ }
+
+ /**
+  * Handles the peer's Close frame: input is marked closed, every session is told that the
+  * remote side has ended, and the connection state advances according to where it was.
+  * The error carried by the frame is recorded last.
+  *
+  * @param channel the channel the frame arrived on
+  * @param close   the received frame
+  */
+ public void receiveClose(final short channel, final Close close)
+ {
+     assertState(FrameReceivingState.ANY_FRAME);
+     _frameReceivingState = FrameReceivingState.CLOSED;
+     setClosedForInput(true);
+     closeReceived();
+     switch (_connectionState)
+     {
+         case UNOPENED:
+         case AWAITING_OPEN:
+         {
+             final Error premature = new Error();
+             premature.setCondition(ConnectionError.CONNECTION_FORCED);
+             premature.setDescription("Connection close sent before connection was opened");
+             closeConnection(premature);
+             break;
+         }
+         case OPEN:
+         {
+             _connectionState = ConnectionState.CLOSE_RECEIVED;
+             // TODO - we should log the error we received from the client if present
+             sendClose(new Close());
+             _connectionState = ConnectionState.CLOSED;
+             _orderlyClose.set(true);
+             break;
+         }
+         case CLOSE_SENT:
+         {
+             // the peer has answered our own Close
+             _connectionState = ConnectionState.CLOSED;
+             _orderlyClose.set(true);
+             break;
+         }
+         default:
+             break;
+     }
+     _remoteError = close.getError();
+ }
+
+ /**
+  * Tells every session currently registered on this connection that the remote side has ended it.
+  * A copy of the session collection is iterated rather than the collection itself.
+  */
+ private void closeReceived()
+ {
+     final List<Session_1_0> snapshot = new ArrayList<>(_sessions);
+     final Iterator<Session_1_0> remaining = snapshot.iterator();
+     while (remaining.hasNext())
+     {
+         remaining.next().remoteEnd(new End());
+     }
+ }
+
+ /** @param closed new value of the closed-for-input flag */
+ private void setClosedForInput(final boolean closed)
+ {
+ _closedForInput = closed;
+ }
+
+ /**
+  * Treats a SASL mechanisms frame received from the peer as a failure and closes the SASL exchange.
+  *
+  * @param saslMechanisms the received frame; its content is not inspected
+  */
+ public void receiveSaslMechanisms(final SaslMechanisms saslMechanisms)
+ {
+ // TODO - log unexpected frame
+ closeSaslWithFailure();
+ }
+
+ /**
+  * Feeds the client's SASL response to the SASL server and sends the next step back:
+  * an OK outcome when negotiation has completed, otherwise a further challenge.
+  * If the SASL server rejects the response, an AUTH outcome is sent and the SASL
+  * exchange is closed.
+  *
+  * @param saslResponse the received frame; a missing response is evaluated as an empty byte array
+  */
+ public void receiveSaslResponse(final SaslResponse saslResponse)
+ {
+     final Binary responseBinary = saslResponse.getResponse();
+     byte[] response = responseBinary == null ? null : responseBinary.getArray();
+     if (response == null)
+     {
+         response = new byte[0];
+     }
+
+     assertState(FrameReceivingState.SASL_RESPONSE_ONLY);
+
+     try
+     {
+         // Process response from the client
+         final byte[] challenge = _saslServer.evaluateResponse(response);
+
+         if (_saslServer.isComplete())
+         {
+             final SaslOutcome outcome = new SaslOutcome();
+             outcome.setCode(SaslCode.OK);
+             send(new SASLFrame(outcome), null);
+             _saslComplete = true;
+             _user = _saslServerProvider.getAuthenticatedPrincipal(_saslServer);
+             _frameReceivingState = FrameReceivingState.AMQP_HEADER;
+         }
+         else
+         {
+             final SaslChallenge challengeBody = new SaslChallenge();
+             challengeBody.setChallenge(new Binary(challenge));
+             send(new SASLFrame(challengeBody), null);
+         }
+     }
+     catch (SaslException e)
+     {
+         // keep a trace of why authentication failed instead of dropping the exception
+         LOGGER.debug("SASL negotiation with {} failed", _remoteAddress, e);
+
+         final SaslOutcome outcome = new SaslOutcome();
+         outcome.setCode(SaslCode.AUTH);
+         send(new SASLFrame(outcome), null);
+         // closeSaslWithFailure() also marks the SASL exchange as complete
+         closeSaslWithFailure();
+     }
+ }
+
+ /** @return the described type registry created with this connection */
+ public AMQPDescribedTypeRegistry getDescribedTypeRegistry()
+ {
+ return _describedTypeRegistry;
+ }
+
+ /**
+  * Queues a task that closes the given session with the given cause and message.
+  * The session is closed when the queued task is performed, not by this call.
+  *
+  * @param session the session to close
+  * @param cause   the cause passed on to the session's close
+  * @param message the message passed on to the session's close
+  */
+ private void closeSessionAsync(final Session_1_0 session, final AMQConstant cause, final String message)
+ {
+     final Action<ConnectionHandler> closeTask = new Action<ConnectionHandler>()
+     {
+         @Override
+         public void performAction(final ConnectionHandler object)
+         {
+             session.close(cause, message);
+         }
+     };
+     addAsyncTask(closeTask);
+ }
+
+
+ /** @return the current value of the closed-for-output flag */
+ private boolean closedForOutput()
+ {
+ return _closedForOutput;
+ }
+
+ /** @return true when the connection state is CLOSED or CLOSE_RECEIVED */
+ public boolean isClosed()
+ {
+     final ConnectionState current = _connectionState;
+     if (current == ConnectionState.CLOSED)
+     {
+         return true;
+     }
+     return current == ConnectionState.CLOSE_RECEIVED;
+ }
+
+ /** @return the current value of the closed-for-input flag */
+ public boolean closedForInput()
+ {
+ return _closedForInput;
+ }
+
+ /**
+  * Removes an ended session from this connection's bookkeeping.
+  * Does nothing when the connection was closed on open.
+  *
+  * @param session the session that has ended
+  */
+ void sessionEnded(final Session_1_0 session)
+ {
+     if (_closedOnOpen)
+     {
+         return;
+     }
+     _sessions.remove(session);
+     sessionRemoved(session);
+ }
+
+ /**
+  * Sends a frame on the given channel by delegating to sendFrame.
+  *
+  * @return whatever sendFrame returns for this frame
+  */
+ public int send(final short channel, final FrameBody body, final QpidByteBuffer payload)
+ {
+ return sendFrame(channel, body, payload);
+ }
+
+ /**
+  * Reacts to the underlying connection being closed for input. The first call marks input as
+  * closed, advances the connection state and tells all sessions that the remote side has ended;
+  * later calls do nothing.
+  */
+ private void inputClosed()
+ {
+     if (_closedForInput)
+     {
+         return;
+     }
+
+     _closedForInput = true;
+     FRAME_LOGGER.debug("RECV[{}] : {}", _remoteAddress, "Underlying connection closed");
+     switch (_connectionState)
+     {
+         case UNOPENED:
+         case AWAITING_OPEN:
+         case CLOSE_SENT:
+             _connectionState = ConnectionState.CLOSED;
+             closeSender();
+             break;
+         case OPEN:
+             // no Close frame was received, but the peer can no longer send one
+             _connectionState = ConnectionState.CLOSE_RECEIVED;
+             break;
+         case CLOSED:
+             // already sent our close - too late to do anything more
+             break;
+         default:
+             break;
+     }
+     closeReceived();
+ }
+
+ /** Marks the connection closed for output and then closes it. */
+ private void closeSender()
+ {
+ setClosedForOutput(true);
+ close();
+ }
+
+ /** @return the container id taken from the peer's Open frame; null before an Open has been received */
+ String getRemoteContainerId()
+ {
+ return _remoteContainerId;
+ }
- public static final long CLOSE_RESPONSE_TIMEOUT = 10000l;
- private final Broker<?> _broker;
+ /** @param desiredIdleTimeout the idle timeout this side wants; stored as given */
+ private void setDesiredIdleTimeout(final long desiredIdleTimeout)
+ {
+ _desiredIdleTimeout = desiredIdleTimeout;
+ }
+
+ /** @return true only while the connection state is OPEN */
+ public boolean isOpen()
+ {
+ return _connectionState == ConnectionState.OPEN;
+ }
+
+ /**
+  * Sends an End frame on the given outgoing channel.
+  *
+  * @param channel the outgoing channel to send on
+  * @param end     the frame to send
+  * @param remove  when true, the outgoing channel slot is cleared after the frame has been sent
+  */
+ void sendEnd(final short channel, final End end, final boolean remove)
+ {
+ sendFrame(channel, end);
+ if (remove)
+ {
+ _sendingSessions[channel] = null;
+ }
+
+ }
+
+ /**
+  * Treats a SASL outcome received from the peer as a failure and closes the SASL exchange.
+  *
+  * @param saslOutcome the received frame; its content is not inspected
+  */
+ public void receiveSaslOutcome(final SaslOutcome saslOutcome)
+ {
+ // TODO - log unexpected frame
+ closeSaslWithFailure();
+ }
+
+ /**
+  * Handles an End frame: releases the incoming channel and passes the frame to the session
+  * endpoint that was using it. A channel number outside the session table closes the
+  * connection with a framing error instead of failing with an array index exception.
+  *
+  * @param channel the incoming channel the frame arrived on
+  * @param end     the received frame
+  */
+ public void receiveEnd(final short channel, final End end)
+ {
+     assertState(FrameReceivingState.ANY_FRAME);
+
+     // the channel number comes from the peer, so validate it before using it as an index
+     if (channel < 0 || channel >= _receivingSessions.length)
+     {
+         closeConnection(new Error(ConnectionError.FRAMING_ERROR,
+                                   "END received on channel " + channel
+                                   + " which is outside the valid range of 0 to " + _channelMax + "."));
+         return;
+     }
+
+     final SessionEndpoint endpoint = _receivingSessions[channel];
+     if (endpoint != null)
+     {
+         _receivingSessions[channel] = null;
+
+         endpoint.receiveEnd(end);
+     }
+     else
+     {
+         // TODO error
+     }
+ }
+
+ /**
+  * Forwards a Disposition frame to the session endpoint registered for the channel.
+  * Nothing happens when no endpoint is found for the channel.
+  *
+  * @param channel     the channel the frame arrived on
+  * @param disposition the received frame
+  */
+ public void receiveDisposition(final short channel,
+                                final Disposition disposition)
+ {
+     assertState(FrameReceivingState.ANY_FRAME);
+     final SessionEndpoint target = getSession(channel);
+     if (target == null)
+     {
+         return;
+     }
+     target.receiveDisposition(disposition);
+ }
+
+ /**
+  * Handles a Begin frame.
+  * <p>
+  * When the frame names a remote channel it is the peer's reply to a session begun from this
+  * side: the incoming channel is bound to the endpoint already stored for that outgoing channel.
+  * Otherwise the peer is starting a session: a new endpoint is created, bound to the incoming
+  * channel and to the first free outgoing channel, and a Begin is sent in reply.
+  * <p>
+  * Any inconsistency (channel out of range, channel already in use, unknown remote channel,
+  * no outgoing channel left) closes the connection with a framing error.
+  *
+  * @param channel the incoming channel the frame arrived on
+  * @param begin   the received frame
+  */
+ public void receiveBegin(final short channel, final Begin begin)
+ {
+     assertState(FrameReceivingState.ANY_FRAME);
+
+     // the channel number comes from the peer, so validate it before using it as an index
+     if (channel < 0 || channel >= _receivingSessions.length)
+     {
+         closeConnection(new Error(ConnectionError.FRAMING_ERROR,
+                                   "BEGIN received on channel " + channel
+                                   + " which is outside the valid range of 0 to " + _channelMax + "."));
+         return;
+     }
+
+     if (begin.getRemoteChannel() != null)
+     {
+         final short myChannelId = begin.getRemoteChannel().shortValue();
+         if (myChannelId < 0 || myChannelId >= _sendingSessions.length)
+         {
+             closeConnection(new Error(ConnectionError.FRAMING_ERROR,
+                                       "BEGIN received on channel " + channel + " with given remote-channel "
+                                       + begin.getRemoteChannel() + " which is outside the valid range of 0 to "
+                                       + _channelMax + "."));
+             return;
+         }
+
+         final SessionEndpoint sessionEndpoint = _sendingSessions[myChannelId];
+         if (sessionEndpoint == null)
+         {
+             closeConnection(new Error(ConnectionError.FRAMING_ERROR,
+                                       "BEGIN received on channel " + channel + " with given remote-channel "
+                                       + begin.getRemoteChannel() + " which is not known as a begun session."));
+             return;
+         }
+         if (_receivingSessions[channel] != null)
+         {
+             closeConnection(new Error(ConnectionError.FRAMING_ERROR,
+                                       "BEGIN received on channel " + channel + " which is already in use."));
+             return;
+         }
+
+         _receivingSessions[channel] = sessionEndpoint;
+         sessionEndpoint.setReceivingChannel(channel);
+         sessionEndpoint.setNextIncomingId(begin.getNextOutgoingId());
+         sessionEndpoint.setOutgoingSessionCredit(begin.getIncomingWindow());
+
+         if (sessionEndpoint.getState() == SessionState.END_SENT)
+         {
+             // our End has already gone out, so the outgoing slot can be released
+             _sendingSessions[myChannelId] = null;
+         }
+     }
+     else // Peer requesting session creation
+     {
+         if (_receivingSessions[channel] != null)
+         {
+             closeConnection(new Error(ConnectionError.FRAMING_ERROR,
+                                       "BEGIN received on channel " + channel + " which is already in use."));
+             return;
+         }
+
+         final short myChannelId = getFirstFreeChannel();
+         if (myChannelId < 0)
+         {
+             // every outgoing channel is taken; there is nothing to reply on
+             closeConnection(new Error(ConnectionError.FRAMING_ERROR,
+                                       "BEGIN received on channel " + channel
+                                       + ". There are no free channels for the broker to respond on."));
+             return;
+         }
+
+         final SessionEndpoint sessionEndpoint = new SessionEndpoint(this, begin);
+
+         _receivingSessions[channel] = sessionEndpoint;
+         _sendingSessions[myChannelId] = sessionEndpoint;
+
+         final Begin beginToSend = new Begin();
+
+         sessionEndpoint.setReceivingChannel(channel);
+         sessionEndpoint.setSendingChannel(myChannelId);
+         beginToSend.setRemoteChannel(UnsignedShort.valueOf(channel));
+         beginToSend.setNextOutgoingId(sessionEndpoint.getNextOutgoingId());
+         beginToSend.setOutgoingWindow(sessionEndpoint.getOutgoingWindowSize());
+         beginToSend.setIncomingWindow(sessionEndpoint.getIncomingWindowSize());
+         sendFrame(myChannelId, beginToSend);
+
+         remoteSessionCreation(sessionEndpoint);
+     }
+ }
+
+ /**
+  * Creates and registers the session object for an endpoint started by the peer, and installs
+  * a listener on the endpoint that relays link creation and session end to that session inside
+  * the session's access control context. Does nothing when the connection was closed on open.
+  *
+  * @param sessionEndpoint the endpoint created for the peer's Begin
+  */
+ private void remoteSessionCreation(final SessionEndpoint sessionEndpoint)
+ {
+     if (_closedOnOpen)
+     {
+         return;
+     }
+
+     final Session_1_0 created = new Session_1_0(this, sessionEndpoint);
+     _sessions.add(created);
+     sessionAdded(created);
+
+     final SessionEventListener relay = new SessionEventListener()
+     {
+         @Override
+         public void remoteLinkCreation(final LinkEndpoint linkEndpoint)
+         {
+             final PrivilegedAction<Object> linkAction = new PrivilegedAction<Object>()
+             {
+                 @Override
+                 public Object run()
+                 {
+                     created.remoteLinkCreation(linkEndpoint);
+                     return null;
+                 }
+             };
+             AccessController.doPrivileged(linkAction, created.getAccessControllerContext());
+         }
+
+         @Override
+         public void remoteEnd(final End end)
+         {
+             final PrivilegedAction<Object> endAction = new PrivilegedAction<Object>()
+             {
+                 @Override
+                 public Object run()
+                 {
+                     created.remoteEnd(end);
+                     return null;
+                 }
+             };
+             AccessController.doPrivileged(endAction, created.getAccessControllerContext());
+         }
+     };
+     sessionEndpoint.setSessionEventListener(relay);
+ }
+
+ /**
+  * Finds the lowest outgoing channel, from 0 up to and including the channel maximum,
+  * that has no session endpoint stored.
+  *
+  * @return the free channel number, or -1 when every channel is taken
+  */
+ private short getFirstFreeChannel()
+ {
+     int candidate = 0;
+     while (candidate <= _channelMax)
+     {
+         if (_sendingSessions[candidate] == null)
+         {
+             return (short) candidate;
+         }
+         candidate++;
+     }
+     return -1;
+ }
+
+ /**
+  * Reports an error to the peer by sending a Close frame carrying it on channel 0, then marks
+  * the connection closed for output. Does nothing when output is already closed.
+  *
+  * @param error the error to put into the Close frame
+  */
+ public void handleError(final Error error)
+ {
+     if (closedForOutput())
+     {
+         return;
+     }
+
+     final Close closeFrame = new Close();
+     closeFrame.setError(error);
+     sendFrame(CONNECTION_CONTROL_CHANNEL, closeFrame);
+
+     setClosedForOutput(true);
+ }
+
+ /**
+  * Forwards a Transfer frame to the session endpoint registered for the channel.
+  * Nothing happens when no endpoint is found for the channel.
+  *
+  * @param channel  the channel the frame arrived on
+  * @param transfer the received frame
+  */
+ public void receiveTransfer(final short channel, final Transfer transfer)
+ {
+     assertState(FrameReceivingState.ANY_FRAME);
+     final SessionEndpoint target = getSession(channel);
+     if (target == null)
+     {
+         return;
+     }
+     target.receiveTransfer(transfer);
+ }
+
+ /**
+  * Starts a session from this side: reserves the first free outgoing channel, creates an
+  * endpoint for it and sends a Begin frame.
+  *
+  * @param name not used by this method
+  * @return the new endpoint, or null when no outgoing channel is free
+  */
+ public SessionEndpoint createSession(final String name)
+ {
+     // todo assert connection state
+     final short freeChannel = getFirstFreeChannel();
+     if (freeChannel == -1)
+     {
+         // TODO - report error
+         return null;
+     }
+
+     final SessionEndpoint created = new SessionEndpoint(this);
+     _sendingSessions[freeChannel] = created;
+     created.setSendingChannel(freeChannel);
+
+     final Begin beginFrame = new Begin();
+     beginFrame.setNextOutgoingId(created.getNextOutgoingId());
+     beginFrame.setOutgoingWindow(created.getOutgoingWindowSize());
+     beginFrame.setIncomingWindow(created.getIncomingWindowSize());
+     beginFrame.setHandleMax(_handleMax);
+
+     sendFrame(freeChannel, beginFrame);
+     return created;
+ }
+
+ /**
+  * Forwards a Flow frame to the session endpoint registered for the channel.
+  * Nothing happens when no endpoint is found for the channel.
+  *
+  * @param channel the channel the frame arrived on
+  * @param flow    the received frame
+  */
+ public void receiveFlow(final short channel, final Flow flow)
+ {
+     assertState(FrameReceivingState.ANY_FRAME);
+     final SessionEndpoint target = getSession(channel);
+     if (target == null)
+     {
+         return;
+     }
+     target.receiveFlow(flow);
+ }
+
+ /**
+  * Handles the peer's Open frame.
+  * <p>
+  * Negotiates the channel maximum (never above our own), allocates the session tables on first
+  * use, records the peer's frame size, container id, hostname, idle timeout and properties, and
+  * then validates the request: a non-zero idle timeout below the supported minimum, an unknown
+  * or inactive virtual host, or a missing authenticated principal each close the connection.
+  * Finally our own Open is sent if it has not been sent yet and the state moves to OPEN.
+  *
+  * @param channel the channel the frame arrived on
+  * @param open    the received frame
+  */
+ public void receiveOpen(final short channel, final Open open)
+ {
+     assertState(FrameReceivingState.OPEN_ONLY);
+     _frameReceivingState = FrameReceivingState.ANY_FRAME;
+
+     // the peer may lower the channel maximum but never raise it
+     if (open.getChannelMax() != null)
+     {
+         _channelMax = Math.min(open.getChannelMax().intValue(), _channelMax);
+     }
+     if (_receivingSessions == null)
+     {
+         _receivingSessions = new SessionEndpoint[_channelMax + 1];
+         _sendingSessions = new SessionEndpoint[_channelMax + 1];
+     }
+     _maxFrameSize = open.getMaxFrameSize() == null ? DEFAULT_MAX_FRAME : open.getMaxFrameSize().intValue();
+     _remoteContainerId = open.getContainerId();
+     _localHostname = open.getHostname();
+     if (open.getIdleTimeOut() != null)
+     {
+         _idleTimeout = open.getIdleTimeOut().longValue();
+     }
+     _remoteProperties = open.getProperties();
+     if (_remoteProperties != null)
+     {
+         if (_remoteProperties.containsKey(Symbol.valueOf("product")))
+         {
+             setClientProduct(_remoteProperties.get(Symbol.valueOf("product")).toString());
+         }
+         if (_remoteProperties.containsKey(Symbol.valueOf("version")))
+         {
+             setClientVersion(_remoteProperties.get(Symbol.valueOf("version")).toString());
+         }
+         setClientId(_remoteContainerId);
+     }
+
+     if (_idleTimeout != 0L && _idleTimeout < MINIMUM_SUPPORTED_IDLE_TIMEOUT)
+     {
+         closeConnection(new Error(ConnectionError.CONNECTION_FORCED,
+                                   "Requested idle timeout of "
+                                   + _idleTimeout
+                                   + " is too low. The minimum supported timeout is "
+                                   + MINIMUM_SUPPORTED_IDLE_TIMEOUT));
+         close();
+         _closedOnOpen = true;
+     }
+     else
+     {
+         long desiredIdleTimeout = getDesiredIdleTimeout();
+         initialiseHeartbeating(_idleTimeout / 2L, desiredIdleTimeout);
+         final VirtualHost vhost = ((AmqpPort) _port).getVirtualHost(_localHostname);
+         if (vhost == null)
+         {
+             closeWithError(AmqpError.NOT_FOUND, "Unknown hostname in connection open: '" + _localHostname + "'");
+         }
+         else if (vhost.getState() != org.apache.qpid.server.model.State.ACTIVE)
+         {
+             final Error err = new Error();
+             err.setCondition(AmqpError.NOT_FOUND);
+
+             // fill in the description or redirect details BEFORE the Close frame is sent,
+             // otherwise the peer never gets to see them
+             populateConnectionRedirect(vhost, err);
+
+             closeConnection(err);
+             close();
+             _closedOnOpen = true;
+         }
+         else
+         {
+             final Principal user = _user;
+             if (user != null)
+             {
+                 setUserPrincipal(user);
+             }
+             if (AuthenticatedPrincipal.getOptionalAuthenticatedPrincipalFromSubject(getSubject()) == null)
+             {
+                 closeWithError(AmqpError.NOT_ALLOWED, "Connection has not been authenticated");
+             }
+             else
+             {
+                 try
+                 {
+                     setVirtualHost(vhost);
+                 }
+                 catch (VirtualHostUnavailableException e)
+                 {
+                     closeWithError(AmqpError.NOT_ALLOWED, e.getMessage());
+                 }
+             }
+         }
+     }
+
+     switch (_connectionState)
+     {
+         case UNOPENED:
+             sendOpen(_channelMax, _maxFrameSize);
+             // deliberate fall-through: once our Open is sent the connection is open
+         case AWAITING_OPEN:
+             _connectionState = ConnectionState.OPEN;
+             break;
+         default:
+             // TODO bad stuff (connection already open)
+             break;
+     }
+ }
+
+ /**
+  * Completes the error sent for an inactive virtual host. When the virtual host names no
+  * redirect host the error gets a plain description; otherwise the redirect target is split
+  * into host and optional port and stored in the error's info map under "network-host" and
+  * "port".
+  * <p>
+  * Accepted redirect formats: {@code host}, {@code host:port}, {@code [ipv6]} and
+  * {@code [ipv6]:port}. Hex digits in a bracketed address may be upper or lower case. A port
+  * that is missing, not a number or not positive is left out of the info map.
+  *
+  * @param vhost the inactive virtual host
+  * @param err   the error to complete
+  */
+ private void populateConnectionRedirect(final VirtualHost vhost, final Error err)
+ {
+     final String redirectHost = vhost.getRedirectHost(((AmqpPort) _port));
+
+     if (redirectHost == null)
+     {
+         err.setDescription("Virtual host '" + _localHostname + "' is not active");
+         return;
+     }
+
+     final String networkHost;
+     final String portString; // null when the redirect target carries no port
+     if (redirectHost.matches("\\[[0-9a-fA-F:.]+\\](:[0-9]+)?"))
+     {
+         // bracketed IPv6 literal: the host is what sits between the brackets
+         final int closingBracket = redirectHost.indexOf(']');
+         networkHost = redirectHost.substring(1, closingBracket);
+         portString = redirectHost.contains("]:") ? redirectHost.substring(closingBracket + 2) : null;
+     }
+     else if (redirectHost.contains(":"))
+     {
+         final int lastColon = redirectHost.lastIndexOf(':');
+         networkHost = redirectHost.substring(0, lastColon);
+         portString = redirectHost.substring(lastColon + 1);
+     }
+     else
+     {
+         networkHost = redirectHost;
+         portString = null;
+     }
+
+     int port = -1;
+     if (portString != null)
+     {
+         try
+         {
+             port = Integer.parseInt(portString);
+         }
+         catch (NumberFormatException ignored)
+         {
+             // not a usable port number (including digit strings too large for an int):
+             // advertise the host only
+             port = -1;
+         }
+     }
+
+     final Map<Symbol, Object> infoMap = new HashMap<>();
+     infoMap.put(Symbol.valueOf("network-host"), networkHost);
+     if (port > 0)
+     {
+         infoMap.put(Symbol.valueOf("port"), UnsignedInteger.valueOf(port));
+     }
+     err.setInfo(infoMap);
+ }
- private ConnectionEndpoint _endpoint;
- private final AtomicBoolean _stateChanged = new AtomicBoolean();
- private final AtomicReference<Action<ProtocolEngine>> _workListener = new AtomicReference<>();
+ /**
+  * Forwards a Detach frame to the session endpoint registered for the channel.
+  * Nothing happens when no endpoint is found for the channel.
+  *
+  * @param channel the channel the frame arrived on
+  * @param detach  the received frame
+  */
+ public void receiveDetach(final short channel, final Detach detach)
+ {
+     assertState(FrameReceivingState.ANY_FRAME);
+     final SessionEndpoint target = getSession(channel);
+     if (target == null)
+     {
+         return;
+     }
+     target.receiveDetach(detach);
+ }
+ /**
+  * Notifies every session on this connection that the transport state has changed.
+  * <p>
+  * The session collection is a synchronized wrapper, which makes single calls thread-safe but
+  * not iteration. A snapshot is therefore iterated, as closeReceived() does, so that a session
+  * added or removed during the notification cannot break the loop.
+  */
+ private void transportStateChanged()
+ {
+     for (Session_1_0 session : new ArrayList<>(_sessions))
+     {
+         session.transportStateChanged();
+     }
+ }
- private static final ByteBuffer SASL_LAYER_HEADER =
- ByteBuffer.wrap(new byte[]
- {
- (byte)'A',
- (byte)'M',
- (byte)'Q',
- (byte)'P',
- (byte) 3,
- (byte) 1,
- (byte) 0,
- (byte) 0
- });
-
- private static final ByteBuffer AMQP_LAYER_HEADER =
- ByteBuffer.wrap(new byte[]
- {
- (byte)'A',
- (byte)'M',
- (byte)'Q',
- (byte)'P',
- (byte) 0,
- (byte) 1,
- (byte) 0,
- (byte) 0
- });
+ /**
+  * Closes the connection, reporting the given error, by delegating to closeConnection(Error).
+  *
+  * @param error the error to report
+  */
+ public void close(final Error error)
+ {
+ closeConnection(error);
+ }
+ /** @param remoteAddress the peer's address; this class includes it in its frame debug logging */
+ private void setRemoteAddress(final SocketAddress remoteAddress)
+ {
+ _remoteAddress = remoteAddress;
+ }
- private FrameWriter _frameWriter;
- private ProtocolHandler _frameHandler;
- private Object _sendLock = new Object();
- private byte _major;
- private byte _minor;
- private byte _revision;
- private final Connection_1_0 _connection;
- private volatile boolean _transportBlockedForWriting;
+ /** @return the principal obtained from SASL negotiation; null when none has been set */
+ public Principal getUser()
+ {
+ return _user;
+ }
+ /** @param properties the property map to store; the reference is kept as given, not copied */
+ public void setProperties(final Map<Symbol, Object> properties)
+ {
+ _properties = properties;
+ }
- static enum State {
- A,
- M,
- Q,
- P,
- PROTOCOL,
- MAJOR,
- MINOR,
- REVISION,
- FRAME
- }
-
- private State _state = State.A;
-
- public AMQPConnection_1_0(final Broker<?> broker, final ServerNetworkConnection network,
- AmqpPort<?> port, Transport transport, long id,
- final AggregateTicker aggregateTicker,
- final boolean useSASL)
+ private void setClosedForOutput(final boolean closed)
{
- super(broker, network, port, transport, Protocol.AMQP_1_0, id, aggregateTicker);
- _broker = broker;
- _connection = createConnection(broker, network, port, transport, id, useSASL);
- _endpoint = _connection.getConnectionEndpoint();
- _endpoint.setConnectionEventListener(_connection);
- _endpoint.setDesiredIdleTimeout(1000L * broker.getConnection_heartBeatDelay());
- _endpoint.setFrameOutputHandler(this);
- final List<String> mechanisms = port.getAuthenticationProvider().getSubjectCreator(transport.isSecure()).getMechanisms();
- ByteBuffer headerResponse = useSASL ? initiateSasl() : initiateNonSasl(mechanisms);
+ _closedForOutput = closed;
+ }
- _frameWriter = new FrameWriter(_endpoint.getDescribedTypeRegistry());
+ public void receiveSaslInit(final SaslInit saslInit)
+ {
+ assertState(FrameReceivingState.SASL_INIT_ONLY);
+ String mechanism = saslInit.getMechanism() == null ? null : saslInit.getMechanism().toString();
+ final Binary initialResponse = saslInit.getInitialResponse();
+ byte[] response = initialResponse == null ? new byte[0] : initialResponse.getArray();
- getSender().send(QpidByteBuffer.wrap(headerResponse.duplicate()));
- getSender().flush();
- if(useSASL)
+ try
{
- _endpoint.initiateSASL(mechanisms.toArray(new String[mechanisms.size()]));
- }
+ _saslServer = _saslServerProvider.getSaslServer(mechanism, "localhost");
+ // Process response from the client
+ byte[] challenge = _saslServer.evaluateResponse(response != null ? response : new byte[0]);
- }
+ if (_saslServer.isComplete())
+ {
+ SaslOutcome outcome = new SaslOutcome();
- private Connection_1_0 createConnection(final Broker<?> broker,
- final ServerNetworkConnection network,
- final AmqpPort<?> port,
- final Transport transport,
- final long id,
- final boolean useSASL)
- {
- Container container = new Container(broker.getId().toString());
+ outcome.setCode(SaslCode.OK);
+ send(new SASLFrame(outcome), null);
+ _saslComplete = true;
+ _user = _saslServerProvider.getAuthenticatedPrincipal(_saslServer);
- SubjectCreator subjectCreator = port.getAuthenticationProvider().getSubjectCreator(transport.isSecure());
- final ConnectionEndpoint endpoint =
- new ConnectionEndpoint(container, useSASL ? asSaslServerProvider(subjectCreator, network) : null);
- endpoint.setLogger(new ConnectionEndpoint.FrameReceiptLogger()
- {
- @Override
- public boolean isEnabled()
- {
- return FRAME_LOGGER.isDebugEnabled();
- }
+ _frameReceivingState = FrameReceivingState.AMQP_HEADER;
- @Override
- public void received(final SocketAddress remoteAddress, final short channel, final Object frame)
+ }
+ else
{
- FRAME_LOGGER.debug("RECV[" + remoteAddress + "|" + channel + "] : " + frame);
+ SaslChallenge challengeBody = new SaslChallenge();
+ challengeBody.setChallenge(new Binary(challenge));
+ send(new SASLFrame(challengeBody), null);
+
+ _frameReceivingState = FrameReceivingState.SASL_RESPONSE_ONLY;
}
- });
- Map<Symbol,Object> serverProperties = new LinkedHashMap<>();
- serverProperties.put(Symbol.valueOf(ServerPropertyNames.PRODUCT), CommonProperties.getProductName());
- serverProperties.put(Symbol.valueOf(ServerPropertyNames.VERSION), CommonProperties.getReleaseVersion());
- serverProperties.put(Symbol.valueOf(ServerPropertyNames.QPID_BUILD), CommonProperties.getBuildVersion());
- serverProperties.put(Symbol.valueOf(ServerPropertyNames.QPID_INSTANCE_NAME), broker.getName());
+ }
+ catch (SaslException e)
+ {
+ SaslOutcome outcome = new SaslOutcome();
- endpoint.setProperties(serverProperties);
+ outcome.setCode(SaslCode.AUTH);
+ send(new SASLFrame(outcome), null);
+ _saslComplete = true;
- endpoint.setRemoteAddress(network.getRemoteAddress());
- return new Connection_1_0(endpoint, id, port, transport, subjectCreator, this);
+ closeSaslWithFailure();
+
+ }
}
+ /**
+  * Returns the current value of the {@code _maxFrameSize} field.
+  * {@code send(AMQFrame, ByteBuffer)} compares the encoded frame size against this value and
+  * throws {@code OversizeFrameException} when it is exceeded.
+  */
+ public int getMaxFrameSize()
+ {
+ return _maxFrameSize;
+ }
- public ByteBufferSender getSender()
+ Object getReference()
{
- return getNetwork().getSender();
+ return _reference;
}
- public ByteBuffer initiateNonSasl(final List<String> mechanisms)
+ private void endpointClosed()
{
- final ByteBuffer headerResponse;
- if(mechanisms.contains(ExternalAuthenticationManagerImpl.MECHANISM_NAME)
- && getNetwork().getPeerPrincipal() != null)
+ try
{
- _connection.setUserPrincipal(new AuthenticatedPrincipal(getNetwork().getPeerPrincipal()));
+ performDeleteTasks();
+ closeReceived();
}
- else if(mechanisms.contains(AnonymousAuthenticationManager.MECHANISM_NAME))
+ finally
{
- _connection.setUserPrincipal(new AuthenticatedPrincipal(AnonymousAuthenticationManager.ANONYMOUS_PRINCIPAL));
+ VirtualHost<?> virtualHost = getVirtualHost();
+ if (virtualHost != null)
+ {
+ virtualHost.deregisterConnection(this);
+ }
}
- else
+ }
+
+ private void closeConnection()
+ {
+ switch (_connectionState)
{
- getNetwork().close();
+ case AWAITING_OPEN:
+ case OPEN:
+ Close closeToSend = new Close();
+ sendClose(closeToSend);
+ _connectionState = ConnectionState.CLOSE_SENT;
+ break;
+ case CLOSE_SENT:
+ default:
}
-
- _frameHandler = new FrameHandler(_endpoint);
- headerResponse = AMQP_LAYER_HEADER;
- return headerResponse;
}
- public ByteBuffer initiateSasl()
+ /**
+  * Sends a Close frame carrying {@code error}, if the current connection state still allows it.
+  * <ul>
+  * <li>UNOPENED: an Open is sent first, then the Close; the state becomes CLOSED.</li>
+  * <li>AWAITING_OPEN / OPEN: the Close is sent; the state becomes CLOSE_SENT.</li>
+  * <li>CLOSE_SENT / CLOSED and any other state: nothing is sent.</li>
+  * </ul>
+  *
+  * @param error the error to attach to the outgoing Close
+  */
+ private void closeConnection(final Error error)
{
- final ByteBuffer headerResponse;
- _endpoint.setSaslFrameOutput(this);
+ Close close = new Close();
+ close.setError(error);
+ switch (_connectionState)
+ {
+ case UNOPENED:
+ // NOTE(review): presumably an Open must precede the Close on the wire - confirm against the spec
+ sendOpen(0, 0);
+ sendClose(close);
+ _connectionState = ConnectionState.CLOSED;
+ break;
+ case AWAITING_OPEN:
+ case OPEN:
+ sendClose(close);
+ _connectionState = ConnectionState.CLOSE_SENT;
+ // explicit break: previously this case fell through into the no-op cases below
+ break;
+ case CLOSE_SENT:
+ case CLOSED:
+ // already sent our close - too late to do anything more
+ break;
+ default:
+ // TODO Unknown state
+ break;
+ }
+ }
- _endpoint.setOnSaslComplete(new Runnable()
+ int sendFrame(final short channel, final FrameBody body, final QpidByteBuffer payload)
+ {
+ if (!_closedForOutput)
{
- public void run()
+ ValueWriter<FrameBody> writer = _describedTypeRegistry.getValueWriter(body);
+ int size = writer.writeToBuffer(EMPTY_BYTE_BUFFER);
+ QpidByteBuffer payloadDup = payload == null ? null : payload.duplicate();
+ int payloadSent = _maxFrameSize - (size + 9);
+ try
{
- if (_endpoint.isAuthenticated())
+ if (payloadSent < (payload == null ? 0 : payload.remaining()))
{
- getSender().send(QpidByteBuffer.wrap(AMQP_LAYER_HEADER.duplicate()));
- getSender().flush();
+
+ if (body instanceof Transfer)
+ {
+ ((Transfer) body).setMore(Boolean.TRUE);
+ }
+
+ writer = _describedTypeRegistry.getValueWriter(body);
+ size = writer.writeToBuffer(EMPTY_BYTE_BUFFER);
+ payloadSent = _maxFrameSize - (size + 9);
+
+ payloadDup.limit(payloadDup.position() + payloadSent);
}
else
{
- getNetwork().close();
+ payloadSent = payload == null ? 0 : payload.remaining();
}
+ send(AMQFrame.createAMQFrame(channel, body, payloadDup));
}
- });
- _frameHandler = new SASLFrameHandler(_endpoint);
- headerResponse = SASL_LAYER_HEADER;
- return headerResponse;
+ finally
+ {
+ if (payloadDup != null)
+ {
+ payloadDup.dispose();
+ }
+ }
+ return payloadSent;
+ }
+ else
+ {
+ return -1;
+ }
+ }
+
+ /**
+  * Sends a frame that has no payload: delegates to
+  * {@code sendFrame(channel, body, null)} and discards its return value.
+  *
+  * @param channel the channel to send the frame on
+  * @param body the frame body to send
+  */
+ void sendFrame(final short channel, final FrameBody body)
+ {
+ sendFrame(channel, body, null);
+ }
+
+ public ByteBufferSender getSender()
+ {
+ return getNetwork().getSender();
}
@Override
@@ -311,15 +1197,14 @@ public class AMQPConnection_1_0 extends
return getNetwork().getRemoteAddress().toString();
}
- private final Logger RAW_LOGGER = LoggerFactory.getLogger("RAW");
- public synchronized void received(final QpidByteBuffer msg)
+ public void received(final QpidByteBuffer msg)
{
try
{
updateLastReadTime();
- if(RAW_LOGGER.isDebugEnabled())
+ if (RAW_LOGGER.isDebugEnabled())
{
QpidByteBuffer dup = msg.duplicate();
byte[] data = new byte[dup.remaining()];
@@ -328,137 +1213,122 @@ public class AMQPConnection_1_0 extends
Binary bin = new Binary(data);
RAW_LOGGER.debug("RECV[" + getNetwork().getRemoteAddress() + "] : " + bin.toString());
}
- ProtocolHandler frameHandler;
+
int remaining;
do
{
- frameHandler = _frameHandler;
remaining = msg.remaining();
- switch (_state)
+ switch (_frameReceivingState)
{
- case A:
- if (msg.hasRemaining())
- {
- msg.get();
- }
- else
- {
- break;
- }
- case M:
- if (msg.hasRemaining())
- {
- msg.get();
- }
- else
- {
- _state = State.M;
- break;
- }
-
- case Q:
- if (msg.hasRemaining())
- {
- msg.get();
- }
- else
- {
- _state = State.Q;
- break;
- }
- case P:
- if (msg.hasRemaining())
- {
- msg.get();
- }
- else
+ case AMQP_OR_SASL_HEADER:
+ case AMQP_HEADER:
+ if (remaining < 8)
{
- _state = State.P;
- break;
+ return;
}
- case PROTOCOL:
- if (msg.hasRemaining())
- {
- msg.get();
- }
- else
- {
- _state = State.PROTOCOL;
- break;
- }
- case MAJOR:
- if (msg.hasRemaining())
- {
- _major = msg.get();
- }
- else
- {
- _state = State.MAJOR;
- break;
- }
- case MINOR:
- if (msg.hasRemaining())
- {
- _minor = msg.get();
- }
- else
- {
- _state = State.MINOR;
- break;
- }
- case REVISION:
- if (msg.hasRemaining())
- {
- _revision = msg.get();
+ processProtocolHeader(msg);
+ break;
+ case OPEN_ONLY:
+ case ANY_FRAME:
+ case SASL_INIT_ONLY:
+ case SASL_RESPONSE_ONLY:
+ _frameHandler.parse(msg);
+ break;
+ case CLOSED:
+ // ignore;
+ break;
+ }
- _state = State.FRAME;
- }
- else
- {
- _state = State.REVISION;
- break;
- }
- case FRAME:
- if (msg.hasRemaining())
- {
- AccessController.doPrivileged(new PrivilegedAction<Void>()
- {
- @Override
- public Void run()
- {
- _frameHandler = _frameHandler.parse(msg);
- return null;
- }
- }, getAccessControllerContext());
- }
- }
}
- while(_frameHandler != frameHandler || msg.remaining() != remaining);
+ while (msg.remaining() != remaining);
}
- catch(ConnectionScopedRuntimeException e)
+ catch (ConnectionScopedRuntimeException e)
{
throw e;
}
- catch(RuntimeException e)
+ catch (RuntimeException e)
{
LOGGER.error("Unexpected exception while processing incoming data", e);
throw new ConnectionScopedRuntimeException("Unexpected exception while processing incoming data", e);
}
- finally
+
+ }
+
+ /**
+  * Consumes an 8-byte protocol header from {@code msg} and reacts to it. Does nothing when fewer
+  * than 8 bytes remain.
+  * <ul>
+  * <li>SASL header: echoes the SASL header, sends the list of mechanisms offered by the port's
+  * subject creator, and moves to SASL_INIT_ONLY with a new frame handler.</li>
+  * <li>AMQP header: if SASL has not completed, selects a principal from the peer principal
+  * (EXTERNAL) or the anonymous principal (ANONYMOUS). If neither mechanism applies the network
+  * connection is closed, the receiving state becomes CLOSED and no header is echoed.
+  * Otherwise echoes the AMQP header and moves to OPEN_ONLY with a new frame handler.</li>
+  * </ul>
+  *
+  * @param msg the buffer holding the incoming bytes; 8 bytes are consumed when available
+  * @throws ConnectionScopedRuntimeException if the header is unknown, or a SASL header arrives
+  *         after SASL has already completed
+  */
+ private void processProtocolHeader(final QpidByteBuffer msg)
+ {
+ if(msg.remaining() >= 8)
{
- msg.position(msg.limit());
+ byte[] header = new byte[8];
+ msg.get(header);
+
+ final AuthenticationProvider authenticationProvider = getPort().getAuthenticationProvider();
+ final SubjectCreator subjectCreator = authenticationProvider.getSubjectCreator(getTransport().isSecure());
+
+ if(Arrays.equals(header, SASL_HEADER))
+ {
+ if(_saslComplete)
+ {
+ throw new ConnectionScopedRuntimeException("SASL Layer header received after SASL already established");
+ }
+
+ getSender().send(QpidByteBuffer.wrap(SASL_HEADER));
+
+ SaslMechanisms mechanisms = new SaslMechanisms();
+ ArrayList<Symbol> mechanismsList = new ArrayList<Symbol>();
+ for (String name : subjectCreator.getMechanisms())
+ {
+ mechanismsList.add(Symbol.valueOf(name));
+ }
+ mechanisms.setSaslServerMechanisms(mechanismsList.toArray(new Symbol[mechanismsList.size()]));
+ send(new SASLFrame(mechanisms), null);
+
+ _frameReceivingState = FrameReceivingState.SASL_INIT_ONLY;
+ _frameHandler = new FrameHandler(this, true);
+ }
+ else if(Arrays.equals(header, AMQP_HEADER))
+ {
+ if(!_saslComplete)
+ {
+ final List<String> mechanisms = subjectCreator.getMechanisms();
+
+ if(mechanisms.contains(ExternalAuthenticationManagerImpl.MECHANISM_NAME) && getNetwork().getPeerPrincipal() != null)
+ {
+ setUserPrincipal(new AuthenticatedPrincipal(getNetwork().getPeerPrincipal()));
+ }
+ else if(mechanisms.contains(AnonymousAuthenticationManager.MECHANISM_NAME))
+ {
+ setUserPrincipal(new AuthenticatedPrincipal(AnonymousAuthenticationManager.ANONYMOUS_PRINCIPAL));
+ }
+ else
+ {
+ // TODO - log auth failure
+ // No usable mechanism: close the transport and stop here. Previously execution carried on,
+ // echoing the AMQP header and arming the frame handler for an Open on a connection that
+ // had no principal and was already being closed.
+ getNetwork().close();
+ _frameReceivingState = FrameReceivingState.CLOSED;
+ return;
+ }
+
+ }
+ getSender().send(QpidByteBuffer.wrap(AMQP_HEADER));
+ _frameReceivingState = FrameReceivingState.OPEN_ONLY;
+ _frameHandler = new FrameHandler(this, false);
+
+ }
+ else
+ {
+ throw new ConnectionScopedRuntimeException("Unknown protocol header");
+ }
+
}
- }
+
+ }
public void closed()
{
try
{
- _endpoint.inputClosed();
+ inputClosed();
}
catch(RuntimeException e)
{
@@ -468,7 +1338,7 @@ public class AMQPConnection_1_0 extends
{
try
{
- _connection.closed();
+ endpointClosed();
}
finally
{
@@ -487,57 +1357,49 @@ public class AMQPConnection_1_0 extends
send(amqFrame, null);
}
- private static final Logger FRAME_LOGGER = LoggerFactory.getLogger("FRM");
public void send(final AMQFrame amqFrame, ByteBuffer buf)
{
- synchronized (_sendLock)
- {
- updateLastWriteTime();
- if (FRAME_LOGGER.isDebugEnabled())
- {
- FRAME_LOGGER.debug("SEND["
- + getNetwork().getRemoteAddress()
- + "|"
- + amqFrame.getChannel()
- + "] : "
- + amqFrame.getFrameBody());
- }
+ updateLastWriteTime();
+ FRAME_LOGGER.debug("SEND[{}|{}] : {}",
+ getNetwork().getRemoteAddress(),
+ amqFrame.getChannel(),
+ amqFrame.getFrameBody());
- _frameWriter.setValue(amqFrame);
+ _frameWriter.setValue(amqFrame);
- QpidByteBuffer buffer = QpidByteBuffer.allocateDirect(_frameWriter.getSize());
+ QpidByteBuffer buffer = QpidByteBuffer.allocateDirect(_frameWriter.getSize());
- try
+ try
+ {
+ int size = _frameWriter.writeToBuffer(buffer);
+ if (size > getMaxFrameSize())
{
- int size = _frameWriter.writeToBuffer(buffer);
- if (size > _endpoint.getMaxFrameSize())
- {
- throw new OversizeFrameException(amqFrame, size);
- }
-
- buffer.flip();
+ throw new OversizeFrameException(amqFrame, size);
+ }
- if (RAW_LOGGER.isDebugEnabled())
- {
- QpidByteBuffer dup = buffer.duplicate();
- byte[] data = new byte[dup.remaining()];
- dup.get(data);
- dup.dispose();
- Binary bin = new Binary(data);
- RAW_LOGGER.debug("SEND[" + getNetwork().getRemoteAddress() + "] : " + bin.toString());
- }
+ buffer.flip();
- getSender().send(buffer);
- getSender().flush();
- }
- finally
+ if (RAW_LOGGER.isDebugEnabled())
{
- buffer.dispose();
+ QpidByteBuffer dup = buffer.duplicate();
+ byte[] data = new byte[dup.remaining()];
+ dup.get(data);
+ dup.dispose();
+ Binary bin = new Binary(data);
+ RAW_LOGGER.debug("SEND[" + getNetwork().getRemoteAddress() + "] : " + bin.toString());
}
+
+ getSender().send(buffer);
+
+ }
+ finally
+ {
+ buffer.dispose();
}
+
}
public void send(short channel, FrameBody body)
@@ -565,7 +1427,7 @@ public class AMQPConnection_1_0 extends
if(_transportBlockedForWriting != blocked)
{
_transportBlockedForWriting = blocked;
- _connection.transportStateChanged();
+ transportStateChanged();
}
}
@@ -575,7 +1437,7 @@ public class AMQPConnection_1_0 extends
{
if (isIOThread())
{
- return _connection.processPendingIterator();
+ return new ProcessPendingIterator();
}
else
{
@@ -620,44 +1482,202 @@ public class AMQPConnection_1_0 extends
public void sendConnectionCloseAsync(final AMQConstant cause, final String message)
{
- _connection.sendConnectionCloseAsync(cause, message);
+ Action<ConnectionHandler> action = new Action<ConnectionHandler>()
+ {
+ @Override
+ public void performAction(final ConnectionHandler object)
+ {
+ closeConnection();
+
+ }
+ };
+ addAsyncTask(action);
}
public void closeSessionAsync(final AMQSessionModel<?> session,
final AMQConstant cause, final String message)
{
- _connection.closeSessionAsync((Session_1_0) session, cause, message);
+ closeSessionAsync((Session_1_0) session, cause, message);
}
public void block()
{
- _connection.block();
+ // TODO
}
public String getRemoteContainerName()
{
- return _connection.getRemoteContainerName();
+ return _remoteContainerId;
}
public List<Session_1_0> getSessionModels()
{
- return _connection.getSessionModels();
+ return new ArrayList<>(_sessions);
}
public void unblock()
{
- _connection.unblock();
+ // TODO
}
public long getSessionCountLimit()
{
- return _connection.getSessionCountLimit();
+ return 0; // TODO
}
@Override
- protected boolean isOrderlyClose()
+ public boolean isOrderlyClose()
{
- return _connection.getConnectionEndpoint().isOrderlyClose();
+ return _orderlyClose.get();
+ }
+
+ /**
+  * Appends {@code action} to {@code _asyncTaskList} and then calls {@code notifyWork()}.
+  * Queued actions are later polled and run by {@code ProcessPendingIterator}.
+  *
+  * @param action the action to queue
+  */
+ private void addAsyncTask(final Action<ConnectionHandler> action)
+ {
+ _asyncTaskList.add(action);
+ notifyWork();
+ }
+
+
+ /**
+  * Builds an Open performative and sends it on {@code CONNECTION_CONTROL_CHANNEL}.
+  * As a side effect, allocates the receiving/sending session tables on first use and lowers
+  * {@code _channelMax} when the supplied value is smaller.
+  *
+  * @param channelMax the channel-max value to advertise; also sizes the session tables
+  * @param maxFrameSize the max-frame-size value to advertise
+  */
+ private void sendOpen(final int channelMax, final int maxFrameSize)
+ {
+ Open open = new Open();
+
+ // the session tables are created only once; a later call with a different channelMax does not resize them
+ if (_receivingSessions == null)
+ {
+ _receivingSessions = new SessionEndpoint[channelMax + 1];
+ _sendingSessions = new SessionEndpoint[channelMax + 1];
+ }
+ // _channelMax only ever decreases here
+ if (channelMax < _channelMax)
+ {
+ _channelMax = channelMax;
+ }
+ open.setChannelMax(UnsignedShort.valueOf((short) channelMax));
+ open.setContainerId(_container.getId());
+ open.setMaxFrameSize(UnsignedInteger.valueOf(maxFrameSize));
+ // TODO - should we try to set the hostname based on the connection information?
+ // open.setHostname();
+ open.setIdleTimeOut(UnsignedInteger.valueOf(_desiredIdleTimeout));
+ if (_properties != null)
+ {
+ open.setProperties(_properties);
+ }
+
+ sendFrame(CONNECTION_CONTROL_CHANNEL, open);
+ }
+
+ /**
+  * Builds an Error from the given condition and description, passes it to
+  * {@code closeConnection(Error)}, then calls {@code close()} and sets {@code _closedOnOpen}.
+  *
+  * @param amqpError the error condition to report
+  * @param errorDescription the description to attach to the error
+  */
+ private void closeWithError(final AmqpError amqpError, final String errorDescription)
+ {
+ final Error err = new Error();
+ err.setCondition(amqpError);
+ err.setDescription(errorDescription);
+ closeConnection(err);
+ close();
+ // NOTE(review): the flag is set only after close() returns - confirm nothing in close() reads it
+ _closedOnOpen = true;
+ }
+
+ /**
+  * Looks up the receiving session registered for {@code channel}.
+  * If there is none, a FRAMING_ERROR is reported through {@code handleError} and {@code null}
+  * is returned, so callers must check the result. A channel outside the session table, or a
+  * lookup made before the table has been allocated, is treated the same way.
+  *
+  * @param channel the channel a frame was received on
+  * @return the session for that channel, or {@code null} if none is known
+  */
+ private SessionEndpoint getSession(final short channel)
+ {
+     // The table is allocated lazily by sendOpen, and a short may be negative or exceed the table
+     // size; guard the index so a bad channel yields the framing error below instead of an
+     // unchecked NullPointerException / ArrayIndexOutOfBoundsException.
+     final SessionEndpoint[] sessions = _receivingSessions;
+     final boolean channelInRange = sessions != null && channel >= 0 && channel < sessions.length;
+     final SessionEndpoint session = channelInRange ? sessions[channel] : null;
+     if (session == null)
+     {
+         Error error = new Error();
+         error.setCondition(ConnectionError.FRAMING_ERROR);
+         error.setDescription("Frame received on channel " + channel + " which is not known as a begun session.");
+         handleError(error);
+     }
+
+     return session;
+ }
+
+ /**
+  * Sends {@code closeToSend} on {@code CONNECTION_CONTROL_CHANNEL} and then calls
+  * {@code closeSender()}.
+  *
+  * @param closeToSend the Close performative to send
+  */
+ private void sendClose(Close closeToSend)
+ {
+ sendFrame(CONNECTION_CONTROL_CHANNEL, closeToSend);
+ closeSender();
+ }
+
+ /**
+  * Describes this connection as {@code Connection_1_0[<id> <address>]}, with
+  * {@code " vh : <name>"} appended before the closing bracket when a virtual host is set.
+  */
+ @Override
+ public String toString()
+ {
+     final VirtualHost<?> host = getVirtualHost();
+     final StringBuilder text = new StringBuilder("Connection_1_0[");
+     text.append(_connectionId);
+     text.append(" ");
+     text.append(getAddress());
+     if (host != null)
+     {
+         text.append(" vh : ").append(host.getName());
+     }
+     text.append(']');
+     return text.toString();
+ }
+
+
+ /**
+  * Verifies that the connection is currently in the given frame-receiving state.
+  *
+  * @param state the state the connection is required to be in
+  * @throws ConnectionScopedRuntimeException if the current state differs from {@code state}
+  */
+ private void assertState(final FrameReceivingState state)
+ {
+     if (_frameReceivingState == state)
+     {
+         return;
+     }
+     final String problem = "Unexpected state, client has sent frame in an illegal order. Required state: " + state + ", actual state: " + _frameReceivingState;
+     throw new ConnectionScopedRuntimeException(problem);
+ }
+
+
+ /**
+  * Iterator over the pending units of work for this connection, each wrapped as a Runnable.
+  * Sessions are served first, round-robin over a snapshot taken at construction; a session is
+  * dropped from the snapshot once its {@code processPending()} returns false. Only when no
+  * sessions remain are actions polled from {@code _asyncTaskList}.
+  * <p>
+  * NOTE(review): a session Runnable removes through the shared iterator, so it appears to rely
+  * on being run before {@code next()} is called again - confirm against the caller.
+  */
+ private class ProcessPendingIterator implements Iterator<Runnable>
+ {
+     private final List<? extends AMQSessionModel<?>> _sessionsWithPending;
+     private Iterator<? extends AMQSessionModel<?>> _sessionIterator;
+
+     private ProcessPendingIterator()
+     {
+         _sessionsWithPending = new ArrayList<>(getSessionModels());
+         _sessionIterator = _sessionsWithPending.iterator();
+     }
+
+     /** Returns true while any session remains in the snapshot or any async task is queued. */
+     @Override
+     public boolean hasNext()
+     {
+         return !(_sessionsWithPending.isEmpty() && _asyncTaskList.isEmpty());
+     }
+
+     /**
+      * Returns the next unit of work.
+      *
+      * @throws NoSuchElementException if no session remains and no async task could be polled
+      */
+     @Override
+     public Runnable next()
+     {
+         if(!_sessionsWithPending.isEmpty())
+         {
+             // wrap around to the start of the snapshot once the end is reached
+             if(!_sessionIterator.hasNext())
+             {
+                 _sessionIterator = _sessionsWithPending.iterator();
+             }
+             final AMQSessionModel<?> session = _sessionIterator.next();
+             return new Runnable()
+             {
+                 @Override
+                 public void run()
+                 {
+                     if(!session.processPending())
+                     {
+                         _sessionIterator.remove();
+                     }
+                 }
+             };
+         }
+
+         // Poll once and test the result instead of isEmpty()-then-poll(): if the queue is drained
+         // in between, the caller gets NoSuchElementException here rather than a Runnable that
+         // fails later with a NullPointerException.
+         final Action<? super ConnectionHandler> asyncAction = _asyncTaskList.poll();
+         if (asyncAction == null)
+         {
+             throw new NoSuchElementException();
+         }
+         return new Runnable()
+         {
+             @Override
+             public void run()
+             {
+                 asyncAction.performAction(AMQPConnection_1_0.this);
+             }
+         };
+     }
+
+     /** Not supported. */
+     @Override
+     public void remove()
+     {
+         throw new UnsupportedOperationException();
+     }
+ }
@Override
Added: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConnectionHandler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConnectionHandler.java?rev=1739270&view=auto
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConnectionHandler.java (added)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConnectionHandler.java Fri Apr 15 10:10:16 2016
@@ -0,0 +1,61 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0;
+
+import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition;
+import org.apache.qpid.server.protocol.v1_0.type.transport.End;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
+
+/**
+ * Connection-level callbacks for AMQP 1.0: one {@code receiveXxx} method per transport
+ * performative type, a generic {@link #receive(short, Object)}, and hooks for errors and
+ * input state.
+ * NOTE(review): presumably invoked by the frame handler for each decoded frame - confirm.
+ */
+public interface ConnectionHandler extends SASLEndpoint
+{
+ /** Receives an Open performative together with the channel it is associated with. */
+ void receiveOpen(short channel, Open open);
+
+ /** Receives a Close performative together with the channel it is associated with. */
+ void receiveClose(short channel, Close close);
+
+ /** Receives a Begin performative together with the channel it is associated with. */
+ void receiveBegin(short channel, Begin begin);
+
+ /** Receives an End performative together with the channel it is associated with. */
+ void receiveEnd(short channel, End end);
+
+ /** Receives an Attach performative together with the channel it is associated with. */
+ void receiveAttach(short channel, Attach attach);
+
+ /** Receives a Detach performative together with the channel it is associated with. */
+ void receiveDetach(short channel, Detach detach);
+
+ /** Receives a Transfer performative together with the channel it is associated with. */
+ void receiveTransfer(short channel, Transfer transfer);
+
+ /** Receives a Disposition performative together with the channel it is associated with. */
+ void receiveDisposition(short channel, Disposition disposition);
+
+ /** Receives a Flow performative together with the channel it is associated with. */
+ void receiveFlow(short channel, Flow flow);
+
+ /** Returns the maximum frame size for this connection. */
+ int getMaxFrameSize();
+
+ /** Reports an error to the handler; the parameter name suggests it originates from frame parsing. */
+ void handleError(Error parsingError);
+
+ /** Reports whether this handler is closed for input. */
+ boolean closedForInput();
+
+ /** Receives a value that is not tied to one of the typed callbacks, with its channel. */
+ void receive(short channel, Object val);
+}
Propchange: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConnectionHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Copied: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConnectionState.java (from r1739261, qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionState.java)
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConnectionState.java?p2=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConnectionState.java&p1=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionState.java&r1=1739261&r2=1739270&rev=1739270&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionState.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConnectionState.java Fri Apr 15 10:10:16 2016
@@ -19,7 +19,7 @@
*
*/
-package org.apache.qpid.amqp_1_0.transport;
+package org.apache.qpid.server.protocol.v1_0;
public enum ConnectionState
{
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1739270&r1=1739269&r2=1739270&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java Fri Apr 15 10:10:16 2016
@@ -26,23 +26,22 @@ import java.util.Collection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.qpid.amqp_1_0.codec.ValueHandler;
-import org.apache.qpid.amqp_1_0.messaging.SectionEncoder;
-import org.apache.qpid.amqp_1_0.messaging.SectionEncoderImpl;
-import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
-import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
-import org.apache.qpid.amqp_1_0.type.Binary;
-import org.apache.qpid.amqp_1_0.type.DeliveryState;
-import org.apache.qpid.amqp_1_0.type.Outcome;
-import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
-import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry;
-import org.apache.qpid.amqp_1_0.type.messaging.Accepted;
-import org.apache.qpid.amqp_1_0.type.messaging.Header;
-import org.apache.qpid.amqp_1_0.type.messaging.Modified;
-import org.apache.qpid.amqp_1_0.type.messaging.Released;
-import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState;
-import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode;
-import org.apache.qpid.amqp_1_0.type.transport.Transfer;
+import org.apache.qpid.server.protocol.v1_0.codec.ValueHandler;
+import org.apache.qpid.server.protocol.v1_0.messaging.SectionEncoder;
+import org.apache.qpid.server.protocol.v1_0.messaging.SectionEncoderImpl;
+import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
+import org.apache.qpid.server.protocol.v1_0.type.Outcome;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Header;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Modified;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Released;
+import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionalState;
+import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.transport.ProtocolEngine;
import org.apache.qpid.server.consumer.AbstractConsumerTarget;
@@ -130,21 +129,21 @@ class ConsumerTarget_1_0 extends Abstrac
QpidByteBuffer payload = null;
//TODO
Collection<QpidByteBuffer> fragments = message.getFragments();
- if(fragments.size() == 1)
+ if (fragments.size() == 1)
{
payload = fragments.iterator().next();
}
else
{
int size = 0;
- for(QpidByteBuffer fragment : fragments)
+ for (QpidByteBuffer fragment : fragments)
{
size += fragment.remaining();
}
payload = QpidByteBuffer.allocateDirect(size);
- for(QpidByteBuffer fragment : fragments)
+ for (QpidByteBuffer fragment : fragments)
{
payload.put(fragment);
fragment.dispose();
@@ -153,7 +152,7 @@ class ConsumerTarget_1_0 extends Abstrac
payload.flip();
}
- if(entry.getDeliveryCount() != 0)
+ if (entry.getDeliveryCount() != 0)
{
ValueHandler valueHandler = new ValueHandler(_typeRegistry);
@@ -161,7 +160,7 @@ class ConsumerTarget_1_0 extends Abstrac
try
{
Object value = valueHandler.parse(payload);
- if(value instanceof Header)
+ if (value instanceof Header)
{
oldHeader = (Header) value;
}
@@ -177,7 +176,7 @@ class ConsumerTarget_1_0 extends Abstrac
}
Header header = new Header();
- if(oldHeader != null)
+ if (oldHeader != null)
{
header.setDurable(oldHeader.getDurable());
header.setPriority(oldHeader.getPriority());
@@ -190,7 +189,7 @@ class ConsumerTarget_1_0 extends Abstrac
QpidByteBuffer oldPayload = payload;
payload = QpidByteBuffer.allocateDirect(oldPayload.remaining() + encodedHeader.getLength());
- payload.put(encodedHeader.getArray(),encodedHeader.getArrayOffset(),encodedHeader.getLength());
+ payload.put(encodedHeader.getArray(), encodedHeader.getArrayOffset(), encodedHeader.getLength());
payload.put(oldPayload);
oldPayload.dispose();
payload.flip();
@@ -203,58 +202,57 @@ class ConsumerTarget_1_0 extends Abstrac
transfer.setDeliveryTag(tag);
- synchronized(_link.getLock())
+ if (_link.isAttached())
{
- if(_link.isAttached())
+ if (SenderSettleMode.SETTLED.equals(getEndpoint().getSendingSettlementMode()))
{
- if(SenderSettleMode.SETTLED.equals(getEndpoint().getSendingSettlementMode()))
- {
- transfer.setSettled(true);
- }
- else
- {
- UnsettledAction action = _acquires
- ? new DispositionAction(tag, entry)
- : new DoNothingAction(tag, entry);
+ transfer.setSettled(true);
+ }
+ else
+ {
+ UnsettledAction action = _acquires
+ ? new DispositionAction(tag, entry)
+ : new DoNothingAction(tag, entry);
- _link.addUnsettled(tag, action, entry);
- }
+ _link.addUnsettled(tag, action, entry);
+ }
- if(_transactionId != null)
- {
- TransactionalState state = new TransactionalState();
- state.setTxnId(_transactionId);
- transfer.setState(state);
- }
- // TODO - need to deal with failure here
- if(_acquires && _transactionId != null)
+ if (_transactionId != null)
+ {
+ TransactionalState state = new TransactionalState();
+ state.setTxnId(_transactionId);
+ transfer.setState(state);
+ }
+ // TODO - need to deal with failure here
+ if (_acquires && _transactionId != null)
+ {
+ ServerTransaction txn = _link.getTransaction(_transactionId);
+ if (txn != null)
{
- ServerTransaction txn = _link.getTransaction(_transactionId);
- if(txn != null)
+ txn.addPostTransactionAction(new ServerTransaction.Action()
{
- txn.addPostTransactionAction(new ServerTransaction.Action(){
- public void postCommit()
- {
- }
-
- public void onRollback()
- {
- entry.release(getConsumer());
- _link.getEndpoint().updateDisposition(tag, (DeliveryState)null, true);
- }
- });
- }
+ public void postCommit()
+ {
+ }
+ public void onRollback()
+ {
+ entry.release(getConsumer());
+ _link.getEndpoint().updateDisposition(tag, (DeliveryState) null, true);
+ }
+ });
}
- getSession().getAMQPConnection().registerMessageDelivered(message.getSize());
- getEndpoint().transfer(transfer, false);
- }
- else
- {
- entry.release(getConsumer());
+
}
+ getSession().getAMQPConnection().registerMessageDelivered(message.getSize());
+ getEndpoint().transfer(transfer, false);
}
+ else
+ {
+ entry.release(getConsumer());
+ }
+
}
finally
{
@@ -281,63 +279,48 @@ class ConsumerTarget_1_0 extends Abstrac
public boolean allocateCredit(final ServerMessage msg)
{
- synchronized (_link.getLock())
+ ProtocolEngine protocolEngine = getSession().getConnection();
+ final boolean hasCredit =
+ _link.isAttached() && getEndpoint().hasCreditToSend() && !protocolEngine.isTransportBlockedForWriting();
+ if (!hasCredit && getState() == State.ACTIVE)
{
+ suspend();
+ }
- ProtocolEngine protocolEngine = getSession().getConnection().getAmqpConnection();
- final boolean hasCredit = _link.isAttached() && getEndpoint().hasCreditToSend() && !protocolEngine.isTransportBlockedForWriting();
- if (!hasCredit && getState() == State.ACTIVE)
- {
- suspend();
- }
-
- if (hasCredit)
- {
- SendingLinkEndpoint linkEndpoint = _link.getEndpoint();
- linkEndpoint.setLinkCredit(linkEndpoint.getLinkCredit().subtract(UnsignedInteger.ONE));
- }
-
- return hasCredit;
+ if (hasCredit)
+ {
+ SendingLinkEndpoint linkEndpoint = _link.getEndpoint();
+ linkEndpoint.setLinkCredit(linkEndpoint.getLinkCredit().subtract(UnsignedInteger.ONE));
}
+
+ return hasCredit;
}
public void suspend()
{
- synchronized(_link.getLock())
- {
- updateState(State.ACTIVE, State.SUSPENDED);
- }
+ updateState(State.ACTIVE, State.SUSPENDED);
}
public void restoreCredit(final ServerMessage message)
{
- synchronized (_link.getLock())
- {
- final SendingLinkEndpoint endpoint = _link.getEndpoint();
- endpoint.setLinkCredit(endpoint.getLinkCredit().add(UnsignedInteger.ONE));
- }
+ final SendingLinkEndpoint endpoint = _link.getEndpoint();
+ endpoint.setLinkCredit(endpoint.getLinkCredit().add(UnsignedInteger.ONE));
}
public void queueEmpty()
{
- synchronized(_link.getLock())
- {
- _queueEmpty = true;
- }
+ _queueEmpty = true;
}
public void flowStateChanged()
{
- synchronized(_link.getLock())
+ ProtocolEngine protocolEngine = getSession().getConnection();
+ if (isFlowSuspended() && getEndpoint() != null && !protocolEngine.isTransportBlockedForWriting())
{
- ProtocolEngine protocolEngine = getSession().getConnection().getAmqpConnection();
- if(isFlowSuspended() && getEndpoint() != null && !protocolEngine.isTransportBlockedForWriting())
- {
- updateState(State.SUSPENDED, State.ACTIVE);
- _transactionId = _link.getTransactionId();
- }
+ updateState(State.SUSPENDED, State.ACTIVE);
+ _transactionId = _link.getTransactionId();
}
}
@@ -552,16 +535,13 @@ class ConsumerTarget_1_0 extends Abstrac
@Override
protected void processStateChanged()
{
- synchronized (_link.getLock())
+ if(_queueEmpty)
{
- if(_queueEmpty)
- {
- _queueEmpty = false;
+ _queueEmpty = false;
- if(_link.drained())
- {
- updateState(State.ACTIVE, State.SUSPENDED);
- }
+ if(_link.drained())
+ {
+ updateState(State.ACTIVE, State.SUSPENDED);
}
}
}
@@ -569,10 +549,7 @@ class ConsumerTarget_1_0 extends Abstrac
@Override
protected boolean hasStateChanged()
{
- synchronized (_link.getLock())
- {
- return _queueEmpty;
- }
+ return _queueEmpty;
}
@Override
Copied: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Container.java (from r1739261, qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/Container.java)
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Container.java?p2=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Container.java&p1=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/Container.java&r1=1739261&r2=1739270&rev=1739270&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/Container.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Container.java Fri Apr 15 10:10:16 2016
@@ -19,7 +19,7 @@
*
*/
-package org.apache.qpid.amqp_1_0.transport;
+package org.apache.qpid.server.protocol.v1_0;
import java.lang.management.ManagementFactory;
import java.net.InetAddress;
Copied: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Delivery.java (from r1739261, qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/Delivery.java)
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Delivery.java?p2=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Delivery.java&p1=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/Delivery.java&r1=1739261&r2=1739270&rev=1739270&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/Delivery.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Delivery.java Fri Apr 15 10:10:16 2016
@@ -17,14 +17,11 @@
* under the License.
*/
-package org.apache.qpid.amqp_1_0.transport;
+package org.apache.qpid.server.protocol.v1_0;
-import org.apache.qpid.amqp_1_0.type.Binary;
-import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
-import org.apache.qpid.amqp_1_0.type.transport.Transfer;
-
-import java.util.ArrayList;
-import java.util.List;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
public class Delivery
{
Copied: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/DeliveryStateHandler.java (from r1739261, qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/DeliveryStateHandler.java)
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/DeliveryStateHandler.java?p2=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/DeliveryStateHandler.java&p1=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/DeliveryStateHandler.java&r1=1739261&r2=1739270&rev=1739270&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/DeliveryStateHandler.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/DeliveryStateHandler.java Fri Apr 15 10:10:16 2016
@@ -17,10 +17,10 @@
* under the License.
*/
-package org.apache.qpid.amqp_1_0.transport;
+package org.apache.qpid.server.protocol.v1_0;
-import org.apache.qpid.amqp_1_0.type.Binary;
-import org.apache.qpid.amqp_1_0.type.DeliveryState;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
public interface DeliveryStateHandler
{
Copied: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ErrorHandler.java (from r1739261, qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/ErrorHandler.java)
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ErrorHandler.java?p2=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ErrorHandler.java&p1=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/ErrorHandler.java&r1=1739261&r2=1739270&rev=1739270&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/ErrorHandler.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ErrorHandler.java Fri Apr 15 10:10:16 2016
@@ -18,10 +18,10 @@
* under the License.
*
*/
-package org.apache.qpid.amqp_1_0.transport;
+package org.apache.qpid.server.protocol.v1_0;
public interface ErrorHandler
{
- void handleError(org.apache.qpid.amqp_1_0.type.transport.Error error);
+ void handleError(org.apache.qpid.server.protocol.v1_0.type.transport.Error error);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org