You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/10/17 16:23:22 UTC
svn commit: r1632583 [2/5] - in /qpid/trunk/qpid/java: ./
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/
bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/
broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/...
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1632583&r1=1632582&r2=1632583&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Fri Oct 17 14:23:19 2014
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.protocol.v0_8;
+import static org.apache.qpid.transport.util.Functions.hex;
+
import java.nio.ByteBuffer;
import java.security.AccessControlException;
import java.security.PrivilegedAction;
@@ -47,14 +49,8 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
import org.apache.qpid.common.AMQPFilterTypes;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.*;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.TransactionTimeoutHelper;
import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction;
@@ -62,6 +58,7 @@ import org.apache.qpid.server.configurat
import org.apache.qpid.server.connection.SessionPrincipal;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.consumer.ConsumerTarget;
+import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.filter.AMQInvalidArgumentException;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.FilterManagerFactory;
@@ -69,6 +66,7 @@ import org.apache.qpid.server.filter.Fil
import org.apache.qpid.server.filter.MessageFilter;
import org.apache.qpid.server.filter.SimpleFilterManager;
import org.apache.qpid.server.flow.FlowCreditManager;
+import org.apache.qpid.server.flow.MessageOnlyCreditManager;
import org.apache.qpid.server.flow.Pre0_10CreditManager;
import org.apache.qpid.server.logging.LogMessage;
import org.apache.qpid.server.logging.LogSubject;
@@ -85,13 +83,18 @@ import org.apache.qpid.server.model.Conf
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Consumer;
import org.apache.qpid.server.model.Exchange;
+import org.apache.qpid.server.model.ExclusivityPolicy;
+import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.NoFactoryForTypeException;
+import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.Session;
import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.UnknownConfiguredObjectException;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.CapacityChecker;
import org.apache.qpid.server.protocol.ConsumerListener;
-import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueArgumentsConverter;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreFuture;
@@ -103,12 +106,18 @@ import org.apache.qpid.server.txn.LocalT
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+import org.apache.qpid.server.virtualhost.ExchangeExistsException;
+import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException;
+import org.apache.qpid.server.virtualhost.QueueExistsException;
+import org.apache.qpid.server.virtualhost.RequiredExchangeException;
+import org.apache.qpid.server.virtualhost.ReservedExchangeNameException;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.transport.TransportException;
-public class AMQChannel<T extends AMQProtocolSession<T>>
- implements AMQSessionModel<AMQChannel<T>,T>,
- AsyncAutoCommitTransaction.FutureRecorder
+public class AMQChannel
+ implements AMQSessionModel<AMQChannel, AMQProtocolEngine>,
+ AsyncAutoCommitTransaction.FutureRecorder,
+ ServerChannelMethodProcessor
{
public static final int DEFAULT_PREFETCH = 4096;
@@ -159,7 +168,7 @@ public class AMQChannel<T extends AMQPro
private final AtomicLong _txnRejects = new AtomicLong(0);
private final AtomicLong _txnCount = new AtomicLong(0);
- private final T _session;
+ private final AMQProtocolEngine _connection;
private AtomicBoolean _closing = new AtomicBoolean(false);
private final Set<Object> _blockingEntities = Collections.synchronizedSet(new HashSet<Object>());
@@ -181,8 +190,8 @@ public class AMQChannel<T extends AMQPro
private final TransactionTimeoutHelper _transactionTimeoutHelper;
private final UUID _id = UUID.randomUUID();
- private final List<Action<? super AMQChannel<T>>> _taskList =
- new CopyOnWriteArrayList<Action<? super AMQChannel<T>>>();
+ private final List<Action<? super AMQChannel>> _taskList =
+ new CopyOnWriteArrayList<Action<? super AMQChannel>>();
private final CapacityCheckAction _capacityCheckAction = new CapacityCheckAction();
@@ -194,15 +203,14 @@ public class AMQChannel<T extends AMQPro
private Session<?> _modelObject;
- public AMQChannel(T session, int channelId, final MessageStore messageStore)
- throws AMQException
+ public AMQChannel(AMQProtocolEngine connection, int channelId, final MessageStore messageStore)
{
- _session = session;
+ _connection = connection;
_channelId = channelId;
- _subject = new Subject(false, session.getAuthorizedSubject().getPrincipals(),
- session.getAuthorizedSubject().getPublicCredentials(),
- session.getAuthorizedSubject().getPrivateCredentials());
+ _subject = new Subject(false, connection.getAuthorizedSubject().getPrincipals(),
+ connection.getAuthorizedSubject().getPublicCredentials(),
+ connection.getAuthorizedSubject().getPrivateCredentials());
_subject.getPrincipals().add(new SessionPrincipal(this));
_logSubject = new ChannelLogSubject(this);
@@ -211,7 +219,7 @@ public class AMQChannel<T extends AMQPro
// by default the session is non-transactional
_transaction = new AsyncAutoCommitTransaction(_messageStore, this);
- _clientDeliveryMethod = session.createDeliveryMethod(_channelId);
+ _clientDeliveryMethod = connection.createDeliveryMethod(_channelId);
_transactionTimeoutHelper = new TransactionTimeoutHelper(_logSubject, new CloseAction()
{
@@ -242,6 +250,52 @@ public class AMQChannel<T extends AMQPro
}
+ private boolean performGet(final AMQQueue queue,
+ final boolean acks)
+ throws MessageSource.ExistingConsumerPreventsExclusive,
+ MessageSource.ExistingExclusiveConsumer, MessageSource.ConsumerAccessRefused
+ {
+
+ final FlowCreditManager singleMessageCredit = new MessageOnlyCreditManager(1L);
+
+ final GetDeliveryMethod getDeliveryMethod =
+ new GetDeliveryMethod(singleMessageCredit, queue);
+ final RecordDeliveryMethod getRecordMethod = new RecordDeliveryMethod()
+ {
+
+ public void recordMessageDelivery(final ConsumerImpl sub,
+ final MessageInstance entry,
+ final long deliveryTag)
+ {
+ addUnacknowledgedMessage(entry, deliveryTag, null);
+ }
+ };
+
+ ConsumerTarget_0_8 target;
+ EnumSet<ConsumerImpl.Option> options = EnumSet.of(ConsumerImpl.Option.TRANSIENT, ConsumerImpl.Option.ACQUIRES,
+ ConsumerImpl.Option.SEES_REQUEUES);
+ if (acks)
+ {
+
+ target = ConsumerTarget_0_8.createAckTarget(this,
+ AMQShortString.EMPTY_STRING, null,
+ singleMessageCredit, getDeliveryMethod, getRecordMethod);
+ }
+ else
+ {
+ target = ConsumerTarget_0_8.createGetNoAckTarget(this,
+ AMQShortString.EMPTY_STRING, null,
+ singleMessageCredit, getDeliveryMethod, getRecordMethod);
+ }
+
+ ConsumerImpl sub = queue.addConsumer(target, null, AMQMessage.class, "", options);
+ sub.flush();
+ sub.close();
+ return getDeliveryMethod.hasDeliveredMessage();
+
+
+ }
+
/** Sets this channel to be part of a local transaction */
public void setLocalTransactional()
{
@@ -250,7 +304,7 @@ public class AMQChannel<T extends AMQPro
@Override
public long getActivityTime()
{
- return _session.getLastReceivedTime();
+ return _connection.getLastReceivedTime();
}
});
_txnStarts.incrementAndGet();
@@ -324,27 +378,18 @@ public class AMQChannel<T extends AMQPro
}
public void publishContentHeader(ContentHeaderBody contentHeaderBody)
- throws AMQException
{
- if (_currentMessage == null)
+ if (_logger.isDebugEnabled())
{
- throw new AMQException("Received content header without previously receiving a BasicPublish frame");
+ _logger.debug("Content header received on channel " + _channelId);
}
- else
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Content header received on channel " + _channelId);
- }
- _currentMessage.setContentHeaderBody(contentHeaderBody);
+ _currentMessage.setContentHeaderBody(contentHeaderBody);
- deliverCurrentMessageIfComplete();
- }
+ deliverCurrentMessageIfComplete();
}
private void deliverCurrentMessageIfComplete()
- throws AMQException
{
// check and deliver if header says body length is zero
if (_currentMessage.allContentReceived())
@@ -355,7 +400,7 @@ public class AMQChannel<T extends AMQPro
final MessageMetaData messageMetaData =
new MessageMetaData(_currentMessage.getMessagePublishInfo(),
_currentMessage.getContentHeader(),
- getProtocolSession().getLastReceivedTime());
+ getConnection().getLastReceivedTime());
final StoredMessage<MessageMetaData> handle = _messageStore.addMessage(messageMetaData);
final AMQMessage amqMessage = createAMQMessage(_currentMessage, handle);
@@ -430,7 +475,7 @@ public class AMQChannel<T extends AMQPro
{
long bodySize = _currentMessage.getSize();
long timestamp = _currentMessage.getContentHeader().getProperties().getTimestamp();
- _session.registerMessageReceived(bodySize, timestamp);
+ _connection.registerMessageReceived(bodySize, timestamp);
_currentMessage = null;
}
}
@@ -443,13 +488,13 @@ public class AMQChannel<T extends AMQPro
* Pre-requisite: the current message is judged to have no destination queues.
*
* @throws AMQConnectionException if the message is mandatory close-on-no-route
- * @see AMQProtocolSession#isCloseWhenNoRoute()
+ * @see AMQProtocolEngine#isCloseWhenNoRoute()
*/
- private void handleUnroutableMessage(AMQMessage message) throws AMQConnectionException
+ private void handleUnroutableMessage(AMQMessage message)
{
boolean mandatory = message.isMandatory();
String description = currentMessageDescription();
- boolean closeOnNoRoute = _session.isCloseWhenNoRoute();
+ boolean closeOnNoRoute = _connection.isCloseWhenNoRoute();
if(_logger.isDebugEnabled())
{
@@ -458,29 +503,29 @@ public class AMQChannel<T extends AMQPro
description, mandatory, isTransactional(), closeOnNoRoute));
}
- if (mandatory && isTransactional() && _session.isCloseWhenNoRoute())
- {
- throw new AMQConnectionException(
- AMQConstant.NO_ROUTE,
- "No route for message " + currentMessageDescription(),
- 0, 0, // default class and method ids
- getProtocolSession().getProtocolVersion().getMajorVersion(),
- getProtocolSession().getProtocolVersion().getMinorVersion(),
- (Throwable) null);
- }
-
- if (mandatory || message.isImmediate())
+ if (mandatory && isTransactional() && _connection.isCloseWhenNoRoute())
{
- _transaction.addPostTransactionAction(new WriteReturnAction(AMQConstant.NO_ROUTE, "No Route for message " + currentMessageDescription(), message));
+ _connection.closeConnection(AMQConstant.NO_ROUTE,
+ "No route for message " + currentMessageDescription(), _channelId);
}
else
{
- AMQShortString exchangeName = _currentMessage.getExchangeName();
- AMQShortString routingKey = _currentMessage.getMessagePublishInfo().getRoutingKey();
+ if (mandatory || message.isImmediate())
+ {
+ _transaction.addPostTransactionAction(new WriteReturnAction(AMQConstant.NO_ROUTE,
+ "No Route for message "
+ + currentMessageDescription(),
+ message));
+ }
+ else
+ {
+ AMQShortString exchangeName = _currentMessage.getExchangeName();
+ AMQShortString routingKey = _currentMessage.getMessagePublishInfo().getRoutingKey();
- getVirtualHost().getEventLogger().message(
- ExchangeMessages.DISCARDMSG(exchangeName == null ? null : exchangeName.asString(),
- routingKey == null ? null : routingKey.asString()));
+ getVirtualHost().getEventLogger().message(
+ ExchangeMessages.DISCARDMSG(exchangeName == null ? null : exchangeName.asString(),
+ routingKey == null ? null : routingKey.asString()));
+ }
}
}
@@ -499,13 +544,8 @@ public class AMQChannel<T extends AMQPro
: _currentMessage.getMessagePublishInfo().getRoutingKey().toString());
}
- public void publishContentBody(ContentBody contentBody) throws AMQException
+ public void publishContentBody(ContentBody contentBody)
{
- if (_currentMessage == null)
- {
- throw new AMQException("Received content body without previously receiving a Content Header");
- }
-
if (_logger.isDebugEnabled())
{
_logger.debug(debugIdentity() + " content body received on channel " + _channelId);
@@ -517,13 +557,6 @@ public class AMQChannel<T extends AMQPro
deliverCurrentMessageIfComplete();
}
- catch (AMQException e)
- {
- // we want to make sure we don't keep a reference to the message in the
- // event of an error
- _currentMessage = null;
- throw e;
- }
catch (RuntimeException e)
{
// we want to make sure we don't keep a reference to the message in the
@@ -566,9 +599,10 @@ public class AMQChannel<T extends AMQPro
*/
public AMQShortString consumeFromSource(AMQShortString tag, Collection<MessageSource> sources, boolean acks,
FieldTable filters, boolean exclusive, boolean noLocal)
- throws AMQException, MessageSource.ExistingConsumerPreventsExclusive,
- MessageSource.ExistingExclusiveConsumer, AMQInvalidArgumentException,
- MessageSource.ConsumerAccessRefused
+ throws MessageSource.ExistingConsumerPreventsExclusive,
+ MessageSource.ExistingExclusiveConsumer,
+ AMQInvalidArgumentException,
+ MessageSource.ConsumerAccessRefused, ConsumerTagInUseException
{
if (tag == null)
{
@@ -577,7 +611,7 @@ public class AMQChannel<T extends AMQPro
if (_tag2SubscriptionTargetMap.containsKey(tag))
{
- throw new AMQException("Consumer already exists with same tag: " + tag);
+ throw new ConsumerTagInUseException("Consumer already exists with same tag: " + tag);
}
ConsumerTarget_0_8 target;
@@ -649,27 +683,11 @@ public class AMQChannel<T extends AMQPro
}
}
}
- catch (AccessControlException e)
- {
- _tag2SubscriptionTargetMap.remove(tag);
- throw e;
- }
- catch (MessageSource.ExistingExclusiveConsumer e)
- {
- _tag2SubscriptionTargetMap.remove(tag);
- throw e;
- }
- catch (MessageSource.ExistingConsumerPreventsExclusive e)
- {
- _tag2SubscriptionTargetMap.remove(tag);
- throw e;
- }
- catch (AMQInvalidArgumentException e)
- {
- _tag2SubscriptionTargetMap.remove(tag);
- throw e;
- }
- catch (MessageSource.ConsumerAccessRefused e)
+ catch (AccessControlException
+ | MessageSource.ExistingExclusiveConsumer
+ | MessageSource.ExistingConsumerPreventsExclusive
+ | AMQInvalidArgumentException
+ | MessageSource.ConsumerAccessRefused e)
{
_tag2SubscriptionTargetMap.remove(tag);
throw e;
@@ -730,7 +748,7 @@ public class AMQChannel<T extends AMQPro
unsubscribeAllConsumers();
- for (Action<? super AMQChannel<T>> task : _taskList)
+ for (Action<? super AMQChannel> task : _taskList)
{
task.performAction(this);
}
@@ -897,9 +915,8 @@ public class AMQChannel<T extends AMQPro
/**
* Called to resend all outstanding unacknowledged messages to this same channel.
*
- * @throws AMQException When something goes wrong.
*/
- public void resend() throws AMQException
+ public void resend()
{
@@ -985,9 +1002,8 @@ public class AMQChannel<T extends AMQPro
* @param multiple if true will acknowledge all messages up to an including the delivery tag. if false only
* acknowledges the single message specified by the delivery tag
*
- * @throws AMQException if the delivery tag is unknown (e.g. not outstanding) on this channel
*/
- public void acknowledgeMessage(long deliveryTag, boolean multiple) throws AMQException
+ public void acknowledgeMessage(long deliveryTag, boolean multiple)
{
Collection<MessageInstance> ackedMessages = getAckedMessages(deliveryTag, multiple);
_transaction.dequeue(ackedMessages, new MessageAcknowledgeAction(ackedMessages));
@@ -1084,22 +1100,13 @@ public class AMQChannel<T extends AMQPro
public boolean isSuspended()
{
- return _suspended.get() || _closing.get() || _session.isClosing();
+ return _suspended.get() || _closing.get() || _connection.isClosing();
}
- public void commit() throws AMQException
- {
- commit(null, false);
- }
-
- public void commit(final Runnable immediateAction, boolean async) throws AMQException
+ public void commit(final Runnable immediateAction, boolean async)
{
- if (!isTransactional())
- {
- throw new AMQException("Fatal error: commit called on non-transactional channel");
- }
if(async && _transaction instanceof LocalTransaction)
{
@@ -1132,17 +1139,8 @@ public class AMQChannel<T extends AMQPro
}
}
- public void rollback() throws AMQException
- {
- rollback(NULL_TASK);
- }
-
- public void rollback(Runnable postRollbackTask) throws AMQException
+ public void rollback(Runnable postRollbackTask)
{
- if (!isTransactional())
- {
- throw new AMQException("Fatal error: commit called on non-transactional channel");
- }
// stop all subscriptions
_rollingBack = true;
@@ -1200,7 +1198,7 @@ public class AMQChannel<T extends AMQPro
public String toString()
{
- return "("+ _suspended.get() + ", " + _closing.get() + ", " + _session.isClosing() + ") "+"["+_session.toString()+":"+_channelId+"]";
+ return "("+ _suspended.get() + ", " + _closing.get() + ", " + _connection.isClosing() + ") "+"["+ _connection.toString()+":"+_channelId+"]";
}
public void setDefaultQueue(AMQQueue queue)
@@ -1219,9 +1217,9 @@ public class AMQChannel<T extends AMQPro
return _closing.get();
}
- public AMQProtocolSession getProtocolSession()
+ public AMQProtocolEngine getConnection()
{
- return _session;
+ return _connection;
}
public FlowCreditManager getCreditManager()
@@ -1261,13 +1259,9 @@ public class AMQChannel<T extends AMQPro
private AMQMessage createAMQMessage(IncomingMessage incomingMessage, StoredMessage<MessageMetaData> handle)
- throws AMQException
{
- AMQMessage message = new AMQMessage(handle, _session.getReference());
-
- final BasicContentHeaderProperties properties =
- incomingMessage.getContentHeader().getProperties();
+ AMQMessage message = new AMQMessage(handle, _connection.getReference());
return message;
}
@@ -1275,7 +1269,7 @@ public class AMQChannel<T extends AMQPro
private boolean checkMessageUserId(ContentHeaderBody header)
{
AMQShortString userID = header.getProperties().getUserId();
- return (!_messageAuthorizationRequired || _session.getAuthorizedPrincipal().getName().equals(userID == null? "" : userID.toString()));
+ return (!_messageAuthorizationRequired || _connection.getAuthorizedPrincipal().getName().equals(userID == null? "" : userID.toString()));
}
@@ -1286,14 +1280,14 @@ public class AMQChannel<T extends AMQPro
}
@Override
- public T getConnectionModel()
+ public AMQProtocolEngine getConnectionModel()
{
- return _session;
+ return _connection;
}
public String getClientID()
{
- return String.valueOf(_session.getContextKey());
+ return String.valueOf(_connection.getContextKey());
}
public LogSubject getLogSubject()
@@ -1308,13 +1302,13 @@ public class AMQChannel<T extends AMQPro
}
@Override
- public void addDeleteTask(final Action<? super AMQChannel<T>> task)
+ public void addDeleteTask(final Action<? super AMQChannel> task)
{
_taskList.add(task);
}
@Override
- public void removeDeleteTask(final Action<? super AMQChannel<T>> task)
+ public void removeDeleteTask(final Action<? super AMQChannel> task)
{
_taskList.remove(task);
}
@@ -1324,6 +1318,46 @@ public class AMQChannel<T extends AMQPro
return _subject;
}
+ public boolean hasCurrentMessage()
+ {
+ return _currentMessage != null;
+ }
+
+ private class GetDeliveryMethod implements ClientDeliveryMethod
+ {
+
+ private final FlowCreditManager _singleMessageCredit;
+ private final AMQQueue _queue;
+ private boolean _deliveredMessage;
+
+ public GetDeliveryMethod(final FlowCreditManager singleMessageCredit,
+ final AMQQueue queue)
+ {
+ _singleMessageCredit = singleMessageCredit;
+ _queue = queue;
+ }
+
+ @Override
+ public long deliverToClient(final ConsumerImpl sub, final ServerMessage message,
+ final InstanceProperties props, final long deliveryTag)
+ {
+ _singleMessageCredit.useCreditForMessage(message.getSize());
+ long size = _connection.getProtocolOutputConverter().writeGetOk(message,
+ props,
+ AMQChannel.this.getChannelId(),
+ deliveryTag,
+ _queue.getQueueDepthMessages());
+
+ _deliveredMessage = true;
+ return size;
+ }
+
+ public boolean hasDeliveredMessage()
+ {
+ return _deliveredMessage;
+ }
+ }
+
private class ImmediateAction implements Action<MessageInstance>
{
@@ -1352,7 +1386,7 @@ public class AMQChannel<T extends AMQPro
public void postCommit()
{
final ProtocolOutputConverter outputConverter =
- _session.getProtocolOutputConverter();
+ _connection.getProtocolOutputConverter();
outputConverter.writeReturn(message.getMessagePublishInfo(),
message.getContentHeaderBody(),
@@ -1475,7 +1509,7 @@ public class AMQChannel<T extends AMQPro
public void postCommit()
{
AMQMessage message = _reference.getMessage();
- _session.getProtocolOutputConverter().writeReturn(message.getMessagePublishInfo(),
+ _connection.getProtocolOutputConverter().writeReturn(message.getMessagePublishInfo(),
message.getContentHeaderBody(),
message,
_channelId,
@@ -1544,7 +1578,7 @@ public class AMQChannel<T extends AMQPro
@Override
public Object getConnectionReference()
{
- return getProtocolSession().getReference();
+ return getConnection().getReference();
}
public int getUnacknowledgedMessageCount()
@@ -1554,9 +1588,9 @@ public class AMQChannel<T extends AMQPro
private void flow(boolean flow)
{
- MethodRegistry methodRegistry = _session.getMethodRegistry();
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
AMQMethodBody responseBody = methodRegistry.createChannelFlowBody(flow);
- _session.writeFrame(responseBody.generateFrame(_channelId));
+ _connection.writeFrame(responseBody.generateFrame(_channelId));
}
@Override
@@ -1567,7 +1601,7 @@ public class AMQChannel<T extends AMQPro
public VirtualHostImpl getVirtualHost()
{
- return getProtocolSession().getVirtualHost();
+ return getConnection().getVirtualHost();
}
public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose)
@@ -1581,11 +1615,11 @@ public class AMQChannel<T extends AMQPro
*/
private void closeConnection(String reason) throws AMQException
{
- Lock receivedLock = _session.getReceivedLock();
+ Lock receivedLock = _connection.getReceivedLock();
receivedLock.lock();
try
{
- _session.close(AMQConstant.RESOURCE_ERROR, reason);
+ _connection.close(AMQConstant.RESOURCE_ERROR, reason);
}
finally
{
@@ -1593,7 +1627,7 @@ public class AMQChannel<T extends AMQPro
}
}
- public void deadLetter(long deliveryTag) throws AMQException
+ public void deadLetter(long deliveryTag)
{
final UnacknowledgedMessageMap unackedMap = getUnacknowledgedMessageMap();
final MessageInstance rejectedQueueEntry = unackedMap.remove(deliveryTag);
@@ -1816,4 +1850,1521 @@ public class AMQChannel<T extends AMQPro
return 0L;
}
}
+
+ @Override
+ public void receiveAccessRequest(final AMQShortString realm,
+ final boolean exclusive,
+ final boolean passive,
+ final boolean active, final boolean write, final boolean read)
+ {
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("RECV[" + _channelId + "] AccessRequest[" +" realm: " + realm +
+ " exclusive: " + exclusive +
+ " passive: " + passive +
+ " active: " + active +
+ " write: " + write + " read: " + read + " ]");
+ }
+
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+
+ if (ProtocolVersion.v0_91.equals(_connection.getProtocolVersion()))
+ {
+ _connection.closeConnection(AMQConstant.COMMAND_INVALID,
+ "AccessRequest not present in AMQP versions other than 0-8, 0-9",
+ _channelId);
+ }
+ else
+ {
+ // We don't implement access control class, but to keep clients happy that expect it
+ // always use the "0" ticket.
+ AccessRequestOkBody response = methodRegistry.createAccessRequestOkBody(0);
+ sync();
+ _connection.writeFrame(response.generateFrame(_channelId));
+ }
+ }
+
+ @Override
+ public void receiveBasicAck(final long deliveryTag, final boolean multiple)
+ {
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("RECV[" + _channelId + "] BasicAck[" +" deliveryTag: " + deliveryTag + " multiple: " + multiple + " ]");
+ }
+
+ acknowledgeMessage(deliveryTag, multiple);
+ }
+
+ @Override
+ public void receiveBasicCancel(final AMQShortString consumerTag, final boolean nowait)
+ {
+
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("RECV[" + _channelId + "] BasicCancel[" +" consumerTag: " + consumerTag + " noWait: " + nowait + " ]");
+ }
+
+ unsubscribeConsumer(consumerTag);
+ if (!nowait)
+ {
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ BasicCancelOkBody cancelOkBody = methodRegistry.createBasicCancelOkBody(consumerTag);
+ sync();
+ _connection.writeFrame(cancelOkBody.generateFrame(_channelId));
+ }
+ }
+
+ @Override
+ public void receiveBasicConsume(final AMQShortString queue,
+ final AMQShortString consumerTag,
+ final boolean noLocal,
+ final boolean noAck,
+ final boolean exclusive, final boolean nowait, final FieldTable arguments)
+ {
+
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("RECV[" + _channelId + "] BasicConsume[" +" queue: " + queue +
+ " consumerTag: " + consumerTag +
+ " noLocal: " + noLocal +
+ " noAck: " + noAck +
+ " exclusive: " + exclusive + " nowait: " + nowait + " arguments: " + arguments + " ]");
+ }
+
+ AMQShortString consumerTag1 = consumerTag;
+ VirtualHostImpl<?, ?, ?> vHost = _connection.getVirtualHost();
+ sync();
+ String queueName = queue == null ? null : queue.asString();
+
+ MessageSource queue1 = queueName == null ? getDefaultQueue() : vHost.getQueue(queueName);
+ final Collection<MessageSource> sources = new HashSet<>();
+ if (queue1 != null)
+ {
+ sources.add(queue1);
+ }
+ else if (vHost.getContextValue(Boolean.class, "qpid.enableMultiQueueConsumers")
+ && arguments != null
+ && arguments.get("x-multiqueue") instanceof Collection)
+ {
+ for (Object object : (Collection<Object>) arguments.get("x-multiqueue"))
+ {
+ String sourceName = String.valueOf(object);
+ sourceName = sourceName.trim();
+ if (sourceName.length() != 0)
+ {
+ MessageSource source = vHost.getMessageSource(sourceName);
+ if (source == null)
+ {
+ sources.clear();
+ break;
+ }
+ else
+ {
+ sources.add(source);
+ }
+ }
+ }
+ queueName = arguments.get("x-multiqueue").toString();
+ }
+
+ if (sources.isEmpty())
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("No queue for '" + queueName + "'");
+ }
+ if (queueName != null)
+ {
+ closeChannel(AMQConstant.NOT_FOUND, "No such queue, '" + queueName + "'");
+ }
+ else
+ {
+ _connection.closeConnection(AMQConstant.NOT_ALLOWED,
+ "No queue name provided, no default queue defined.", _channelId);
+ }
+ }
+ else
+ {
+ try
+ {
+ consumerTag1 = consumeFromSource(consumerTag1,
+ sources,
+ !noAck,
+ arguments,
+ exclusive,
+ noLocal);
+ if (!nowait)
+ {
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createBasicConsumeOkBody(consumerTag1);
+ _connection.writeFrame(responseBody.generateFrame(_channelId));
+
+ }
+ }
+ catch (ConsumerTagInUseException cte)
+ {
+
+ _connection.closeConnection(AMQConstant.NOT_ALLOWED,
+ "Non-unique consumer tag, '" + consumerTag1
+ + "'", _channelId);
+ }
+ catch (AMQInvalidArgumentException ise)
+ {
+ _connection.closeConnection(AMQConstant.ARGUMENT_INVALID, ise.getMessage(), _channelId);
+
+
+ }
+ catch (AMQQueue.ExistingExclusiveConsumer e)
+ {
+ _connection.closeConnection(AMQConstant.ACCESS_REFUSED,
+ "Cannot subscribe to queue "
+ + queue1.getName()
+ + " as it already has an existing exclusive consumer", _channelId);
+
+ }
+ catch (AMQQueue.ExistingConsumerPreventsExclusive e)
+ {
+ _connection.closeConnection(AMQConstant.ACCESS_REFUSED,
+ "Cannot subscribe to queue "
+ + queue1.getName()
+ + " exclusively as it already has a consumer", _channelId);
+
+ }
+ catch (AccessControlException e)
+ {
+ _connection.closeConnection(AMQConstant.ACCESS_REFUSED, "Cannot subscribe to queue "
+ + queue1.getName()
+ + " permission denied", _channelId);
+
+ }
+ catch (MessageSource.ConsumerAccessRefused consumerAccessRefused)
+ {
+ _connection.closeConnection(AMQConstant.ACCESS_REFUSED,
+ "Cannot subscribe to queue "
+ + queue1.getName()
+ + " as it already has an incompatible exclusivity policy", _channelId);
+
+ }
+
+ }
+ }
+
+ @Override
+ public void receiveBasicGet(final AMQShortString queueName, final boolean noAck)
+ {
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("RECV[" + _channelId + "] BasicGet[" +" queue: " + queueName + " noAck: " + noAck + " ]");
+ }
+
+ VirtualHostImpl vHost = _connection.getVirtualHost();
+ sync();
+ AMQQueue queue = queueName == null ? getDefaultQueue() : vHost.getQueue(queueName.toString());
+ if (queue == null)
+ {
+ _logger.info("No queue for '" + queueName + "'");
+ if (queueName != null)
+ {
+ _connection.closeConnection(AMQConstant.NOT_FOUND, "No such queue, '" + queueName + "'", _channelId);
+
+ }
+ else
+ {
+ _connection.closeConnection(AMQConstant.NOT_ALLOWED,
+ "No queue name provided, no default queue defined.", _channelId);
+
+ }
+ }
+ else
+ {
+
+ try
+ {
+ if (!performGet(queue, !noAck))
+ {
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+
+ BasicGetEmptyBody responseBody = methodRegistry.createBasicGetEmptyBody(null);
+
+ _connection.writeFrame(responseBody.generateFrame(_channelId));
+ }
+ }
+ catch (AccessControlException e)
+ {
+ _connection.closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage(), _channelId);
+ }
+ catch (MessageSource.ExistingExclusiveConsumer e)
+ {
+ _connection.closeConnection(AMQConstant.NOT_ALLOWED, "Queue has an exclusive consumer", _channelId);
+ }
+ catch (MessageSource.ExistingConsumerPreventsExclusive e)
+ {
+ _connection.closeConnection(AMQConstant.INTERNAL_ERROR,
+ "The GET request has been evaluated as an exclusive consumer, " +
+ "this is likely due to a programming error in the Qpid broker", _channelId);
+ }
+ catch (MessageSource.ConsumerAccessRefused consumerAccessRefused)
+ {
+ _connection.closeConnection(AMQConstant.NOT_ALLOWED,
+ "Queue has an incompatible exclusivity policy", _channelId);
+ }
+ }
+ }
+
+ @Override
+ public void receiveBasicPublish(final AMQShortString exchangeName,
+ final AMQShortString routingKey,
+ final boolean mandatory,
+ final boolean immediate)
+ {
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("RECV[" + _channelId + "] BasicPublish[" +" exchange: " + exchangeName +
+ " routingKey: " + routingKey +
+ " mandatory: " + mandatory +
+ " immediate: " + immediate + " ]");
+ }
+
+ VirtualHostImpl vHost = _connection.getVirtualHost();
+
+ MessageDestination destination;
+
+ if (isDefaultExchange(exchangeName))
+ {
+ destination = vHost.getDefaultDestination();
+ }
+ else
+ {
+ destination = vHost.getMessageDestination(exchangeName.toString());
+ }
+
+ // if the exchange does not exist we raise a channel exception
+ if (destination == null)
+ {
+ closeChannel(AMQConstant.NOT_FOUND, "Unknown exchange name: " + exchangeName);
+ }
+ else
+ {
+
+ MessagePublishInfo info = new MessagePublishInfo(exchangeName,
+ immediate,
+ mandatory,
+ routingKey);
+
+ try
+ {
+ setPublishFrame(info, destination);
+ }
+ catch (AccessControlException e)
+ {
+ _connection.closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage(), getChannelId());
+
+ }
+ }
+ }
+
+ @Override
+ public void receiveBasicQos(final long prefetchSize, final int prefetchCount, final boolean global)
+ {
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("RECV[" + _channelId + "] BasicQos[" +" prefetchSize: " + prefetchSize + " prefetchCount: " + prefetchCount + " global: " + global + " ]");
+ }
+
+ sync();
+ setCredit(prefetchSize, prefetchCount);
+
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createBasicQosOkBody();
+ _connection.writeFrame(responseBody.generateFrame(getChannelId()));
+
+ }
+
+ @Override
+ public void receiveBasicRecover(final boolean requeue, final boolean sync)
+ {
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("RECV[" + _channelId + "] BasicRecover[" + " requeue: " + requeue + " sync: " + sync + " ]");
+ }
+
+ resend();
+
+ if (sync)
+ {
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ AMQMethodBody recoverOk = methodRegistry.createBasicRecoverSyncOkBody();
+ sync();
+ _connection.writeFrame(recoverOk.generateFrame(getChannelId()));
+
+ }
+
+ }
+
+ @Override
+ public void receiveBasicReject(final long deliveryTag, final boolean requeue)
+ {
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("RECV[" + _channelId + "] BasicReject[" +" deliveryTag: " + deliveryTag + " requeue: " + requeue + " ]");
+ }
+
+ MessageInstance message = getUnacknowledgedMessageMap().get(deliveryTag);
+
+ if (message == null)
+ {
+ _logger.warn("Dropping reject request as message is null for tag:" + deliveryTag);
+ }
+ else
+ {
+
+ if (message.getMessage() == null)
+ {
+ _logger.warn("Message has already been purged, unable to Reject.");
+ }
+ else
+ {
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Rejecting: DT:" + deliveryTag
+ + "-" + message.getMessage() +
+ ": Requeue:" + requeue
+ +
+ " on channel:" + debugIdentity());
+ }
+
+ if (requeue)
+ {
+ //this requeue represents a message rejected from the pre-dispatch queue
+ //therefore we need to amend the delivery counter.
+ message.decrementDeliveryCount();
+
+ requeue(deliveryTag);
+ }
+ else
+ {
+ // Since the Java client abuses the reject flag for requeing after rollback, we won't set reject here
+ // as it would prevent redelivery
+ // message.reject();
+
+ final boolean maxDeliveryCountEnabled = isMaxDeliveryCountEnabled(deliveryTag);
+ _logger.debug("maxDeliveryCountEnabled: "
+ + maxDeliveryCountEnabled
+ + " deliveryTag "
+ + deliveryTag);
+ if (maxDeliveryCountEnabled)
+ {
+ final boolean deliveredTooManyTimes = isDeliveredTooManyTimes(deliveryTag);
+ _logger.debug("deliveredTooManyTimes: "
+ + deliveredTooManyTimes
+ + " deliveryTag "
+ + deliveryTag);
+ if (deliveredTooManyTimes)
+ {
+ deadLetter(deliveryTag);
+ }
+ else
+ {
+ //this requeue represents a message rejected because of a recover/rollback that we
+ //are not ready to DLQ. We rely on the reject command to resend from the unacked map
+ //and therefore need to increment the delivery counter so we cancel out the effect
+ //of the AMQChannel#resend() decrement.
+ message.incrementDeliveryCount();
+ }
+ }
+ else
+ {
+ requeue(deliveryTag);
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ public void receiveChannelClose(final int replyCode,
+ final AMQShortString replyText,
+ final int classId,
+ final int methodId)
+ {
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("RECV[" + _channelId + "] ChannelClose[" +" replyCode: " + replyCode + " replyText: " + replyText + " classId: " + classId + " methodId: " + methodId + " ]");
+ }
+
+
+ sync();
+ _connection.closeChannel(this);
+
+ _connection.writeFrame(new AMQFrame(getChannelId(),
+ _connection.getMethodRegistry().createChannelCloseOkBody()));
+ }
+
+ @Override
+ public void receiveChannelCloseOk()
+ {
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("RECV[" + _channelId + "] ChannelCloseOk");
+ }
+
+ _connection.closeChannelOk(getChannelId());
+ }
+
+ @Override
+ public void receiveMessageContent(final byte[] data)
+ {
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("RECV[" + _channelId + "] MessageContent[" +" data: " + hex(data,_connection.getBinaryDataLimit()) + " ] ");
+ }
+
+ if(hasCurrentMessage())
+ {
+ publishContentBody(new ContentBody(data));
+ }
+ else
+ {
+ _connection.closeConnection(AMQConstant.COMMAND_INVALID,
+ "Attempt to send a content header without first sending a publish frame",
+ _channelId);
+ }
+ }
+
+ @Override
+ public void receiveMessageHeader(final BasicContentHeaderProperties properties, final long bodySize)
+ {
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("RECV[" + _channelId + "] MessageHeader[ properties: {" + properties + "} bodySize: " + bodySize + " ]");
+ }
+
+ if(hasCurrentMessage())
+ {
+ publishContentHeader(new ContentHeaderBody(properties, bodySize));
+ }
+ else
+ {
+ _connection.closeConnection(AMQConstant.COMMAND_INVALID,
+ "Attempt to send a content header without first sending a publish frame",
+ _channelId);
+ }
+ }
+
+ @Override
+ public boolean ignoreAllButCloseOk()
+ {
+ return _connection.ignoreAllButCloseOk() || _connection.channelAwaitingClosure(_channelId);
+ }
+
+ @Override
+ public void receiveChannelFlow(final boolean active)
+ {
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("RECV[" + _channelId + "] ChannelFlow[" +" active: " + active + " ]");
+ }
+
+
+ sync();
+ setSuspended(!active);
+
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createChannelFlowOkBody(active);
+ _connection.writeFrame(responseBody.generateFrame(getChannelId()));
+
+ }
+
+ @Override
+ public void receiveChannelFlowOk(final boolean active)
+ {
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("RECV[" + _channelId + "] ChannelFlowOk[" +" active: " + active + " ]");
+ }
+
+ // TODO - should we do anything here?
+ }
+
+ @Override
+ public void receiveExchangeBound(final AMQShortString exchangeName,
+ final AMQShortString routingKey,
+ final AMQShortString queueName)
+ {
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("RECV[" + _channelId + "] ExchangeBound[" +" exchange: " + exchangeName + " routingKey: " +
+ routingKey + " queue: " + queueName + " ]");
+ }
+
+ VirtualHostImpl virtualHost = _connection.getVirtualHost();
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+
+ sync();
+
+ int replyCode;
+ String replyText;
+
+ if (isDefaultExchange(exchangeName))
+ {
+ if (routingKey == null)
+ {
+ if (queueName == null)
+ {
+ replyCode = virtualHost.getQueues().isEmpty()
+ ? ExchangeBoundOkBody.NO_BINDINGS
+ : ExchangeBoundOkBody.OK;
+ replyText = null;
+
+ }
+ else
+ {
+ AMQQueue queue = virtualHost.getQueue(queueName.toString());
+ if (queue == null)
+ {
+ replyCode = ExchangeBoundOkBody.QUEUE_NOT_FOUND;
+ replyText = "Queue '" + queueName + "' not found";
+ }
+ else
+ {
+ replyCode = ExchangeBoundOkBody.OK;
+ replyText = null;
+ }
+ }
+ }
+ else
+ {
+ if (queueName == null)
+ {
+ replyCode = virtualHost.getQueue(routingKey.toString()) == null
+ ? ExchangeBoundOkBody.NO_QUEUE_BOUND_WITH_RK
+ : ExchangeBoundOkBody.OK;
+ replyText = null;
+ }
+ else
+ {
+ AMQQueue queue = virtualHost.getQueue(queueName.toString());
+ if (queue == null)
+ {
+
+ replyCode = ExchangeBoundOkBody.QUEUE_NOT_FOUND;
+ replyText = "Queue '" + queueName + "' not found";
+ }
+ else
+ {
+ replyCode = queueName.equals(routingKey)
+ ? ExchangeBoundOkBody.OK
+ : ExchangeBoundOkBody.SPECIFIC_QUEUE_NOT_BOUND_WITH_RK;
+ replyText = null;
+ }
+ }
+ }
+ }
+ else
+ {
+ ExchangeImpl exchange = virtualHost.getExchange(exchangeName.toString());
+ if (exchange == null)
+ {
+
+ replyCode = ExchangeBoundOkBody.EXCHANGE_NOT_FOUND;
+ replyText = "Exchange '" + exchangeName + "' not found";
+ }
+ else if (routingKey == null)
+ {
+ if (queueName == null)
+ {
+ if (exchange.hasBindings())
+ {
+ replyCode = ExchangeBoundOkBody.OK;
+ replyText = null;
+ }
+ else
+ {
+ replyCode = ExchangeBoundOkBody.NO_BINDINGS;
+ replyText = null;
+ }
+ }
+ else
+ {
+
+ AMQQueue queue = virtualHost.getQueue(queueName.toString());
+ if (queue == null)
+ {
+ replyCode = ExchangeBoundOkBody.QUEUE_NOT_FOUND;
+ replyText = "Queue '" + queueName + "' not found";
+ }
+ else
+ {
+ if (exchange.isBound(queue))
+ {
+ replyCode = ExchangeBoundOkBody.OK;
+ replyText = null;
+ }
+ else
+ {
+ replyCode = ExchangeBoundOkBody.QUEUE_NOT_BOUND;
+ replyText = "Queue '"
+ + queueName
+ + "' not bound to exchange '"
+ + exchangeName
+ + "'";
+ }
+ }
+ }
+ }
+ else if (queueName != null)
+ {
+ AMQQueue queue = virtualHost.getQueue(queueName.toString());
+ if (queue == null)
+ {
+ replyCode = ExchangeBoundOkBody.QUEUE_NOT_FOUND;
+ replyText = "Queue '" + queueName + "' not found";
+ }
+ else
+ {
+ String bindingKey = routingKey == null ? null : routingKey.asString();
+ if (exchange.isBound(bindingKey, queue))
+ {
+
+ replyCode = ExchangeBoundOkBody.OK;
+ replyText = null;
+ }
+ else
+ {
+ replyCode = ExchangeBoundOkBody.SPECIFIC_QUEUE_NOT_BOUND_WITH_RK;
+ replyText = "Queue '" + queueName + "' not bound with routing key '" +
+ routingKey + "' to exchange '" + exchangeName + "'";
+
+ }
+ }
+ }
+ else
+ {
+ if (exchange.isBound(routingKey == null ? "" : routingKey.asString()))
+ {
+
+ replyCode = ExchangeBoundOkBody.OK;
+ replyText = null;
+ }
+ else
+ {
+ replyCode = ExchangeBoundOkBody.NO_QUEUE_BOUND_WITH_RK;
+ replyText =
+ "No queue bound with routing key '" + routingKey + "' to exchange '" + exchangeName + "'";
+ }
+ }
+ }
+
+ ExchangeBoundOkBody exchangeBoundOkBody =
+ methodRegistry.createExchangeBoundOkBody(replyCode, AMQShortString.validValueOf(replyText));
+
+ _connection.writeFrame(exchangeBoundOkBody.generateFrame(getChannelId()));
+
+ }
+
+ @Override
+ public void receiveExchangeDeclare(final AMQShortString exchangeName,
+ final AMQShortString type,
+ final boolean passive,
+ final boolean durable,
+ final boolean autoDelete,
+ final boolean internal,
+ final boolean nowait,
+ final FieldTable arguments)
+ {
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("RECV[" + _channelId + "] ExchangeDeclare[" +" exchange: " + exchangeName +
+ " type: " + type +
+ " passive: " + passive +
+ " durable: " + durable +
+ " autoDelete: " + autoDelete +
+ " internal: " + internal + " nowait: " + nowait + " arguments: " + arguments + " ]");
+ }
+
+ ExchangeImpl exchange;
+ VirtualHostImpl<?, ?, ?> virtualHost = _connection.getVirtualHost();
+ if (isDefaultExchange(exchangeName))
+ {
+ if (!new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_CLASS).equals(type))
+ {
+ _connection.closeConnection(AMQConstant.NOT_ALLOWED, "Attempt to redeclare default exchange: "
+ + " of type "
+ + ExchangeDefaults.DIRECT_EXCHANGE_CLASS
+ + " to " + type + ".", getChannelId());
+ }
+ else if (!nowait)
+ {
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createExchangeDeclareOkBody();
+ sync();
+ _connection.writeFrame(responseBody.generateFrame(getChannelId()));
+ }
+
+ }
+ else
+ {
+ if (passive)
+ {
+ exchange = virtualHost.getExchange(exchangeName.toString());
+ if (exchange == null)
+ {
+ closeChannel(AMQConstant.NOT_FOUND, "Unknown exchange: " + exchangeName);
+ }
+ else if (!(type == null || type.length() == 0) && !exchange.getType().equals(type.asString()))
+ {
+
+ _connection.closeConnection(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: "
+ +
+ exchangeName
+ + " of type "
+ + exchange.getType()
+ + " to "
+ + type
+ + ".", getChannelId());
+ }
+ else if (!nowait)
+ {
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createExchangeDeclareOkBody();
+ sync();
+ _connection.writeFrame(responseBody.generateFrame(getChannelId()));
+ }
+
+ }
+ else
+ {
+ try
+ {
+ String name = exchangeName == null ? null : exchangeName.intern().toString();
+ String typeString = type == null ? null : type.intern().toString();
+
+ Map<String, Object> attributes = new HashMap<String, Object>();
+ if (arguments != null)
+ {
+ attributes.putAll(FieldTable.convertToMap(arguments));
+ }
+ attributes.put(Exchange.ID, null);
+ attributes.put(Exchange.NAME, name);
+ attributes.put(Exchange.TYPE, typeString);
+ attributes.put(Exchange.DURABLE, durable);
+ attributes.put(Exchange.LIFETIME_POLICY,
+ autoDelete ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT);
+ if (!attributes.containsKey(Exchange.ALTERNATE_EXCHANGE))
+ {
+ attributes.put(Exchange.ALTERNATE_EXCHANGE, null);
+ }
+ exchange = virtualHost.createExchange(attributes);
+
+ if (!nowait)
+ {
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createExchangeDeclareOkBody();
+ sync();
+ _connection.writeFrame(responseBody.generateFrame(
+ getChannelId()));
+ }
+
+ }
+ catch (ReservedExchangeNameException e)
+ {
+ _connection.closeConnection(AMQConstant.NOT_ALLOWED,
+ "Attempt to declare exchange: " + exchangeName +
+ " which begins with reserved prefix.", getChannelId());
+
+
+ }
+ catch (ExchangeExistsException e)
+ {
+ exchange = e.getExistingExchange();
+ if (!new AMQShortString(exchange.getType()).equals(type))
+ {
+ _connection.closeConnection(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: "
+ + exchangeName + " of type "
+ + exchange.getType()
+ + " to " + type + ".", getChannelId());
+
+ }
+ }
+ catch (NoFactoryForTypeException e)
+ {
+ _connection.closeConnection(AMQConstant.COMMAND_INVALID, "Unknown exchange type '"
+ + e.getType()
+ + "' for exchange '"
+ + exchangeName
+ + "'", getChannelId());
+
+ }
+ catch (AccessControlException e)
+ {
+ _connection.closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage(), getChannelId());
+
+ }
+ catch (UnknownConfiguredObjectException e)
+ {
+ // note - since 0-8/9/9-1 can't set the alt. exchange this exception should never occur
+ final String message = "Unknown alternate exchange "
+ + (e.getName() != null
+ ? "name: \"" + e.getName() + "\""
+ : "id: " + e.getId());
+ _connection.closeConnection(AMQConstant.NOT_FOUND, message, getChannelId());
+
+ }
+ catch (IllegalArgumentException e)
+ {
+ _connection.closeConnection(AMQConstant.COMMAND_INVALID, "Error creating exchange '"
+ + exchangeName
+ + "': "
+ + e.getMessage(), getChannelId());
+
+ }
+ }
+ }
+
+ }
+
+ @Override
+ public void receiveExchangeDelete(final AMQShortString exchangeStr, final boolean ifUnused, final boolean nowait)
+ {
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("RECV[" + _channelId + "] ExchangeDelete[" +" exchange: " + exchangeStr + " ifUnused: " + ifUnused + " nowait: " + nowait + " ]");
+ }
+
+
+ VirtualHostImpl virtualHost = _connection.getVirtualHost();
+ sync();
+ try
+ {
+
+ if (isDefaultExchange(exchangeStr))
+ {
+ _connection.closeConnection(AMQConstant.NOT_ALLOWED,
+ "Default Exchange cannot be deleted", getChannelId());
+
+ }
+
+ else
+ {
+ final String exchangeName = exchangeStr.toString();
+
+ final ExchangeImpl exchange = virtualHost.getExchange(exchangeName);
+ if (exchange == null)
+ {
+ closeChannel(AMQConstant.NOT_FOUND, "No such exchange: " + exchangeStr);
+ }
+ else
+ {
+ virtualHost.removeExchange(exchange, !ifUnused);
+
+ ExchangeDeleteOkBody responseBody = _connection.getMethodRegistry().createExchangeDeleteOkBody();
+
+ _connection.writeFrame(responseBody.generateFrame(getChannelId()));
+ }
+ }
+ }
+ catch (ExchangeIsAlternateException e)
+ {
+ closeChannel(AMQConstant.NOT_ALLOWED, "Exchange in use as an alternate exchange");
+ }
+ catch (RequiredExchangeException e)
+ {
+ closeChannel(AMQConstant.NOT_ALLOWED,
+ "Exchange '" + exchangeStr + "' cannot be deleted");
+ }
+ catch (AccessControlException e)
+ {
+ _connection.closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage(), getChannelId());
+ }
+ }
+
+ @Override
+ public void receiveQueueBind(final AMQShortString queueName,
+ final AMQShortString exchange,
+ AMQShortString routingKey,
+ final boolean nowait,
+ final FieldTable argumentsTable)
+ {
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("RECV[" + _channelId + "] QueueBind[" +" queue: " + queueName +
+ " exchange: " + exchange +
+ " bindingKey: " + routingKey +
+ " nowait: " + nowait + " arguments: " + argumentsTable + " ]");
+ }
+
+ VirtualHostImpl virtualHost = _connection.getVirtualHost();
+ AMQQueue<?> queue;
+ if (queueName == null)
+ {
+
+ queue = getDefaultQueue();
+
+ if (queue != null)
+ {
+ if (routingKey == null)
+ {
+ routingKey = AMQShortString.valueOf(queue.getName());
+ }
+ else
+ {
+ routingKey = routingKey.intern();
+ }
+ }
+ }
+ else
+ {
+ queue = virtualHost.getQueue(queueName.toString());
+ routingKey = routingKey == null ? AMQShortString.EMPTY_STRING : routingKey.intern();
+ }
+
+ if (queue == null)
+ {
+ String message = queueName == null
+ ? "No default queue defined on channel and queue was null"
+ : "Queue " + queueName + " does not exist.";
+ closeChannel(AMQConstant.NOT_FOUND, message);
+ }
+ else if (isDefaultExchange(exchange))
+ {
+ _connection.closeConnection(AMQConstant.NOT_ALLOWED,
+ "Cannot bind the queue " + queueName + " to the default exchange", getChannelId());
+
+ }
+ else
+ {
+
+ final String exchangeName = exchange.toString();
+
+ final ExchangeImpl exch = virtualHost.getExchange(exchangeName);
+ if (exch == null)
+ {
+ closeChannel(AMQConstant.NOT_FOUND,
+ "Exchange " + exchangeName + " does not exist.");
+ }
+ else
+ {
+
+ try
+ {
+
+ Map<String, Object> arguments = FieldTable.convertToMap(argumentsTable);
+ String bindingKey = String.valueOf(routingKey);
+
+ if (!exch.isBound(bindingKey, arguments, queue))
+ {
+
+ if (!exch.addBinding(bindingKey, queue, arguments)
+ && ExchangeDefaults.TOPIC_EXCHANGE_CLASS.equals(
+ exch.getType()))
+ {
+ exch.replaceBinding(bindingKey, queue, arguments);
+ }
+ }
+
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Binding queue "
+ + queue
+ + " to exchange "
+ + exch
+ + " with routing key "
+ + routingKey);
+ }
+ if (!nowait)
+ {
+ sync();
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createQueueBindOkBody();
+ _connection.writeFrame(responseBody.generateFrame(getChannelId()));
+
+ }
+ }
+ catch (AccessControlException e)
+ {
+ _connection.closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage(), getChannelId());
+ }
+ }
+ }
+ }
+
+ @Override
+ public void receiveQueueDeclare(final AMQShortString queueStr,
+ final boolean passive,
+ final boolean durable,
+ final boolean exclusive,
+ final boolean autoDelete,
+ final boolean nowait,
+ final FieldTable arguments)
+ {
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("RECV[" + _channelId + "] QueueDeclare[" +" queue: " + queueStr +
+ " passive: " + passive +
+ " durable: " + durable +
+ " exclusive: " + exclusive +
+ " autoDelete: " + autoDelete + " nowait: " + nowait + " arguments: " + arguments + " ]");
+ }
+
+ VirtualHostImpl virtualHost = _connection.getVirtualHost();
+
+ final AMQShortString queueName;
+
+ // if we aren't given a queue name, we create one which we return to the client
+ if ((queueStr == null) || (queueStr.length() == 0))
+ {
+ queueName = new AMQShortString("tmp_" + UUID.randomUUID());
+ }
+ else
+ {
+ queueName = queueStr.intern();
+ }
+
+ AMQQueue queue;
+
+ //TODO: do we need to check that the queue already exists with exactly the same "configuration"?
+
+
+ if (passive)
+ {
+ queue = virtualHost.getQueue(queueName.toString());
+ if (queue == null)
+ {
+ closeChannel(AMQConstant.NOT_FOUND,
+ "Queue: "
+ + queueName
+ + " not found on VirtualHost("
+ + virtualHost
+ + ").");
+ }
+ else
+ {
+ if (!queue.verifySessionAccess(this))
+ {
+ _connection.closeConnection(AMQConstant.NOT_ALLOWED, "Queue "
+ + queue.getName()
+ + " is exclusive, but not created on this Connection.", getChannelId());
+ }
+ else
+ {
+ //set this as the default queue on the channel:
+ setDefaultQueue(queue);
+ if (!nowait)
+ {
+ sync();
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ QueueDeclareOkBody responseBody =
+ methodRegistry.createQueueDeclareOkBody(queueName,
+ queue.getQueueDepthMessages(),
+ queue.getConsumerCount());
+ _connection.writeFrame(responseBody.generateFrame(getChannelId()));
+
+ _logger.info("Queue " + queueName + " declared successfully");
+ }
+ }
+ }
+ }
+ else
+ {
+
+ try
+ {
+ Map<String, Object> attributes =
+ QueueArgumentsConverter.convertWireArgsToModel(FieldTable.convertToMap(arguments));
+ final String queueNameString = AMQShortString.toString(queueName);
+ attributes.put(Queue.NAME, queueNameString);
+ attributes.put(Queue.ID, UUID.randomUUID());
+ attributes.put(Queue.DURABLE, durable);
+
+ LifetimePolicy lifetimePolicy;
+ ExclusivityPolicy exclusivityPolicy;
+
+ if (exclusive)
+ {
+ lifetimePolicy = autoDelete
+ ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS
+ : durable ? LifetimePolicy.PERMANENT : LifetimePolicy.DELETE_ON_CONNECTION_CLOSE;
+ exclusivityPolicy = durable ? ExclusivityPolicy.CONTAINER : ExclusivityPolicy.CONNECTION;
+ }
+ else
+ {
+ lifetimePolicy = autoDelete ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS : LifetimePolicy.PERMANENT;
+ exclusivityPolicy = ExclusivityPolicy.NONE;
+ }
+
+ attributes.put(Queue.EXCLUSIVE, exclusivityPolicy);
+ attributes.put(Queue.LIFETIME_POLICY, lifetimePolicy);
+
+
+ queue = virtualHost.createQueue(attributes);
+
+ setDefaultQueue(queue);
+
+ if (!nowait)
+ {
+ sync();
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ QueueDeclareOkBody responseBody =
+ methodRegistry.createQueueDeclareOkBody(queueName,
+ queue.getQueueDepthMessages(),
+ queue.getConsumerCount());
+ _connection.writeFrame(responseBody.generateFrame(getChannelId()));
+
+ _logger.info("Queue " + queueName + " declared successfully");
+ }
+ }
+ catch (QueueExistsException qe)
+ {
+
+ queue = qe.getExistingQueue();
+
+ if (!queue.verifySessionAccess(this))
+ {
+ _connection.closeConnection(AMQConstant.NOT_ALLOWED, "Queue "
+ + queue.getName()
+ + " is exclusive, but not created on this Connection.", getChannelId());
+
+ }
+ else if (queue.isExclusive() != exclusive)
+ {
+
+ closeChannel(AMQConstant.ALREADY_EXISTS,
+ "Cannot re-declare queue '"
+ + queue.getName()
+ + "' with different exclusivity (was: "
+ + queue.isExclusive()
+ + " requested "
+ + exclusive
+ + ")");
+ }
+ else if ((autoDelete
+ && queue.getLifetimePolicy() != LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS)
+ || (!autoDelete && queue.getLifetimePolicy() != ((exclusive
+ && !durable)
+ ? LifetimePolicy.DELETE_ON_CONNECTION_CLOSE
+ : LifetimePolicy.PERMANENT)))
+ {
+ closeChannel(AMQConstant.ALREADY_EXISTS,
+ "Cannot re-declare queue '"
+ + queue.getName()
+ + "' with different lifetime policy (was: "
+ + queue.getLifetimePolicy()
+ + " requested autodelete: "
+ + autoDelete
+ + ")");
+ }
+ else if (queue.isDurable() != durable)
+ {
+ closeChannel(AMQConstant.ALREADY_EXISTS,
+ "Cannot re-declare queue '"
+ + queue.getName()
+ + "' with different durability (was: "
+ + queue.isDurable()
+ + " requested "
+ + durable
+ + ")");
+ }
+ else
+ {
+ setDefaultQueue(queue);
+ if (!nowait)
+ {
+ sync();
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ QueueDeclareOkBody responseBody =
+ methodRegistry.createQueueDeclareOkBody(queueName,
+ queue.getQueueDepthMessages(),
+ queue.getConsumerCount());
+ _connection.writeFrame(responseBody.generateFrame(getChannelId()));
+
+ _logger.info("Queue " + queueName + " declared successfully");
+ }
+ }
+ }
+ catch (AccessControlException e)
+ {
+ _connection.closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage(), getChannelId());
+ }
+
+ }
+ }
+
+ @Override
+ public void receiveQueueDelete(final AMQShortString queueName,
+ final boolean ifUnused,
+ final boolean ifEmpty,
+ final boolean nowait)
+ {
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("RECV[" + _channelId + "] QueueDelete[" +" queue: " + queueName + " ifUnused: " + ifUnused + " ifEmpty: " + ifEmpty + " nowait: " + nowait + " ]");
+ }
+
+ VirtualHostImpl virtualHost = _connection.getVirtualHost();
+ sync();
+ AMQQueue queue;
+ if (queueName == null)
+ {
+
+ //get the default queue on the channel:
+ queue = getDefaultQueue();
+ }
+ else
+ {
+ queue = virtualHost.getQueue(queueName.toString());
+ }
+
+ if (queue == null)
+ {
+ closeChannel(AMQConstant.NOT_FOUND, "Queue " + queueName + " does not exist.");
+
+ }
+ else
+ {
+ if (ifEmpty && !queue.isEmpty())
+ {
+ closeChannel(AMQConstant.IN_USE, "Queue: " + queueName + " is not empty.");
+ }
+ else if (ifUnused && !queue.isUnused())
+ {
+ // TODO - Error code
+ closeChannel(AMQConstant.IN_USE, "Queue: " + queueName + " is still used.");
+ }
+ else
+ {
+ if (!queue.verifySessionAccess(this))
+ {
+ _connection.closeConnection(AMQConstant.NOT_ALLOWED, "Queue "
+ + queue.getName()
+ + " is exclusive, but not created on this Connection.", getChannelId());
+
+ }
+ else
+ {
+ try
+ {
+ int purged = virtualHost.removeQueue(queue);
+
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ QueueDeleteOkBody responseBody = methodRegistry.createQueueDeleteOkBody(purged);
+ _connection.writeFrame(responseBody.generateFrame(getChannelId()));
+ }
+ catch (AccessControlException e)
+ {
+ _connection.closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage(), getChannelId());
+
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ public void receiveQueuePurge(final AMQShortString queueName, final boolean nowait)
+ {
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("RECV[" + _channelId + "] QueuePurge[" +" queue: " + queueName + " nowait: " + nowait + " ]");
+ }
+
+ VirtualHostImpl virtualHost = _connection.getVirtualHost();
+ AMQQueue queue = null;
+ if (queueName == null && (queue = getDefaultQueue()) == null)
+ {
+
+ _connection.closeConnection(AMQConstant.NOT_ALLOWED, "No queue specified.", getChannelId());
+ }
+ else if ((queueName != null) && (queue = virtualHost.getQueue(queueName.toString())) == null)
+ {
+ closeChannel(AMQConstant.NOT_FOUND, "Queue " + queueName + " does not exist.");
+ }
+ else if (!queue.verifySessionAccess(this))
+ {
+ _connection.closeConnection(AMQConstant.NOT_ALLOWED,
+ "Queue is exclusive, but not created on this Connection.", getChannelId());
+ }
+ else
+ {
+ try
+ {
+ long purged = queue.clearQueue();
+ if (!nowait)
+ {
+ sync();
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createQueuePurgeOkBody(purged);
+ _connection.writeFrame(responseBody.generateFrame(getChannelId()));
+
+ }
+ }
+ catch (AccessControlException e)
+ {
+ _connection.closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage(), getChannelId());
+
+ }
+
+ }
+ }
+
+ @Override
+ public void receiveQueueUnbind(final AMQShortString queueName,
+ final AMQShortString exchange,
+ final AMQShortString routingKey,
+ final FieldTable arguments)
+ {
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("RECV[" + _channelId + "] QueueUnbind[" +" queue: " + queueName +
+ " exchange: " + exchange +
+ " bindingKey: " + routingKey +
+ " arguments: " + arguments + " ]");
+ }
+
+ VirtualHostImpl virtualHost = _connection.getVirtualHost();
+
+
+ final boolean useDefaultQueue = queueName == null;
+ final AMQQueue queue = useDefaultQueue
+ ? getDefaultQueue()
+ : virtualHost.getQueue(queueName.toString());
+
+
+ if (queue == null)
+ {
+ String message = useDefaultQueue
+ ? "No default queue defined on channel and queue was null"
+ : "Queue " + queueName + " does not exist.";
+ closeChannel(AMQConstant.NOT_FOUND, message);
+ }
+ else if (isDefaultExchange(exchange))
+ {
+ _connection.closeConnection(AMQConstant.NOT_ALLOWED, "Cannot unbind the queue "
+ + queue.getName()
+ + " from the default exchange", getChannelId());
+
+ }
+ else
+ {
+
+ final ExchangeImpl exch = virtualHost.getExchange(exchange.toString());
+
+ if (exch == null)
+ {
+ closeChannel(AMQConstant.NOT_FOUND, "Exchange " + exchange + " does not exist.");
+ }
+ else if (!exch.hasBinding(String.valueOf(routingKey), queue))
+ {
+ closeChannel(AMQConstant.NOT_FOUND, "No such binding");
+ }
+ else
+ {
+ try
+ {
+ exch.deleteBinding(String.valueOf(routingKey), queue);
+
+ final AMQMethodBody responseBody = _connection.getMethodRegistry().createQueueUnbindOkBody();
+ sync();
+ _connection.writeFrame(responseBody.generateFrame(getChannelId()));
+ }
+ catch (AccessControlException e)
+ {
+ _connection.closeConnection(AMQConstant.ACCESS_REFUSED, e.getMessage(), getChannelId());
+
+ }
+ }
+
+ }
+ }
+
+ @Override
+ public void receiveTxSelect()
+ {
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("RECV[" + _channelId + "] TxSelect");
+ }
+
+ setLocalTransactional();
+
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ TxSelectOkBody responseBody = methodRegistry.createTxSelectOkBody();
+ _connection.writeFrame(responseBody.generateFrame(_channelId));
+
+ }
+
+ @Override
+ public void receiveTxCommit()
+ {
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("RECV[" + _channelId + "] TxCommit");
+ }
+
+
+ if (!isTransactional())
+ {
+ closeChannel(AMQConstant.COMMAND_INVALID,
+ "Fatal error: commit called on non-transactional channel");
+ }
+ commit(new Runnable()
+ {
+
+ @Override
+ public void run()
+ {
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createTxCommitOkBody();
+ _connection.writeFrame(responseBody.generateFrame(_channelId));
+ }
+ }, true);
+
+ }
+
+ @Override
+ public void receiveTxRollback()
+ {
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("RECV[" + _channelId + "] TxRollback");
+ }
+
+ if (!isTransactional())
+ {
+ closeChannel(AMQConstant.COMMAND_INVALID,
+ "Fatal error: rollback called on non-transactional channel");
+ }
+
+ final MethodRegistry methodRegistry = _connection.getMethodRegistry();
+ final AMQMethodBody responseBody = methodRegistry.createTxRollbackOkBody();
+
[... 31 lines stripped ...]
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org