You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/09/30 17:30:44 UTC
svn commit: r1628473 [2/3] - in
/qpid/branches/QPID-6125-ProtocolRefactoring/java:
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/
bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/
broker-plugins/amqp-0-8-protocol/...
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl.java?rev=1628473&r1=1628472&r2=1628473&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ServerMethodDispatcherImpl.java Tue Sep 30 15:30:43 2014
@@ -20,92 +20,71 @@
*/
package org.apache.qpid.server.protocol.v0_8.handler;
+import java.security.AccessControlException;
+import java.util.Collection;
+import java.util.EnumSet;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
+import java.util.UUID;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
+import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.*;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.consumer.ConsumerImpl;
+import org.apache.qpid.server.exchange.ExchangeImpl;
+import org.apache.qpid.server.filter.AMQInvalidArgumentException;
+import org.apache.qpid.server.flow.FlowCreditManager;
+import org.apache.qpid.server.flow.MessageOnlyCreditManager;
+import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageSource;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.ExclusivityPolicy;
+import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.NoFactoryForTypeException;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.UnknownConfiguredObjectException;
+import org.apache.qpid.server.model.port.AmqpPort;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.v0_8.AMQChannel;
+import org.apache.qpid.server.protocol.v0_8.AMQMessage;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
+import org.apache.qpid.server.protocol.v0_8.ClientDeliveryMethod;
+import org.apache.qpid.server.protocol.v0_8.ConsumerTarget_0_8;
+import org.apache.qpid.server.protocol.v0_8.RecordDeliveryMethod;
+import org.apache.qpid.server.protocol.v0_8.state.AMQState;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueArgumentsConverter;
+import org.apache.qpid.server.security.SubjectCreator;
+import org.apache.qpid.server.security.auth.SubjectAuthenticationResult;
+import org.apache.qpid.server.virtualhost.ExchangeExistsException;
+import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException;
+import org.apache.qpid.server.virtualhost.QueueExistsException;
+import org.apache.qpid.server.virtualhost.RequiredExchangeException;
+import org.apache.qpid.server.virtualhost.ReservedExchangeNameException;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
public class ServerMethodDispatcherImpl implements MethodDispatcher
{
- private final AMQProtocolSession<?> _connection;
-
- private static interface DispatcherFactory
- {
- public MethodDispatcher createMethodDispatcher(AMQProtocolSession<?> connection);
- }
-
- private static final Map<ProtocolVersion, DispatcherFactory> _dispatcherFactories =
- new HashMap<ProtocolVersion, DispatcherFactory>();
-
-
- static
- {
- _dispatcherFactories.put(ProtocolVersion.v8_0,
- new DispatcherFactory()
- {
- public MethodDispatcher createMethodDispatcher(AMQProtocolSession<?> connection)
- {
- return new ServerMethodDispatcherImpl_8_0(connection);
- }
- });
-
- _dispatcherFactories.put(ProtocolVersion.v0_9,
- new DispatcherFactory()
- {
- public MethodDispatcher createMethodDispatcher(AMQProtocolSession<?> connection)
- {
- return new ServerMethodDispatcherImpl_0_9(connection);
- }
- });
- _dispatcherFactories.put(ProtocolVersion.v0_91,
- new DispatcherFactory()
- {
- public MethodDispatcher createMethodDispatcher(AMQProtocolSession<?> connection)
- {
- return new ServerMethodDispatcherImpl_0_91(connection);
- }
- });
-
- }
-
-
- private static final AccessRequestHandler _accessRequestHandler = AccessRequestHandler.getInstance();
- private static final ChannelCloseHandler _channelCloseHandler = ChannelCloseHandler.getInstance();
- private static final ChannelOpenHandler _channelOpenHandler = ChannelOpenHandler.getInstance();
- private static final ChannelCloseOkHandler _channelCloseOkHandler = ChannelCloseOkHandler.getInstance();
- private static final ConnectionCloseMethodHandler _connectionCloseMethodHandler = ConnectionCloseMethodHandler.getInstance();
- private static final ConnectionCloseOkMethodHandler _connectionCloseOkMethodHandler = ConnectionCloseOkMethodHandler.getInstance();
- private static final ConnectionOpenMethodHandler _connectionOpenMethodHandler = ConnectionOpenMethodHandler.getInstance();
- private static final ConnectionTuneOkMethodHandler _connectionTuneOkMethodHandler = ConnectionTuneOkMethodHandler.getInstance();
- private static final ConnectionSecureOkMethodHandler _connectionSecureOkMethodHandler = ConnectionSecureOkMethodHandler.getInstance();
- private static final ConnectionStartOkMethodHandler _connectionStartOkMethodHandler = ConnectionStartOkMethodHandler.getInstance();
- private static final ExchangeDeclareHandler _exchangeDeclareHandler = ExchangeDeclareHandler.getInstance();
- private static final ExchangeDeleteHandler _exchangeDeleteHandler = ExchangeDeleteHandler.getInstance();
- private static final ExchangeBoundHandler _exchangeBoundHandler = ExchangeBoundHandler.getInstance();
- private static final BasicAckMethodHandler _basicAckMethodHandler = BasicAckMethodHandler.getInstance();
- private static final BasicRecoverMethodHandler _basicRecoverMethodHandler = BasicRecoverMethodHandler.getInstance();
- private static final BasicConsumeMethodHandler _basicConsumeMethodHandler = BasicConsumeMethodHandler.getInstance();
- private static final BasicGetMethodHandler _basicGetMethodHandler = BasicGetMethodHandler.getInstance();
- private static final BasicCancelMethodHandler _basicCancelMethodHandler = BasicCancelMethodHandler.getInstance();
- private static final BasicPublishMethodHandler _basicPublishMethodHandler = BasicPublishMethodHandler.getInstance();
- private static final BasicQosHandler _basicQosHandler = BasicQosHandler.getInstance();
- private static final QueueBindHandler _queueBindHandler = QueueBindHandler.getInstance();
- private static final QueueDeclareHandler _queueDeclareHandler = QueueDeclareHandler.getInstance();
- private static final QueueDeleteHandler _queueDeleteHandler = QueueDeleteHandler.getInstance();
- private static final QueuePurgeHandler _queuePurgeHandler = QueuePurgeHandler.getInstance();
- private static final ChannelFlowHandler _channelFlowHandler = ChannelFlowHandler.getInstance();
- private static final TxSelectHandler _txSelectHandler = TxSelectHandler.getInstance();
- private static final TxCommitHandler _txCommitHandler = TxCommitHandler.getInstance();
- private static final TxRollbackHandler _txRollbackHandler = TxRollbackHandler.getInstance();
- private static final BasicRejectMethodHandler _basicRejectMethodHandler = BasicRejectMethodHandler.getInstance();
+ private static final Logger _logger = Logger.getLogger(ServerMethodDispatcherImpl.class);
+ private final AMQProtocolSession<?> _connection;
public static MethodDispatcher createMethodDispatcher(AMQProtocolSession<?> connection)
{
- return _dispatcherFactories.get(connection.getProtocolVersion()).createMethodDispatcher(connection);
+ return new ServerMethodDispatcherImpl(connection);
}
@@ -122,61 +101,618 @@ public class ServerMethodDispatcherImpl
public boolean dispatchAccessRequest(AccessRequestBody body, int channelId) throws AMQException
{
- _accessRequestHandler.methodReceived(getConnection(), body, channelId);
+ final AMQChannel channel = _connection.getChannel(channelId);
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry());
+ }
+
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+
+ if(ProtocolVersion.v0_91.equals(_connection.getProtocolVersion()) )
+ {
+ throw new AMQException(AMQConstant.COMMAND_INVALID, "AccessRequest not present in AMQP versions other than 0-8, 0-9");
+ }
+
+ // We don't implement access control class, but to keep clients happy that expect it
+ // always use the "0" ticket.
+ AccessRequestOkBody response = methodRegistry.createAccessRequestOkBody(0);
+ channel.sync();
+ _connection.writeFrame(response.generateFrame(channelId));
return true;
}
public boolean dispatchBasicAck(BasicAckBody body, int channelId) throws AMQException
{
- _basicAckMethodHandler.methodReceived(getConnection(), body, channelId);
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Ack(Tag:" + body.getDeliveryTag() + ":Mult:" + body.getMultiple() + ") received on channel " + channelId);
+ }
+
+ final AMQChannel channel = _connection.getChannel(channelId);
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry());
+ }
+
+ // this method throws an AMQException if the delivery tag is not known
+ channel.acknowledgeMessage(body.getDeliveryTag(), body.getMultiple());
return true;
}
public boolean dispatchBasicCancel(BasicCancelBody body, int channelId) throws AMQException
{
- _basicCancelMethodHandler.methodReceived(getConnection(), body, channelId);
+ final AMQChannel channel = _connection.getChannel(channelId);
+
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry());
+ }
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("BasicCancel: for:" + body.getConsumerTag() +
+ " nowait:" + body.getNowait());
+ }
+
+ channel.unsubscribeConsumer(body.getConsumerTag());
+ if (!body.getNowait())
+ {
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ BasicCancelOkBody cancelOkBody = methodRegistry.createBasicCancelOkBody(body.getConsumerTag());
+ channel.sync();
+ _connection.writeFrame(cancelOkBody.generateFrame(channelId));
+ }
return true;
}
public boolean dispatchBasicConsume(BasicConsumeBody body, int channelId) throws AMQException
{
- _basicConsumeMethodHandler.methodReceived(getConnection(), body, channelId);
+ AMQChannel channel = _connection.getChannel(channelId);
+ VirtualHostImpl<?,?,?> vHost = _connection.getVirtualHost();
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry());
+ }
+ else
+ {
+ channel.sync();
+ String queueName = body.getQueue() == null ? null : body.getQueue().asString();
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("BasicConsume: from '" + queueName +
+ "' for:" + body.getConsumerTag() +
+ " nowait:" + body.getNowait() +
+ " args:" + body.getArguments());
+ }
+
+ MessageSource queue = queueName == null ? channel.getDefaultQueue() : vHost.getQueue(queueName);
+ final Collection<MessageSource> sources = new HashSet<>();
+ if(queue != null)
+ {
+ sources.add(queue);
+ }
+ else if(vHost.getContextValue(Boolean.class, "qpid.enableMultiQueueConsumers")
+ && body.getArguments() != null
+ && body.getArguments().get("x-multiqueue") instanceof Collection)
+ {
+ for(Object object : (Collection<Object>) body.getArguments().get("x-multiqueue"))
+ {
+ String sourceName = String.valueOf(object);
+ sourceName = sourceName.trim();
+ if(sourceName.length() != 0)
+ {
+ MessageSource source = vHost.getMessageSource(sourceName);
+ if(source == null)
+ {
+ sources.clear();
+ break;
+ }
+ else
+ {
+ sources.add(source);
+ }
+ }
+ }
+ queueName = body.getArguments().get("x-multiqueue").toString();
+ }
+
+ if (sources.isEmpty())
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("No queue for '" + queueName + "'");
+ }
+ if (queueName != null)
+ {
+ String msg = "No such queue, '" + queueName + "'";
+ throw body.getChannelException(AMQConstant.NOT_FOUND, msg, _connection.getMethodRegistry());
+ }
+ else
+ {
+ String msg = "No queue name provided, no default queue defined.";
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED, msg, _connection.getMethodRegistry());
+ }
+ }
+ else
+ {
+ final AMQShortString consumerTagName;
+
+ if (body.getConsumerTag() != null)
+ {
+ consumerTagName = body.getConsumerTag().intern(false);
+ }
+ else
+ {
+ consumerTagName = null;
+ }
+
+ try
+ {
+ if(consumerTagName == null || channel.getSubscription(consumerTagName) == null)
+ {
+
+ AMQShortString consumerTag = channel.consumeFromSource(consumerTagName,
+ sources,
+ !body.getNoAck(),
+ body.getArguments(),
+ body.getExclusive(),
+ body.getNoLocal());
+ if (!body.getNowait())
+ {
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createBasicConsumeOkBody(consumerTag);
+ _connection.writeFrame(responseBody.generateFrame(channelId));
+
+ }
+ }
+ else
+ {
+ AMQShortString msg = AMQShortString.validValueOf("Non-unique consumer tag, '" + body.getConsumerTag() + "'");
+
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(), // replyCode
+ msg, // replytext
+ body.getClazz(),
+ body.getMethod());
+ _connection.writeFrame(responseBody.generateFrame(0));
+ }
+
+ }
+ catch (AMQInvalidArgumentException ise)
+ {
+ _logger.debug("Closing connection due to invalid selector");
+
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createChannelCloseBody(AMQConstant.ARGUMENT_INVALID.getCode(),
+ AMQShortString.validValueOf(ise.getMessage()),
+ body.getClazz(),
+ body.getMethod());
+ _connection.writeFrame(responseBody.generateFrame(channelId));
+
+
+ }
+ catch (AMQQueue.ExistingExclusiveConsumer e)
+ {
+ throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
+ "Cannot subscribe to queue "
+ + queue.getName()
+ + " as it already has an existing exclusive consumer",
+ _connection.getMethodRegistry());
+ }
+ catch (AMQQueue.ExistingConsumerPreventsExclusive e)
+ {
+ throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
+ "Cannot subscribe to queue "
+ + queue.getName()
+ + " exclusively as it already has a consumer",
+ _connection.getMethodRegistry());
+ }
+ catch (AccessControlException e)
+ {
+ throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
+ "Cannot subscribe to queue "
+ + queue.getName()
+ + " permission denied", _connection.getMethodRegistry());
+ }
+ catch (MessageSource.ConsumerAccessRefused consumerAccessRefused)
+ {
+ throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
+ "Cannot subscribe to queue "
+ + queue.getName()
+ + " as it already has an incompatible exclusivity policy",
+ _connection.getMethodRegistry());
+ }
+
+ }
+ }
return true;
}
public boolean dispatchBasicGet(BasicGetBody body, int channelId) throws AMQException
{
- _basicGetMethodHandler.methodReceived(getConnection(), body, channelId);
+
+ VirtualHostImpl vHost = _connection.getVirtualHost();
+
+ AMQChannel channel = _connection.getChannel(channelId);
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry());
+ }
+ else
+ {
+ channel.sync();
+ AMQQueue queue = body.getQueue() == null ? channel.getDefaultQueue() : vHost.getQueue(body.getQueue().toString());
+ if (queue == null)
+ {
+ _logger.info("No queue for '" + body.getQueue() + "'");
+ if(body.getQueue()!=null)
+ {
+ throw body.getConnectionException(AMQConstant.NOT_FOUND,
+ "No such queue, '" + body.getQueue() + "'",
+ _connection.getMethodRegistry());
+ }
+ else
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "No queue name provided, no default queue defined.",
+ _connection.getMethodRegistry());
+ }
+ }
+ else
+ {
+
+ try
+ {
+ if (!performGet(queue, _connection, channel, !body.getNoAck()))
+ {
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+
+ BasicGetEmptyBody responseBody = methodRegistry.createBasicGetEmptyBody(null);
+
+
+ _connection.writeFrame(responseBody.generateFrame(channelId));
+ }
+ }
+ catch (AccessControlException e)
+ {
+ throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
+ e.getMessage(), _connection.getMethodRegistry());
+ }
+ catch (MessageSource.ExistingExclusiveConsumer e)
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "Queue has an exclusive consumer",
+ _connection.getMethodRegistry());
+ }
+ catch (MessageSource.ExistingConsumerPreventsExclusive e)
+ {
+ throw body.getConnectionException(AMQConstant.INTERNAL_ERROR,
+ "The GET request has been evaluated as an exclusive consumer, " +
+ "this is likely due to a programming error in the Qpid broker",
+ _connection.getMethodRegistry());
+ }
+ catch (MessageSource.ConsumerAccessRefused consumerAccessRefused)
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "Queue has an incompatible exclusivit policy",
+ _connection.getMethodRegistry());
+ }
+ }
+ }
return true;
}
+ public static boolean performGet(final AMQQueue queue,
+ final AMQProtocolSession session,
+ final AMQChannel channel,
+ final boolean acks)
+ throws AMQException, MessageSource.ExistingConsumerPreventsExclusive,
+ MessageSource.ExistingExclusiveConsumer, MessageSource.ConsumerAccessRefused
+ {
+
+ final FlowCreditManager singleMessageCredit = new MessageOnlyCreditManager(1L);
+
+ final GetDeliveryMethod getDeliveryMethod =
+ new GetDeliveryMethod(singleMessageCredit, session, channel, queue);
+ final RecordDeliveryMethod getRecordMethod = new RecordDeliveryMethod()
+ {
+
+ public void recordMessageDelivery(final ConsumerImpl sub, final MessageInstance entry, final long deliveryTag)
+ {
+ channel.addUnacknowledgedMessage(entry, deliveryTag, null);
+ }
+ };
+
+ ConsumerTarget_0_8 target;
+ EnumSet<ConsumerImpl.Option> options = EnumSet.of(ConsumerImpl.Option.TRANSIENT, ConsumerImpl.Option.ACQUIRES,
+ ConsumerImpl.Option.SEES_REQUEUES);
+ if(acks)
+ {
+
+ target = ConsumerTarget_0_8.createAckTarget(channel,
+ AMQShortString.EMPTY_STRING, null,
+ singleMessageCredit, getDeliveryMethod, getRecordMethod);
+ }
+ else
+ {
+ target = ConsumerTarget_0_8.createGetNoAckTarget(channel,
+ AMQShortString.EMPTY_STRING, null,
+ singleMessageCredit, getDeliveryMethod, getRecordMethod);
+ }
+
+ ConsumerImpl sub = queue.addConsumer(target, null, AMQMessage.class, "", options);
+ sub.flush();
+ sub.close();
+ return(getDeliveryMethod.hasDeliveredMessage());
+
+
+ }
+
+
+ private static class GetDeliveryMethod implements ClientDeliveryMethod
+ {
+
+ private final FlowCreditManager _singleMessageCredit;
+ private final AMQProtocolSession _session;
+ private final AMQChannel _channel;
+ private final AMQQueue _queue;
+ private boolean _deliveredMessage;
+
+ public GetDeliveryMethod(final FlowCreditManager singleMessageCredit,
+ final AMQProtocolSession session,
+ final AMQChannel channel, final AMQQueue queue)
+ {
+ _singleMessageCredit = singleMessageCredit;
+ _session = session;
+ _channel = channel;
+ _queue = queue;
+ }
+
+ @Override
+ public long deliverToClient(final ConsumerImpl sub, final ServerMessage message,
+ final InstanceProperties props, final long deliveryTag)
+ {
+ _singleMessageCredit.useCreditForMessage(message.getSize());
+ long size =_session.getProtocolOutputConverter().writeGetOk(message,
+ props,
+ _channel.getChannelId(),
+ deliveryTag,
+ _queue.getQueueDepthMessages());
+
+ _deliveredMessage = true;
+ return size;
+ }
+
+ public boolean hasDeliveredMessage()
+ {
+ return _deliveredMessage;
+ }
+ }
+
public boolean dispatchBasicPublish(BasicPublishBody body, int channelId) throws AMQException
{
- _basicPublishMethodHandler.methodReceived(getConnection(), body, channelId);
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Publish received on channel " + channelId);
+ }
+
+ AMQShortString exchangeName = body.getExchange();
+ VirtualHostImpl vHost = _connection.getVirtualHost();
+
+ // TODO: check the delivery tag field details - is it unique across the broker or per subscriber?
+
+ MessageDestination destination;
+
+ if (exchangeName == null || AMQShortString.EMPTY_STRING.equals(exchangeName))
+ {
+ destination = vHost.getDefaultDestination();
+ }
+ else
+ {
+ destination = vHost.getMessageDestination(exchangeName.toString());
+ }
+
+ // if the exchange does not exist we raise a channel exception
+ if (destination == null)
+ {
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange name",
+ _connection.getMethodRegistry());
+ }
+ else
+ {
+ // The partially populated BasicDeliver frame plus the received route body
+ // is stored in the channel. Once the final body frame has been received
+ // it is routed to the exchange.
+ AMQChannel channel = _connection.getChannel(channelId);
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry());
+ }
+
+ MessagePublishInfo info = new MessagePublishInfo(body.getExchange(),
+ body.getImmediate(),
+ body.getMandatory(),
+ body.getRoutingKey());
+ info.setExchange(exchangeName);
+ try
+ {
+ channel.setPublishFrame(info, destination);
+ }
+ catch (AccessControlException e)
+ {
+ throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
+ e.getMessage(),
+ _connection.getMethodRegistry());
+ }
+ }
return true;
}
public boolean dispatchBasicQos(BasicQosBody body, int channelId) throws AMQException
{
- _basicQosHandler.methodReceived(getConnection(), body, channelId);
+ AMQChannel channel = _connection.getChannel(channelId);
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry());
+ }
+ channel.sync();
+ channel.setCredit(body.getPrefetchSize(), body.getPrefetchCount());
+
+
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createBasicQosOkBody();
+ _connection.writeFrame(responseBody.generateFrame(channelId));
+
return true;
}
public boolean dispatchBasicRecover(BasicRecoverBody body, int channelId) throws AMQException
{
- _basicRecoverMethodHandler.methodReceived(getConnection(), body, channelId);
+ _logger.debug("Recover received on protocol session " + _connection
+ + " and channel " + channelId);
+ AMQChannel channel = _connection.getChannel(channelId);
+
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry());
+ }
+
+ channel.resend();
+
+ // Qpid 0-8 hacks a synchronous -ok onto recover.
+ // In Qpid 0-9 we create a separate sync-recover, sync-recover-ok pair to be "more" compliant
+ if(_connection.getProtocolVersion().equals(ProtocolVersion.v8_0))
+ {
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ AMQMethodBody recoverOk = methodRegistry.createBasicRecoverSyncOkBody();
+ channel.sync();
+ _connection.writeFrame(recoverOk.generateFrame(channelId));
+
+ }
+
return true;
}
public boolean dispatchBasicReject(BasicRejectBody body, int channelId) throws AMQException
{
- _basicRejectMethodHandler.methodReceived(getConnection(), body, channelId);
+
+ AMQChannel channel = _connection.getChannel(channelId);
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry());
+ }
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Rejecting:" + body.getDeliveryTag() +
+ ": Requeue:" + body.getRequeue() +
+ " on channel:" + channel.debugIdentity());
+ }
+
+ long deliveryTag = body.getDeliveryTag();
+
+ MessageInstance message = channel.getUnacknowledgedMessageMap().get(deliveryTag);
+
+ if (message == null)
+ {
+ _logger.warn("Dropping reject request as message is null for tag:" + deliveryTag);
+ }
+ else
+ {
+
+ if (message.getMessage() == null)
+ {
+ _logger.warn("Message has already been purged, unable to Reject.");
+ }
+ else
+ {
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Rejecting: DT:" + deliveryTag + "-" + message.getMessage() +
+ ": Requeue:" + body.getRequeue() +
+ " on channel:" + channel.debugIdentity());
+ }
+
+ if (body.getRequeue())
+ {
+ //this requeue represents a message rejected from the pre-dispatch queue
+ //therefore we need to amend the delivery counter.
+ message.decrementDeliveryCount();
+
+ channel.requeue(deliveryTag);
+ }
+ else
+ {
+ // Since the Java client abuses the reject flag for requeing after rollback, we won't set reject here
+ // as it would prevent redelivery
+ // message.reject();
+
+ final boolean maxDeliveryCountEnabled = channel.isMaxDeliveryCountEnabled(deliveryTag);
+ _logger.debug("maxDeliveryCountEnabled: "
+ + maxDeliveryCountEnabled
+ + " deliveryTag "
+ + deliveryTag);
+ if (maxDeliveryCountEnabled)
+ {
+ final boolean deliveredTooManyTimes = channel.isDeliveredTooManyTimes(deliveryTag);
+ _logger.debug("deliveredTooManyTimes: "
+ + deliveredTooManyTimes
+ + " deliveryTag "
+ + deliveryTag);
+ if (deliveredTooManyTimes)
+ {
+ channel.deadLetter(body.getDeliveryTag());
+ }
+ else
+ {
+ //this requeue represents a message rejected because of a recover/rollback that we
+ //are not ready to DLQ. We rely on the reject command to resend from the unacked map
+ //and therefore need to increment the delivery counter so we cancel out the effect
+ //of the AMQChannel#resend() decrement.
+ message.incrementDeliveryCount();
+ }
+ }
+ else
+ {
+ channel.requeue(deliveryTag);
+ }
+ }
+ }
+ }
return true;
}
public boolean dispatchChannelOpen(ChannelOpenBody body, int channelId) throws AMQException
{
- _channelOpenHandler.methodReceived(getConnection(), body, channelId);
+ VirtualHostImpl virtualHost = _connection.getVirtualHost();
+
+ // Protect the broker against out of order frame request.
+ if (virtualHost == null)
+ {
+ throw new AMQException(AMQConstant.COMMAND_INVALID, "Virtualhost has not yet been set. ConnectionOpen has not been called.", null);
+ }
+ _logger.info("Connecting to: " + virtualHost.getName());
+
+ final AMQChannel channel = new AMQChannel(_connection, channelId, virtualHost.getMessageStore());
+
+ _connection.addChannel(channel);
+
+ ChannelOpenOkBody response;
+
+
+ response = _connection.getMethodRegistry().createChannelOpenOkBody();
+
+
+ _connection.writeFrame(response.generateFrame(channelId));
return true;
}
@@ -186,20 +722,6 @@ public class ServerMethodDispatcherImpl
throw new UnexpectedMethodException(body);
}
- @Override
- public boolean dispatchQueueUnbindOk(final QueueUnbindOkBody body, final int channelId)
- throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
-
- @Override
- public boolean dispatchBasicRecoverSyncOk(final BasicRecoverSyncOkBody body,
- final int channelId)
- throws AMQException
- {
- throw new UnexpectedMethodException(body);
- }
public boolean dispatchBasicCancelOk(BasicCancelOkBody body, int channelId) throws AMQException
{
@@ -238,21 +760,64 @@ public class ServerMethodDispatcherImpl
public boolean dispatchChannelClose(ChannelCloseBody body, int channelId) throws AMQException
{
- _channelCloseHandler.methodReceived(getConnection(), body, channelId);
+
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Received channel close for id " + channelId
+ + " citing class " + body.getClassId() +
+ " and method " + body.getMethodId());
+ }
+
+
+ AMQChannel channel = _connection.getChannel(channelId);
+
+ if (channel == null)
+ {
+ throw body.getConnectionException(AMQConstant.CHANNEL_ERROR,
+ "Trying to close unknown channel",
+ _connection.getMethodRegistry());
+ }
+ channel.sync();
+ _connection.closeChannel(channelId);
+ // Client requested closure so we don't wait for ok we send it
+ _connection.closeChannelOk(channelId);
+
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ ChannelCloseOkBody responseBody = methodRegistry.createChannelCloseOkBody();
+ _connection.writeFrame(responseBody.generateFrame(channelId));
return true;
}
public boolean dispatchChannelCloseOk(ChannelCloseOkBody body, int channelId) throws AMQException
{
- _channelCloseOkHandler.methodReceived(getConnection(), body, channelId);
+
+ _logger.info("Received channel-close-ok for channel-id " + channelId);
+
+ // Let the Protocol Session know the channel is now closed.
+ _connection.closeChannelOk(channelId);
return true;
}
public boolean dispatchChannelFlow(ChannelFlowBody body, int channelId) throws AMQException
{
- _channelFlowHandler.methodReceived(getConnection(), body, channelId);
+ final AMQProtocolSession<?> connection = getConnection();
+
+
+ AMQChannel channel = connection.getChannel(channelId);
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
+ }
+ channel.sync();
+ channel.setSuspended(!body.getActive());
+ _logger.debug("Channel.Flow for channel " + channelId + ", active=" + body.getActive());
+
+ MethodRegistry methodRegistry = connection.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createChannelFlowOkBody(body.getActive());
+ connection.writeFrame(responseBody.generateFrame(channelId));
return true;
}
@@ -269,23 +834,103 @@ public class ServerMethodDispatcherImpl
public boolean dispatchConnectionOpen(ConnectionOpenBody body, int channelId) throws AMQException
{
- _connectionOpenMethodHandler.methodReceived(getConnection(), body, channelId);
+
+ //ignore leading '/'
+ String virtualHostName;
+ if ((body.getVirtualHost() != null) && body.getVirtualHost().charAt(0) == '/')
+ {
+ virtualHostName = new StringBuilder(body.getVirtualHost().subSequence(1, body.getVirtualHost().length())).toString();
+ }
+ else
+ {
+ virtualHostName = body.getVirtualHost() == null ? null : String.valueOf(body.getVirtualHost());
+ }
+
+ VirtualHostImpl virtualHost = ((AmqpPort) _connection.getPort()).getVirtualHost(virtualHostName);
+
+ if (virtualHost == null)
+ {
+ throw body.getConnectionException(AMQConstant.NOT_FOUND, "Unknown virtual host: '" + virtualHostName + "'",
+ _connection.getMethodRegistry());
+ }
+ else
+ {
+ // Check virtualhost access
+ if (virtualHost.getState() != State.ACTIVE)
+ {
+ throw body.getConnectionException(AMQConstant.CONNECTION_FORCED,
+ "Virtual host '" + virtualHost.getName() + "' is not active",
+ _connection.getMethodRegistry());
+ }
+
+ _connection.setVirtualHost(virtualHost);
+ try
+ {
+ virtualHost.getSecurityManager().authoriseCreateConnection(_connection);
+ }
+ catch (AccessControlException e)
+ {
+ throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
+ e.getMessage(),
+ _connection.getMethodRegistry());
+ }
+
+ // See Spec (0.8.2). Section 3.1.2 Virtual Hosts
+ if (_connection.getContextKey() == null)
+ {
+ _connection.setContextKey(new AMQShortString(Long.toString(System.currentTimeMillis())));
+ }
+
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createConnectionOpenOkBody(body.getVirtualHost());
+
+ _connection.changeState(AMQState.CONNECTION_OPEN);
+
+ _connection.writeFrame(responseBody.generateFrame(channelId));
+ }
return true;
}
public boolean dispatchConnectionClose(ConnectionCloseBody body, int channelId) throws AMQException
{
- _connectionCloseMethodHandler.methodReceived(getConnection(), body, channelId);
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("ConnectionClose received with reply code/reply text " + body.getReplyCode() + "/" +
+ body.getReplyText() + " for " + _connection);
+ }
+ try
+ {
+ _connection.closeSession();
+ }
+ catch (Exception e)
+ {
+ _logger.error("Error closing protocol session: " + e, e);
+ }
+
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ ConnectionCloseOkBody responseBody = methodRegistry.createConnectionCloseOkBody();
+ _connection.writeFrame(responseBody.generateFrame(channelId));
+
+ _connection.closeProtocolSession();
+
return true;
}
public boolean dispatchConnectionCloseOk(ConnectionCloseOkBody body, int channelId) throws AMQException
{
- _connectionCloseOkMethodHandler.methodReceived(
- getConnection(),
- body, channelId);
+ _logger.info("Received Connection-close-ok");
+
+ try
+ {
+ _connection.changeState(AMQState.CONNECTION_CLOSED);
+ _connection.closeSession();
+ }
+ catch (Exception e)
+ {
+ _logger.error("Error closing protocol session: " + e, e);
+ }
return true;
}
@@ -368,92 +1013,1246 @@ public class ServerMethodDispatcherImpl
public boolean dispatchConnectionSecureOk(ConnectionSecureOkBody body, int channelId) throws AMQException
{
- _connectionSecureOkMethodHandler.methodReceived(
- getConnection(),
- body, channelId);
+ Broker<?> broker = _connection.getBroker();
+
+ SubjectCreator subjectCreator = _connection.getSubjectCreator();
+
+ SaslServer ss = _connection.getSaslServer();
+ if (ss == null)
+ {
+ throw new AMQException("No SASL context set up in session");
+ }
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ SubjectAuthenticationResult authResult = subjectCreator.authenticate(ss, body.getResponse());
+ switch (authResult.getStatus())
+ {
+ case ERROR:
+ Exception cause = authResult.getCause();
+
+ _logger.info("Authentication failed:" + (cause == null ? "" : cause.getMessage()));
+
+ // This should be abstracted
+ _connection.changeState(AMQState.CONNECTION_CLOSING);
+
+ ConnectionCloseBody connectionCloseBody =
+ methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(),
+ AMQConstant.NOT_ALLOWED.getName(),
+ body.getClazz(),
+ body.getMethod());
+
+ _connection.writeFrame(connectionCloseBody.generateFrame(0));
+ disposeSaslServer(_connection);
+ break;
+ case SUCCESS:
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Connected as: " + authResult.getSubject());
+ }
+ _connection.changeState(AMQState.CONNECTION_NOT_TUNED);
+
+ int frameMax = broker.getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE);
+
+ if(frameMax <= 0)
+ {
+ frameMax = Integer.MAX_VALUE;
+ }
+
+ ConnectionTuneBody tuneBody =
+ methodRegistry.createConnectionTuneBody(broker.getConnection_sessionCountLimit(),
+ frameMax,
+ broker.getConnection_heartBeatDelay());
+ _connection.writeFrame(tuneBody.generateFrame(0));
+ _connection.setAuthorizedSubject(authResult.getSubject());
+ disposeSaslServer(_connection);
+ break;
+ case CONTINUE:
+ _connection.changeState(AMQState.CONNECTION_NOT_AUTH);
+
+ ConnectionSecureBody
+ secureBody = methodRegistry.createConnectionSecureBody(authResult.getChallenge());
+ _connection.writeFrame(secureBody.generateFrame(0));
+ }
return true;
}
+ private void disposeSaslServer(AMQProtocolSession ps)
+ {
+ SaslServer ss = ps.getSaslServer();
+ if (ss != null)
+ {
+ ps.setSaslServer(null);
+ try
+ {
+ ss.dispose();
+ }
+ catch (SaslException e)
+ {
+ _logger.error("Error disposing of Sasl server: " + e);
+ }
+ }
+ }
+
public boolean dispatchConnectionStartOk(ConnectionStartOkBody body, int channelId) throws AMQException
{
- _connectionStartOkMethodHandler.methodReceived(
- getConnection(),
- body, channelId);
+ Broker<?> broker = _connection.getBroker();
+
+ _logger.info("SASL Mechanism selected: " + body.getMechanism());
+ _logger.info("Locale selected: " + body.getLocale());
+
+ SubjectCreator subjectCreator = _connection.getSubjectCreator();
+ SaslServer ss = null;
+ try
+ {
+ ss = subjectCreator.createSaslServer(String.valueOf(body.getMechanism()),
+ _connection.getLocalFQDN(),
+ _connection.getPeerPrincipal());
+
+ if (ss == null)
+ {
+ throw body.getConnectionException(AMQConstant.RESOURCE_ERROR,
+ "Unable to create SASL Server:" + body.getMechanism(),
+ _connection.getMethodRegistry());
+ }
+
+ _connection.setSaslServer(ss);
+
+ final SubjectAuthenticationResult authResult = subjectCreator.authenticate(ss, body.getResponse());
+ //save clientProperties
+ _connection.setClientProperties(body.getClientProperties());
+
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+
+ switch (authResult.getStatus())
+ {
+ case ERROR:
+ Exception cause = authResult.getCause();
+
+ _logger.info("Authentication failed:" + (cause == null ? "" : cause.getMessage()));
+
+ _connection.changeState(AMQState.CONNECTION_CLOSING);
+
+ ConnectionCloseBody closeBody =
+ methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(), // replyCode
+ AMQConstant.NOT_ALLOWED.getName(),
+ body.getClazz(),
+ body.getMethod());
+
+ _connection.writeFrame(closeBody.generateFrame(0));
+ disposeSaslServer(_connection);
+ break;
+
+ case SUCCESS:
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Connected as: " + authResult.getSubject());
+ }
+ _connection.setAuthorizedSubject(authResult.getSubject());
+
+ _connection.changeState(AMQState.CONNECTION_NOT_TUNED);
+ int frameMax = broker.getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE);
+
+ if(frameMax <= 0)
+ {
+ frameMax = Integer.MAX_VALUE;
+ }
+
+ ConnectionTuneBody
+ tuneBody = methodRegistry.createConnectionTuneBody(broker.getConnection_sessionCountLimit(),
+ frameMax,
+ broker.getConnection_heartBeatDelay());
+ _connection.writeFrame(tuneBody.generateFrame(0));
+ break;
+ case CONTINUE:
+ _connection.changeState(AMQState.CONNECTION_NOT_AUTH);
+
+ ConnectionSecureBody
+ secureBody = methodRegistry.createConnectionSecureBody(authResult.getChallenge());
+ _connection.writeFrame(secureBody.generateFrame(0));
+ }
+ }
+ catch (SaslException e)
+ {
+ disposeSaslServer(_connection);
+ throw new AMQException("SASL error: " + e, e);
+ }
return true;
}
public boolean dispatchConnectionTuneOk(ConnectionTuneOkBody body, int channelId) throws AMQException
{
- _connectionTuneOkMethodHandler.methodReceived(getConnection(), body, channelId);
+ final AMQProtocolSession<?> connection = getConnection();
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug(body);
+ }
+ connection.changeState(AMQState.CONNECTION_NOT_OPENED);
+
+ connection.initHeartbeats(body.getHeartbeat());
+
+ int brokerFrameMax = connection.getBroker().getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE);
+ if(brokerFrameMax <= 0)
+ {
+ brokerFrameMax = Integer.MAX_VALUE;
+ }
+
+ if(body.getFrameMax() > (long) brokerFrameMax)
+ {
+ throw new AMQConnectionException(AMQConstant.SYNTAX_ERROR,
+ "Attempt to set max frame size to " + body.getFrameMax()
+ + " greater than the broker will allow: "
+ + brokerFrameMax,
+ body.getClazz(), body.getMethod(),
+ connection.getMethodRegistry(),null);
+ }
+ else if(body.getFrameMax() > 0 && body.getFrameMax() < AMQConstant.FRAME_MIN_SIZE.getCode())
+ {
+ throw new AMQConnectionException(AMQConstant.SYNTAX_ERROR,
+ "Attempt to set max frame size to " + body.getFrameMax()
+ + " which is smaller than the specification definined minimum: "
+ + AMQConstant.FRAME_MIN_SIZE.getCode(),
+ body.getClazz(), body.getMethod(),
+ connection.getMethodRegistry(),null);
+ }
+ int frameMax = body.getFrameMax() == 0 ? brokerFrameMax : (int) body.getFrameMax();
+ connection.setMaxFrameSize(frameMax);
+
+ long maxChannelNumber = body.getChannelMax();
+ //0 means no implied limit, except that forced by protocol limitations (0xFFFF)
+ connection.setMaximumNumberOfChannels(maxChannelNumber == 0 ? 0xFFFFL : maxChannelNumber);
return true;
}
+ public static final int OK = 0;
+ public static final int EXCHANGE_NOT_FOUND = 1;
+ public static final int QUEUE_NOT_FOUND = 2;
+ public static final int NO_BINDINGS = 3;
+ public static final int QUEUE_NOT_BOUND = 4;
+ public static final int NO_QUEUE_BOUND_WITH_RK = 5;
+ public static final int SPECIFIC_QUEUE_NOT_BOUND_WITH_RK = 6;
+
public boolean dispatchExchangeBound(ExchangeBoundBody body, int channelId) throws AMQException
{
- _exchangeBoundHandler.methodReceived(getConnection(), body, channelId);
+ VirtualHostImpl virtualHost = _connection.getVirtualHost();
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+
+ final AMQChannel channel = _connection.getChannel(channelId);
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry());
+ }
+ channel.sync();
+
+
+ AMQShortString exchangeName = body.getExchange();
+ AMQShortString queueName = body.getQueue();
+ AMQShortString routingKey = body.getRoutingKey();
+ ExchangeBoundOkBody response;
+
+ if(isDefaultExchange(exchangeName))
+ {
+ if(routingKey == null)
+ {
+ if(queueName == null)
+ {
+ response = methodRegistry.createExchangeBoundOkBody(virtualHost.getQueues().isEmpty() ? NO_BINDINGS : OK, null);
+ }
+ else
+ {
+ AMQQueue queue = virtualHost.getQueue(queueName.toString());
+ if (queue == null)
+ {
+
+ response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND, // replyCode
+ AMQShortString.validValueOf("Queue '" + queueName + "' not found")); // replyText
+ }
+ else
+ {
+ response = methodRegistry.createExchangeBoundOkBody(OK, null);
+ }
+ }
+ }
+ else
+ {
+ if(queueName == null)
+ {
+ response = methodRegistry.createExchangeBoundOkBody(virtualHost.getQueue(routingKey.toString()) == null ? NO_QUEUE_BOUND_WITH_RK : OK, null);
+ }
+ else
+ {
+ AMQQueue queue = virtualHost.getQueue(queueName.toString());
+ if (queue == null)
+ {
+
+ response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND, // replyCode
+ AMQShortString.validValueOf("Queue '" + queueName + "' not found")); // replyText
+ }
+ else
+ {
+ response = methodRegistry.createExchangeBoundOkBody(queueName.equals(routingKey) ? OK : SPECIFIC_QUEUE_NOT_BOUND_WITH_RK, null);
+ }
+ }
+ }
+ }
+ else
+ {
+ ExchangeImpl exchange = virtualHost.getExchange(exchangeName.toString());
+ if (exchange == null)
+ {
+
+
+ response = methodRegistry.createExchangeBoundOkBody(EXCHANGE_NOT_FOUND,
+ AMQShortString.validValueOf("Exchange '" + exchangeName + "' not found"));
+ }
+ else if (routingKey == null)
+ {
+ if (queueName == null)
+ {
+ if (exchange.hasBindings())
+ {
+ response = methodRegistry.createExchangeBoundOkBody(OK, null);
+ }
+ else
+ {
+
+ response = methodRegistry.createExchangeBoundOkBody(NO_BINDINGS, // replyCode
+ null); // replyText
+ }
+ }
+ else
+ {
+
+ AMQQueue queue = virtualHost.getQueue(queueName.toString());
+ if (queue == null)
+ {
+
+ response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND, // replyCode
+ AMQShortString.validValueOf("Queue '" + queueName + "' not found")); // replyText
+ }
+ else
+ {
+ if (exchange.isBound(queue))
+ {
+
+ response = methodRegistry.createExchangeBoundOkBody(OK, // replyCode
+ null); // replyText
+ }
+ else
+ {
+
+ response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_BOUND, // replyCode
+ AMQShortString.validValueOf("Queue '" + queueName + "' not bound to exchange '" + exchangeName + "'")); // replyText
+ }
+ }
+ }
+ }
+ else if (queueName != null)
+ {
+ AMQQueue queue = virtualHost.getQueue(queueName.toString());
+ if (queue == null)
+ {
+
+ response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND, // replyCode
+ AMQShortString.validValueOf("Queue '" + queueName + "' not found")); // replyText
+ }
+ else
+ {
+ String bindingKey = body.getRoutingKey() == null ? null : body.getRoutingKey().asString();
+ if (exchange.isBound(bindingKey, queue))
+ {
+
+ response = methodRegistry.createExchangeBoundOkBody(OK, // replyCode
+ null); // replyText
+ }
+ else
+ {
+
+ String message = "Queue '" + queueName + "' not bound with routing key '" +
+ body.getRoutingKey() + "' to exchange '" + exchangeName + "'";
+
+ response = methodRegistry.createExchangeBoundOkBody(SPECIFIC_QUEUE_NOT_BOUND_WITH_RK, // replyCode
+ AMQShortString.validValueOf(message)); // replyText
+ }
+ }
+ }
+ else
+ {
+ if (exchange.isBound(body.getRoutingKey() == null ? "" : body.getRoutingKey().asString()))
+ {
+
+ response = methodRegistry.createExchangeBoundOkBody(OK, // replyCode
+ null); // replyText
+ }
+ else
+ {
+
+ response = methodRegistry.createExchangeBoundOkBody(NO_QUEUE_BOUND_WITH_RK, // replyCode
+ AMQShortString.validValueOf("No queue bound with routing key '" + body.getRoutingKey() +
+ "' to exchange '" + exchangeName + "'")); // replyText
+ }
+ }
+ }
+ _connection.writeFrame(response.generateFrame(channelId));
return true;
}
public boolean dispatchExchangeDeclare(ExchangeDeclareBody body, int channelId) throws AMQException
{
- _exchangeDeclareHandler.methodReceived(getConnection(), body, channelId);
+ VirtualHostImpl virtualHost = _connection.getVirtualHost();
+ final AMQChannel channel = _connection.getChannel(channelId);
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry());
+ }
+
+ final AMQShortString exchangeName = body.getExchange();
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Request to declare exchange of type " + body.getType() + " with name " + exchangeName);
+ }
+
+ ExchangeImpl exchange;
+
+ if(isDefaultExchange(exchangeName))
+ {
+ if(!new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_CLASS).equals(body.getType()))
+ {
+ throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare default exchange: "
+ + " of type "
+ + ExchangeDefaults.DIRECT_EXCHANGE_CLASS
+ + " to " + body.getType() +".",
+ body.getClazz(), body.getMethod(),
+ _connection.getMethodRegistry(),null);
+ }
+ }
+ else
+ {
+ if (body.getPassive())
+ {
+ exchange = virtualHost.getExchange(exchangeName.toString());
+ if(exchange == null)
+ {
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange: " + exchangeName,
+ _connection.getMethodRegistry());
+ }
+ else if (!(body.getType() == null || body.getType().length() ==0) && !exchange.getType().equals(body.getType().asString()))
+ {
+
+ throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " +
+ exchangeName + " of type " + exchange.getType()
+ + " to " + body.getType() +".",
+ body.getClazz(), body.getMethod(),
+ _connection.getMethodRegistry(),null);
+ }
+
+ }
+ else
+ {
+ try
+ {
+ String name = exchangeName == null ? null : exchangeName.intern().toString();
+ String type = body.getType() == null ? null : body.getType().intern().toString();
+
+ Map<String,Object> attributes = new HashMap<String, Object>();
+ if(body.getArguments() != null)
+ {
+ attributes.putAll(FieldTable.convertToMap(body.getArguments()));
+ }
+ attributes.put(org.apache.qpid.server.model.Exchange.ID, null);
+ attributes.put(org.apache.qpid.server.model.Exchange.NAME,name);
+ attributes.put(org.apache.qpid.server.model.Exchange.TYPE,type);
+ attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, body.getDurable());
+ attributes.put(org.apache.qpid.server.model.Exchange.LIFETIME_POLICY,
+ body.getAutoDelete() ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT);
+ if(!attributes.containsKey(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE))
+ {
+ attributes.put(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE, null);
+ }
+ exchange = virtualHost.createExchange(attributes);
+
+ }
+ catch(ReservedExchangeNameException e)
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "Attempt to declare exchange: " + exchangeName +
+ " which begins with reserved prefix.",
+ _connection.getMethodRegistry());
+
+ }
+ catch(ExchangeExistsException e)
+ {
+ exchange = e.getExistingExchange();
+ if(!new AMQShortString(exchange.getType()).equals(body.getType()))
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: "
+ + exchangeName + " of type "
+ + exchange.getType()
+ + " to " + body.getType() + ".",
+ _connection.getMethodRegistry());
+ }
+ }
+ catch(NoFactoryForTypeException e)
+ {
+ throw body.getConnectionException(AMQConstant.COMMAND_INVALID,
+ "Unknown exchange type '"
+ + e.getType()
+ + "' for exchange '"
+ + exchangeName
+ + "'",
+ _connection.getMethodRegistry());
+ }
+ catch (AccessControlException e)
+ {
+ throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
+ e.getMessage(),
+ _connection.getMethodRegistry());
+ }
+ catch (UnknownConfiguredObjectException e)
+ {
+ // note - since 0-8/9/9-1 can't set the alt. exchange this exception should never occur
+ throw body.getConnectionException(AMQConstant.NOT_FOUND,
+ "Unknown alternate exchange "
+ + (e.getName() != null
+ ? "name: \"" + e.getName() + "\""
+ : "id: " + e.getId()),
+ _connection.getMethodRegistry());
+ }
+ catch (IllegalArgumentException e)
+ {
+ throw body.getConnectionException(AMQConstant.COMMAND_INVALID,
+ "Error creating exchange '"
+ + exchangeName
+ + "': "
+ + e.getMessage(),
+ _connection.getMethodRegistry());
+ }
+ }
+ }
+
+ if(!body.getNowait())
+ {
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createExchangeDeclareOkBody();
+ channel.sync();
+ _connection.writeFrame(responseBody.generateFrame(channelId));
+ }
return true;
}
public boolean dispatchExchangeDelete(ExchangeDeleteBody body, int channelId) throws AMQException
{
- _exchangeDeleteHandler.methodReceived(getConnection(), body, channelId);
+ VirtualHostImpl virtualHost = _connection.getVirtualHost();
+ final AMQChannel channel = _connection.getChannel(channelId);
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry());
+ }
+ channel.sync();
+ try
+ {
+
+ if(isDefaultExchange(body.getExchange()))
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "Default Exchange cannot be deleted",
+ _connection.getMethodRegistry());
+ }
+
+ final String exchangeName = body.getExchange().toString();
+
+ final ExchangeImpl exchange = virtualHost.getExchange(exchangeName);
+ if(exchange == null)
+ {
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "No such exchange: " + body.getExchange(),
+ _connection.getMethodRegistry());
+ }
+
+ virtualHost.removeExchange(exchange, !body.getIfUnused());
+
+ ExchangeDeleteOkBody responseBody = _connection.getMethodRegistry().createExchangeDeleteOkBody();
+
+ _connection.writeFrame(responseBody.generateFrame(channelId));
+ }
+
+ catch (ExchangeIsAlternateException e)
+ {
+ throw body.getChannelException(AMQConstant.NOT_ALLOWED, "Exchange in use as an alternate exchange",
+ _connection.getMethodRegistry());
+
+ }
+ catch (RequiredExchangeException e)
+ {
+ throw body.getChannelException(AMQConstant.NOT_ALLOWED,
+ "Exchange '" + body.getExchange() + "' cannot be deleted",
+ _connection.getMethodRegistry());
+ }
+ catch (AccessControlException e)
+ {
+ throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
+ e.getMessage(),
+ _connection.getMethodRegistry());
+ }
return true;
}
+ private boolean isDefaultExchange(final AMQShortString exchangeName)
+ {
+ return exchangeName == null || exchangeName.equals(AMQShortString.EMPTY_STRING);
+ }
+
public boolean dispatchQueueBind(QueueBindBody body, int channelId) throws AMQException
{
- _queueBindHandler.methodReceived(getConnection(), body, channelId);
+ VirtualHostImpl virtualHost = _connection.getVirtualHost();
+ AMQChannel channel = _connection.getChannel(channelId);
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry());
+ }
+
+ final AMQQueue queue;
+ final AMQShortString routingKey;
+
+ final AMQShortString queueName = body.getQueue();
+
+ if (queueName == null)
+ {
+
+ queue = channel.getDefaultQueue();
+
+ if (queue == null)
+ {
+ throw body.getChannelException(AMQConstant.NOT_FOUND,
+ "No default queue defined on channel and queue was null",
+ _connection.getMethodRegistry());
+ }
+
+ if (body.getRoutingKey() == null)
+ {
+ routingKey = AMQShortString.valueOf(queue.getName());
+ }
+ else
+ {
+ routingKey = body.getRoutingKey().intern();
+ }
+ }
+ else
+ {
+ queue = virtualHost.getQueue(queueName.toString());
+ routingKey = body.getRoutingKey() == null ? AMQShortString.EMPTY_STRING : body.getRoutingKey().intern();
+ }
+
+ if (queue == null)
+ {
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + queueName + " does not exist.",
+ _connection.getMethodRegistry());
+ }
+
+ if(isDefaultExchange(body.getExchange()))
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "Cannot bind the queue " + queueName + " to the default exchange",
+ _connection.getMethodRegistry());
+ }
+
+ final String exchangeName = body.getExchange().toString();
+
+ final ExchangeImpl exch = virtualHost.getExchange(exchangeName);
+ if (exch == null)
+ {
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + exchangeName + " does not exist.",
+ _connection.getMethodRegistry());
+ }
+
+
+ try
+ {
+
+ Map<String,Object> arguments = FieldTable.convertToMap(body.getArguments());
+ String bindingKey = String.valueOf(routingKey);
+
+ if (!exch.isBound(bindingKey, arguments, queue))
+ {
+
+ if(!exch.addBinding(bindingKey, queue, arguments) && ExchangeDefaults.TOPIC_EXCHANGE_CLASS.equals(exch.getType()))
+ {
+ exch.replaceBinding(bindingKey, queue, arguments);
+ }
+ }
+ }
+ catch (AccessControlException e)
+ {
+ throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
+ e.getMessage(),
+ _connection.getMethodRegistry());
+ }
+
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Binding queue " + queue + " to exchange " + exch + " with routing key " + routingKey);
+ }
+ if (!body.getNowait())
+ {
+ channel.sync();
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createQueueBindOkBody();
+ _connection.writeFrame(responseBody.generateFrame(channelId));
+
+ }
return true;
}
public boolean dispatchQueueDeclare(QueueDeclareBody body, int channelId) throws AMQException
{
- _queueDeclareHandler.methodReceived(getConnection(), body, channelId);
+ final AMQSessionModel session = _connection.getChannel(channelId);
+ VirtualHostImpl virtualHost = _connection.getVirtualHost();
+
+ final AMQShortString queueName;
+
+ // if we aren't given a queue name, we create one which we return to the client
+ if ((body.getQueue() == null) || (body.getQueue().length() == 0))
+ {
+ queueName = new AMQShortString("tmp_" + UUID.randomUUID());
+ }
+ else
+ {
+ queueName = body.getQueue().intern();
+ }
+
+ AMQQueue queue;
+
+ //TODO: do we need to check that the queue already exists with exactly the same "configuration"?
+
+ AMQChannel channel = _connection.getChannel(channelId);
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry());
+ }
+
+ if(body.getPassive())
+ {
+ queue = virtualHost.getQueue(queueName.toString());
+ if (queue == null)
+ {
+ String msg = "Queue: " + queueName + " not found on VirtualHost(" + virtualHost + ").";
+ throw body.getChannelException(AMQConstant.NOT_FOUND, msg, _connection.getMethodRegistry());
+ }
+ else
+ {
+ if (!queue.verifySessionAccess(channel))
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "Queue "
+ + queue.getName()
+ + " is exclusive, but not created on this Connection.",
+ _connection.getMethodRegistry());
+ }
+
+ //set this as the default queue on the channel:
+ channel.setDefaultQueue(queue);
+ }
+ }
+ else
+ {
+
+ try
+ {
+
+ queue = createQueue(channel, queueName, body, virtualHost, _connection);
+
+ }
+ catch(QueueExistsException qe)
+ {
+
+ queue = qe.getExistingQueue();
+
+ if (!queue.verifySessionAccess(channel))
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "Queue "
+ + queue.getName()
+ + " is exclusive, but not created on this Connection.",
+ _connection.getMethodRegistry());
+ }
+ else if(queue.isExclusive() != body.getExclusive())
+ {
+
+ throw body.getChannelException(AMQConstant.ALREADY_EXISTS,
+ "Cannot re-declare queue '"
+ + queue.getName()
+ + "' with different exclusivity (was: "
+ + queue.isExclusive()
+ + " requested "
+ + body.getExclusive()
+ + ")",
+ _connection.getMethodRegistry());
+ }
+ else if((body.getAutoDelete() && queue.getLifetimePolicy() != LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS)
+ || (!body.getAutoDelete() && queue.getLifetimePolicy() != ((body.getExclusive() && !body.getDurable()) ? LifetimePolicy.DELETE_ON_CONNECTION_CLOSE : LifetimePolicy.PERMANENT)))
+ {
+ throw body.getChannelException(AMQConstant.ALREADY_EXISTS,
+ "Cannot re-declare queue '"
+ + queue.getName()
+ + "' with different lifetime policy (was: "
+ + queue.getLifetimePolicy()
+ + " requested autodelete: "
+ + body.getAutoDelete()
+ + ")",
+ _connection.getMethodRegistry());
+ }
+ else if(queue.isDurable() != body.getDurable())
+ {
+ throw body.getChannelException(AMQConstant.ALREADY_EXISTS,
+ "Cannot re-declare queue '"
+ + queue.getName()
+ + "' with different durability (was: "
+ + queue.isDurable()
+ + " requested "
+ + body.getDurable()
+ + ")",
+ _connection.getMethodRegistry());
+ }
+
+ }
+ catch (AccessControlException e)
+ {
+ throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
+ e.getMessage(),
+ _connection.getMethodRegistry());
+ }
+
+ //set this as the default queue on the channel:
+ channel.setDefaultQueue(queue);
+ }
+
+ if (!body.getNowait())
+ {
+ channel.sync();
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ QueueDeclareOkBody responseBody =
+ methodRegistry.createQueueDeclareOkBody(queueName,
+ queue.getQueueDepthMessages(),
+ queue.getConsumerCount());
+ _connection.writeFrame(responseBody.generateFrame(channelId));
+
+ _logger.info("Queue " + queueName + " declared successfully");
+ }
return true;
}
+ protected AMQQueue createQueue(final AMQChannel channel, final AMQShortString queueName,
+ QueueDeclareBody body,
+ final VirtualHostImpl virtualHost,
+ final AMQProtocolSession session)
+ throws AMQException, QueueExistsException
+ {
+
+ final boolean durable = body.getDurable();
+ final boolean autoDelete = body.getAutoDelete();
+ final boolean exclusive = body.getExclusive();
+
+
+ Map<String, Object> attributes =
+ QueueArgumentsConverter.convertWireArgsToModel(FieldTable.convertToMap(body.getArguments()));
+ final String queueNameString = AMQShortString.toString(queueName);
+ attributes.put(Queue.NAME, queueNameString);
+ attributes.put(Queue.ID, UUID.randomUUID());
+ attributes.put(Queue.DURABLE, durable);
+
+ LifetimePolicy lifetimePolicy;
+ ExclusivityPolicy exclusivityPolicy;
+
+ if(exclusive)
+ {
+ lifetimePolicy = autoDelete
+ ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS
+ : durable ? LifetimePolicy.PERMANENT : LifetimePolicy.DELETE_ON_CONNECTION_CLOSE;
+ exclusivityPolicy = durable ? ExclusivityPolicy.CONTAINER : ExclusivityPolicy.CONNECTION;
+ }
+ else
+ {
+ lifetimePolicy = autoDelete ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS : LifetimePolicy.PERMANENT;
+ exclusivityPolicy = ExclusivityPolicy.NONE;
+ }
+
+ attributes.put(Queue.EXCLUSIVE, exclusivityPolicy);
+ attributes.put(Queue.LIFETIME_POLICY, lifetimePolicy);
+
+
+ final AMQQueue queue = virtualHost.createQueue(attributes);
+
+ return queue;
+ }
+
public boolean dispatchQueueDelete(QueueDeleteBody body, int channelId) throws AMQException
{
- _queueDeleteHandler.methodReceived(getConnection(), body, channelId);
+ VirtualHostImpl virtualHost = _connection.getVirtualHost();
+
+ AMQChannel channel = _connection.getChannel(channelId);
+
+ if (channel == null)
+ {
+ throw body.getChannelNotFoundException(channelId, _connection.getMethodRegistry());
+ }
+ channel.sync();
+ AMQQueue queue;
+ if (body.getQueue() == null)
+ {
+
+ //get the default queue on the channel:
+ queue = channel.getDefaultQueue();
+ }
+ else
+ {
+ queue = virtualHost.getQueue(body.getQueue().toString());
+ }
+
+ if (queue == null)
+ {
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.",
+ _connection.getMethodRegistry());
+
+ }
+ else
+ {
+ if (body.getIfEmpty() && !queue.isEmpty())
+ {
+ throw body.getChannelException(AMQConstant.IN_USE, "Queue: " + body.getQueue() + " is not empty.",
+ _connection.getMethodRegistry());
+ }
+ else if (body.getIfUnused() && !queue.isUnused())
+ {
+ // TODO - Error code
+ throw body.getChannelException(AMQConstant.IN_USE, "Queue: " + body.getQueue() + " is still used.",
+ _connection.getMethodRegistry());
+ }
+ else
+ {
+ if (!queue.verifySessionAccess(channel))
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "Queue "
+ + queue.getName()
+ + " is exclusive, but not created on this Connection.",
+ _connection.getMethodRegistry());
+ }
+
+ int purged = 0;
+ try
+ {
+ purged = virtualHost.removeQueue(queue);
+ }
+ catch (AccessControlException e)
+ {
+ throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
+ e.getMessage(),
[... 318 lines stripped ...]
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org