You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2014/02/12 14:27:57 UTC
svn commit: r1567616 [9/12] - in /qpid/branches/java-broker-bdb-ha: ./ qpid/
qpid/cpp/bindings/qmf2/ruby/ qpid/cpp/bindings/qpid/examples/perl/
qpid/cpp/bindings/qpid/perl/ qpid/cpp/bindings/qpid/perl/lib/qpid/messaging/
qpid/cpp/bindings/qpid/ruby/ qp...
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Wed Feb 12 13:27:51 2014
@@ -21,19 +21,7 @@
package org.apache.qpid.server.protocol.v0_8;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
-import java.util.UUID;
+import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
@@ -42,6 +30,7 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQSecurityException;
+import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
@@ -55,6 +44,10 @@ import org.apache.qpid.server.Transactio
import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction;
import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.filter.FilterManager;
+import org.apache.qpid.server.filter.FilterManagerFactory;
+import org.apache.qpid.server.filter.FilterSupport;
+import org.apache.qpid.server.filter.SimpleFilterManager;
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.flow.Pre0_10CreditManager;
import org.apache.qpid.server.logging.LogActor;
@@ -66,25 +59,28 @@ import org.apache.qpid.server.logging.me
import org.apache.qpid.server.logging.messages.ExchangeMessages;
import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.protocol.CapacityChecker;
import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.subscription.ClientDeliveryMethod;
-import org.apache.qpid.server.subscription.RecordDeliveryMethod;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.LocalTransaction.ActivityTimeAccessor;
import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.TransportException;
@@ -122,7 +118,7 @@ public class AMQChannel implements AMQSe
private IncomingMessage _currentMessage;
/** Maps from consumer tag to subscription instance. Allows us to unsubscribe from a queue. */
- private final Map<AMQShortString, Subscription> _tag2SubscriptionMap = new HashMap<AMQShortString, Subscription>();
+ private final Map<AMQShortString, ConsumerTarget_0_8> _tag2SubscriptionTargetMap = new HashMap<AMQShortString, ConsumerTarget_0_8>();
private final MessageStore _messageStore;
@@ -155,7 +151,7 @@ public class AMQChannel implements AMQSe
private volatile boolean _rollingBack;
private static final Runnable NULL_TASK = new Runnable() { public void run() {} };
- private List<QueueEntry> _resendList = new ArrayList<QueueEntry>();
+ private List<MessageInstance> _resendList = new ArrayList<MessageInstance>();
private static final
AMQShortString IMMEDIATE_DELIVERY_REPLY_TEXT = new AMQShortString("Immediate delivery is not possible.");
private long _createTime = System.currentTimeMillis();
@@ -266,7 +262,7 @@ public class AMQChannel implements AMQSe
return _channelId;
}
- public void setPublishFrame(MessagePublishInfo info, final Exchange e) throws AMQSecurityException
+ public void setPublishFrame(MessagePublishInfo info, final MessageDestination e) throws AMQSecurityException
{
String routingKey = info.getRoutingKey() == null ? null : info.getRoutingKey().asString();
SecurityManager securityManager = getVirtualHost().getSecurityManager();
@@ -275,7 +271,7 @@ public class AMQChannel implements AMQSe
throw new AMQSecurityException("Permission denied: " + e.getName());
}
_currentMessage = new IncomingMessage(info);
- _currentMessage.setExchange(e);
+ _currentMessage.setMessageDestination(e);
}
public void publishContentHeader(ContentHeaderBody contentHeaderBody)
@@ -360,7 +356,7 @@ public class AMQChannel implements AMQSe
}
};
- int enqueues = _currentMessage.getExchange().send(amqMessage, instanceProperties, _transaction,
+ int enqueues = _currentMessage.getDestination().send(amqMessage, instanceProperties, _transaction,
immediate ? _immediateAction : _capacityCheckAction);
if(enqueues == 0)
{
@@ -497,41 +493,64 @@ public class AMQChannel implements AMQSe
}
- public Subscription getSubscription(AMQShortString subscription)
+ public Consumer getSubscription(AMQShortString tag)
{
- return _tag2SubscriptionMap.get(subscription);
+ final ConsumerTarget_0_8 target = _tag2SubscriptionTargetMap.get(tag);
+ return target == null ? null : target.getConsumer();
}
/**
* Subscribe to a queue. We register all subscriptions in the channel so that if the channel is closed we can clean
* up all subscriptions, even if the client does not explicitly unsubscribe from all queues.
*
+ *
* @param tag the tag chosen by the client (if null, server will generate one)
- * @param queue the queue to subscribe to
+ * @param source the queue to subscribe to
* @param acks Are acks enabled for this subscriber
* @param filters Filters to apply to this subscriber
*
- * @param noLocal Flag stopping own messages being received.
* @param exclusive Flag requesting exclusive access to the queue
* @return the consumer tag. This is returned to the subscriber and used in subsequent unsubscribe requests
*
* @throws AMQException if something goes wrong
*/
- public AMQShortString subscribeToQueue(AMQShortString tag, AMQQueue queue, boolean acks,
- FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException
+ public AMQShortString consumeFromSource(AMQShortString tag, MessageSource source, boolean acks,
+ FieldTable filters, boolean exclusive, boolean noLocal) throws AMQException
{
if (tag == null)
{
tag = new AMQShortString("sgen_" + getNextConsumerTag());
}
- if (_tag2SubscriptionMap.containsKey(tag))
+ if (_tag2SubscriptionTargetMap.containsKey(tag))
{
throw new AMQException("Consumer already exists with same tag: " + tag);
}
- Subscription subscription =
- SubscriptionFactoryImpl.INSTANCE.createSubscription(_channelId, _session, tag, acks, filters, noLocal, _creditManager);
+ ConsumerTarget_0_8 target;
+ EnumSet<Consumer.Option> options = EnumSet.noneOf(Consumer.Option.class);
+
+ if(filters != null && Boolean.TRUE.equals(filters.get(AMQPFilterTypes.NO_CONSUME.getValue())))
+ {
+ target = ConsumerTarget_0_8.createBrowserTarget(this, tag, filters, _creditManager);
+ }
+ else if(acks)
+ {
+ target = ConsumerTarget_0_8.createAckTarget(this, tag, filters, _creditManager);
+ options.add(Consumer.Option.ACQUIRES);
+ options.add(Consumer.Option.SEES_REQUEUES);
+ }
+ else
+ {
+ target = ConsumerTarget_0_8.createNoAckTarget(this, tag, filters, _creditManager);
+ options.add(Consumer.Option.ACQUIRES);
+ options.add(Consumer.Option.SEES_REQUEUES);
+ }
+
+ if(exclusive)
+ {
+ options.add(Consumer.Option.EXCLUSIVE);
+ }
// So to keep things straight we put before the call and catch all exceptions from the register and tidy up.
@@ -539,20 +558,34 @@ public class AMQChannel implements AMQSe
// so calling _cT2QM.remove before we have done put which was after the register succeeded.
// So to keep things straight we put before the call and catch all exceptions from the register and tidy up.
- _tag2SubscriptionMap.put(tag, subscription);
+ _tag2SubscriptionTargetMap.put(tag, target);
try
{
- queue.registerSubscription(subscription, exclusive);
+ FilterManager filterManager = FilterManagerFactory.createManager(FieldTable.convertToMap(filters));
+ if(noLocal)
+ {
+ if(filterManager == null)
+ {
+ filterManager = new SimpleFilterManager();
+ }
+ filterManager.add(new FilterSupport.NoLocalFilter(source));
+ }
+ Consumer sub =
+ source.addConsumer(target,
+ filterManager,
+ AMQMessage.class,
+ AMQShortString.toString(tag),
+ options);
}
catch (AMQException e)
{
- _tag2SubscriptionMap.remove(tag);
+ _tag2SubscriptionTargetMap.remove(tag);
throw e;
}
catch (RuntimeException e)
{
- _tag2SubscriptionMap.remove(tag);
+ _tag2SubscriptionTargetMap.remove(tag);
throw e;
}
return tag;
@@ -567,18 +600,11 @@ public class AMQChannel implements AMQSe
public boolean unsubscribeConsumer(AMQShortString consumerTag) throws AMQException
{
- Subscription sub = _tag2SubscriptionMap.remove(consumerTag);
+ ConsumerTarget_0_8 target = _tag2SubscriptionTargetMap.remove(consumerTag);
+ Consumer sub = target == null ? null : target.getConsumer();
if (sub != null)
{
- try
- {
- sub.getSendLock();
- sub.getQueue().unregisterSubscription(sub);
- }
- finally
- {
- sub.releaseSendLock();
- }
+ sub.close();
return true;
}
else
@@ -633,7 +659,7 @@ public class AMQChannel implements AMQSe
{
if (_logger.isInfoEnabled())
{
- if (!_tag2SubscriptionMap.isEmpty())
+ if (!_tag2SubscriptionTargetMap.isEmpty())
{
_logger.info("Unsubscribing all consumers on channel " + toString());
}
@@ -643,28 +669,21 @@ public class AMQChannel implements AMQSe
}
}
- for (Map.Entry<AMQShortString, Subscription> me : _tag2SubscriptionMap.entrySet())
+ for (Map.Entry<AMQShortString, ConsumerTarget_0_8> me : _tag2SubscriptionTargetMap.entrySet())
{
if (_logger.isInfoEnabled())
{
_logger.info("Unsubscribing consumer '" + me.getKey() + "' on channel " + toString());
}
- Subscription sub = me.getValue();
+ Consumer sub = me.getValue().getConsumer();
- try
- {
- sub.getSendLock();
- sub.getQueue().unregisterSubscription(sub);
- }
- finally
- {
- sub.releaseSendLock();
- }
+
+ sub.close();
}
- _tag2SubscriptionMap.clear();
+ _tag2SubscriptionTargetMap.clear();
}
/**
@@ -673,24 +692,15 @@ public class AMQChannel implements AMQSe
* @param entry the record of the message on the queue that was delivered
* @param deliveryTag the delivery tag used when delivering the message (see protocol spec for description of the
* delivery tag)
- * @param subscription The consumer that is to acknowledge this message.
+ * @param consumer The consumer that is to acknowledge this message.
*/
- public void addUnacknowledgedMessage(QueueEntry entry, long deliveryTag, Subscription subscription)
+ public void addUnacknowledgedMessage(MessageInstance entry, long deliveryTag, Consumer consumer)
{
if (_logger.isDebugEnabled())
{
- if (entry.getQueue() == null)
- {
- _logger.debug("Adding unacked message with a null queue:" + entry);
- }
- else
- {
- if (_logger.isDebugEnabled())
- {
_logger.debug(debugIdentity() + " Adding unacked message(" + entry.getMessage().toString() + " DT:" + deliveryTag
- + ") with a queue(" + entry.getQueue() + ") for " + subscription);
- }
- }
+ + ") for " + consumer + " on " + entry.getOwningResource().getName());
+
}
_unacknowledgedMessageMap.add(deliveryTag, entry);
@@ -713,7 +723,7 @@ public class AMQChannel implements AMQSe
public void requeue() throws AMQException
{
// we must create a new map since all the messages will get a new delivery tag when they are redelivered
- Collection<QueueEntry> messagesToBeDelivered = _unacknowledgedMessageMap.cancelAllMessages();
+ Collection<MessageInstance> messagesToBeDelivered = _unacknowledgedMessageMap.cancelAllMessages();
if (!messagesToBeDelivered.isEmpty())
{
@@ -724,21 +734,13 @@ public class AMQChannel implements AMQSe
}
- for (QueueEntry unacked : messagesToBeDelivered)
+ for (MessageInstance unacked : messagesToBeDelivered)
{
- if (!unacked.isQueueDeleted())
- {
- // Mark message redelivered
- unacked.setRedelivered();
-
- // Ensure message is released for redelivery
- unacked.release();
+ // Mark message redelivered
+ unacked.setRedelivered();
- }
- else
- {
- unacked.delete();
- }
+ // Ensure message is released for redelivery
+ unacked.release();
}
}
@@ -752,7 +754,7 @@ public class AMQChannel implements AMQSe
*/
public void requeue(long deliveryTag) throws AMQException
{
- QueueEntry unacked = _unacknowledgedMessageMap.remove(deliveryTag);
+ MessageInstance unacked = _unacknowledgedMessageMap.remove(deliveryTag);
if (unacked != null)
{
@@ -760,20 +762,8 @@ public class AMQChannel implements AMQSe
unacked.setRedelivered();
// Ensure message is released for redelivery
- if (!unacked.isQueueDeleted())
- {
+ unacked.release();
- // Ensure message is released for redelivery
- unacked.release();
-
- }
- else
- {
- _logger.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked
- + "):" + deliveryTag + " but no queue defined and no DeadLetter queue so DROPPING message.");
-
- unacked.delete();
- }
}
else
{
@@ -786,10 +776,10 @@ public class AMQChannel implements AMQSe
public boolean isMaxDeliveryCountEnabled(final long deliveryTag)
{
- final QueueEntry queueEntry = _unacknowledgedMessageMap.get(deliveryTag);
+ final MessageInstance queueEntry = _unacknowledgedMessageMap.get(deliveryTag);
if (queueEntry != null)
{
- final int maximumDeliveryCount = queueEntry.getQueue().getMaximumDeliveryCount();
+ final int maximumDeliveryCount = queueEntry.getMaximumDeliveryCount();
return maximumDeliveryCount > 0;
}
@@ -798,10 +788,10 @@ public class AMQChannel implements AMQSe
public boolean isDeliveredTooManyTimes(final long deliveryTag)
{
- final QueueEntry queueEntry = _unacknowledgedMessageMap.get(deliveryTag);
+ final MessageInstance queueEntry = _unacknowledgedMessageMap.get(deliveryTag);
if (queueEntry != null)
{
- final int maximumDeliveryCount = queueEntry.getQueue().getMaximumDeliveryCount();
+ final int maximumDeliveryCount = queueEntry.getMaximumDeliveryCount();
final int numDeliveries = queueEntry.getDeliveryCount();
return maximumDeliveryCount != 0 && numDeliveries >= maximumDeliveryCount;
}
@@ -812,16 +802,14 @@ public class AMQChannel implements AMQSe
/**
* Called to resend all outstanding unacknowledged messages to this same channel.
*
- * @param requeue Are the messages to be requeued or dropped.
- *
* @throws AMQException When something goes wrong.
*/
- public void resend(final boolean requeue) throws AMQException
+ public void resend() throws AMQException
{
- final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>();
- final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>();
+ final Map<Long, MessageInstance> msgToRequeue = new LinkedHashMap<Long, MessageInstance>();
+ final Map<Long, MessageInstance> msgToResend = new LinkedHashMap<Long, MessageInstance>();
if (_logger.isDebugEnabled())
{
@@ -833,9 +821,8 @@ public class AMQChannel implements AMQSe
// and those that don't to be requeued.
_unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap,
msgToRequeue,
- msgToResend,
- requeue,
- _messageStore));
+ msgToResend
+ ));
// Process Messages to Resend
@@ -851,39 +838,20 @@ public class AMQChannel implements AMQSe
}
}
- for (Map.Entry<Long, QueueEntry> entry : msgToResend.entrySet())
+ for (Map.Entry<Long, MessageInstance> entry : msgToResend.entrySet())
{
- QueueEntry message = entry.getValue();
+ MessageInstance message = entry.getValue();
long deliveryTag = entry.getKey();
//Amend the delivery counter as the client hasn't seen these messages yet.
message.decrementDeliveryCount();
- AMQQueue queue = message.getQueue();
-
// Without any details from the client about what has been processed we have to mark
// all messages in the unacked map as redelivered.
message.setRedelivered();
- Subscription sub = message.getDeliveredSubscription();
-
- if (sub != null)
- {
-
- if(!queue.resend(message,sub))
- {
- msgToRequeue.put(deliveryTag, message);
- }
- }
- else
+ if (!message.resend())
{
-
- if (_logger.isInfoEnabled())
- {
- _logger.info("DeliveredSubscription not recorded so just requeueing(" + message.toString()
- + ")to prevent loss");
- }
- // move this message to requeue
msgToRequeue.put(deliveryTag, message);
}
} // for all messages
@@ -898,9 +866,9 @@ public class AMQChannel implements AMQSe
}
// Process Messages to Requeue at the front of the queue
- for (Map.Entry<Long, QueueEntry> entry : msgToRequeue.entrySet())
+ for (Map.Entry<Long, MessageInstance> entry : msgToRequeue.entrySet())
{
- QueueEntry message = entry.getValue();
+ MessageInstance message = entry.getValue();
long deliveryTag = entry.getKey();
//Amend the delivery counter as the client hasn't seen these messages yet.
@@ -926,11 +894,11 @@ public class AMQChannel implements AMQSe
*/
public void acknowledgeMessage(long deliveryTag, boolean multiple) throws AMQException
{
- Collection<QueueEntry> ackedMessages = getAckedMessages(deliveryTag, multiple);
+ Collection<MessageInstance> ackedMessages = getAckedMessages(deliveryTag, multiple);
_transaction.dequeue(ackedMessages, new MessageAcknowledgeAction(ackedMessages));
}
- private Collection<QueueEntry> getAckedMessages(long deliveryTag, boolean multiple)
+ private Collection<MessageInstance> getAckedMessages(long deliveryTag, boolean multiple)
{
return _unacknowledgedMessageMap.acknowledge(deliveryTag, multiple);
@@ -976,9 +944,9 @@ public class AMQChannel implements AMQSe
if (wasSuspended)
{
// may need to deliver queued messages
- for (Subscription s : _tag2SubscriptionMap.values())
+ for (ConsumerTarget_0_8 s : _tag2SubscriptionTargetMap.values())
{
- s.getQueue().deliverAsync(s);
+ s.getConsumer().externalStateChange();
}
}
@@ -992,15 +960,15 @@ public class AMQChannel implements AMQSe
if (!wasSuspended)
{
// may need to deliver queued messages
- for (Subscription s : _tag2SubscriptionMap.values())
+ for (ConsumerTarget_0_8 s : _tag2SubscriptionTargetMap.values())
{
try
{
- s.getSendLock();
+ s.getConsumer().getSendLock();
}
finally
{
- s.releaseSendLock();
+ s.getConsumer().releaseSendLock();
}
}
}
@@ -1077,10 +1045,10 @@ public class AMQChannel implements AMQSe
boolean requiresSuspend = _suspended.compareAndSet(false,true);
// ensure all subscriptions have seen the change to the channel state
- for(Subscription sub : _tag2SubscriptionMap.values())
+ for(ConsumerTarget_0_8 sub : _tag2SubscriptionTargetMap.values())
{
- sub.getSendLock();
- sub.releaseSendLock();
+ sub.getConsumer().getSendLock();
+ sub.getConsumer().releaseSendLock();
}
try
@@ -1098,16 +1066,16 @@ public class AMQChannel implements AMQSe
postRollbackTask.run();
- for(QueueEntry entry : _resendList)
+ for(MessageInstance entry : _resendList)
{
- Subscription sub = entry.getDeliveredSubscription();
+ Consumer sub = entry.getDeliveredConsumer();
if(sub == null || sub.isClosed())
{
entry.release();
}
else
{
- sub.getQueue().resend(entry, sub);
+ entry.resend();
}
}
_resendList.clear();
@@ -1115,9 +1083,9 @@ public class AMQChannel implements AMQSe
if(requiresSuspend)
{
_suspended.set(false);
- for(Subscription sub : _tag2SubscriptionMap.values())
+ for(ConsumerTarget_0_8 sub : _tag2SubscriptionTargetMap.values())
{
- sub.getQueue().deliverAsync(sub);
+ sub.getConsumer().externalStateChange();
}
}
@@ -1173,7 +1141,7 @@ public class AMQChannel implements AMQSe
private final RecordDeliveryMethod _recordDeliveryMethod = new RecordDeliveryMethod()
{
- public void recordMessageDelivery(final Subscription sub, final QueueEntry entry, final long deliveryTag)
+ public void recordMessageDelivery(final Consumer sub, final MessageInstance entry, final long deliveryTag)
{
addUnacknowledgedMessage(entry, deliveryTag, sub);
}
@@ -1233,140 +1201,97 @@ public class AMQChannel implements AMQSe
return getId().compareTo(o.getId());
}
- private class MessageDeliveryAction implements ServerTransaction.Action
- {
- private final MessageReference<AMQMessage> _reference;
- private List<? extends BaseQueue> _destinationQueues;
-
- public MessageDeliveryAction(AMQMessage currentMessage,
- List<? extends BaseQueue> destinationQueues)
- {
- _reference = currentMessage.newReference();
- _destinationQueues = destinationQueues;
- }
- public void postCommit()
- {
- try
- {
- AMQMessage message = _reference.getMessage();
- final boolean immediate = message.isImmediate();
-
- for(int i = 0; i < _destinationQueues.size(); i++)
- {
- BaseQueue queue = _destinationQueues.get(i);
-
- BaseQueue.PostEnqueueAction action;
-
- if(immediate)
- {
- action = new ImmediateAction();
- }
- else
- {
- action = null;
- }
-
- queue.enqueue(message, isTransactional(), action);
-
- if(queue instanceof AMQQueue)
- {
- ((AMQQueue)queue).checkCapacity(AMQChannel.this);
- }
-
- }
-
- message.getStoredMessage().flushToStore();
- _reference.release();
- }
- catch (AMQException e)
- {
- // TODO
- throw new RuntimeException(e);
- }
- }
-
- public void onRollback()
- {
- // Maybe keep track of entries that were created and then delete them here in case of failure
- // to in memory enqueue
- _reference.release();
- }
-
-
- }
- private class ImmediateAction implements BaseQueue.PostEnqueueAction
+ private class ImmediateAction<C extends Consumer> implements Action<MessageInstance<?,C>>
{
public ImmediateAction()
{
}
- public void onEnqueue(QueueEntry entry)
+ public void performAction(MessageInstance<?,C> entry)
{
- AMQQueue queue = entry.getQueue();
+ TransactionLogResource queue = entry.getOwningResource();
if (!entry.getDeliveredToConsumer() && entry.acquire())
{
ServerTransaction txn = new LocalTransaction(_messageStore);
- Collection<QueueEntry> entries = new ArrayList<QueueEntry>(1);
- entries.add(entry);
final AMQMessage message = (AMQMessage) entry.getMessage();
- txn.dequeue(queue, entry.getMessage(),
- new MessageAcknowledgeAction(entries)
- {
- @Override
- public void postCommit()
+ MessageReference ref = message.newReference();
+ try
+ {
+ entry.delete();
+ txn.dequeue(queue, message,
+ new ServerTransaction.Action()
{
- try
+ @Override
+ public void postCommit()
{
- final
- ProtocolOutputConverter outputConverter =
- _session.getProtocolOutputConverter();
-
- outputConverter.writeReturn(message.getMessagePublishInfo(),
- message.getContentHeaderBody(),
- message,
- _channelId,
- AMQConstant.NO_CONSUMERS.getCode(),
- IMMEDIATE_DELIVERY_REPLY_TEXT);
+ try
+ {
+ final
+ ProtocolOutputConverter outputConverter =
+ _session.getProtocolOutputConverter();
+
+ outputConverter.writeReturn(message.getMessagePublishInfo(),
+ message.getContentHeaderBody(),
+ message,
+ _channelId,
+ AMQConstant.NO_CONSUMERS.getCode(),
+ IMMEDIATE_DELIVERY_REPLY_TEXT);
+ }
+ catch (AMQException e)
+ {
+ throw new RuntimeException(e);
+ }
}
- catch (AMQException e)
+
+ @Override
+ public void onRollback()
{
- throw new RuntimeException(e);
+
}
- super.postCommit();
}
- }
- );
- txn.commit();
+ );
+ txn.commit();
+ }
+ finally
+ {
+ ref.release();
+ }
}
else
{
- queue.checkCapacity(AMQChannel.this);
+ if(queue instanceof CapacityChecker)
+ {
+ ((CapacityChecker)queue).checkCapacity(AMQChannel.this);
+ }
}
}
}
- private final class CapacityCheckAction implements BaseQueue.PostEnqueueAction
+ private final class CapacityCheckAction<C extends Consumer> implements Action<MessageInstance<?,C>>
{
@Override
- public void onEnqueue(final QueueEntry entry)
+ public void performAction(final MessageInstance<?,C> entry)
{
- AMQQueue queue = entry.getQueue();
- queue.checkCapacity(AMQChannel.this);
+ TransactionLogResource queue = entry.getOwningResource();
+ if(queue instanceof CapacityChecker)
+ {
+ ((CapacityChecker)queue).checkCapacity(AMQChannel.this);
+ }
}
}
private class MessageAcknowledgeAction implements ServerTransaction.Action
{
- private final Collection<QueueEntry> _ackedMessages;
+ private final Collection<MessageInstance> _ackedMessages;
- public MessageAcknowledgeAction(Collection<QueueEntry> ackedMessages)
+ public MessageAcknowledgeAction(Collection<MessageInstance> ackedMessages)
{
_ackedMessages = ackedMessages;
}
@@ -1375,7 +1300,7 @@ public class AMQChannel implements AMQSe
{
try
{
- for(QueueEntry entry : _ackedMessages)
+ for(MessageInstance entry : _ackedMessages)
{
entry.delete();
}
@@ -1398,10 +1323,10 @@ public class AMQChannel implements AMQSe
{
try
{
- for(QueueEntry entry : _ackedMessages)
- {
- entry.release();
- }
+ for(MessageInstance entry : _ackedMessages)
+ {
+ entry.release();
+ }
}
finally
{
@@ -1566,7 +1491,7 @@ public class AMQChannel implements AMQSe
public void deadLetter(long deliveryTag) throws AMQException
{
final UnacknowledgedMessageMap unackedMap = getUnacknowledgedMessageMap();
- final QueueEntry rejectedQueueEntry = unackedMap.remove(deliveryTag);
+ final MessageInstance rejectedQueueEntry = unackedMap.remove(deliveryTag);
if (rejectedQueueEntry == null)
{
@@ -1575,36 +1500,42 @@ public class AMQChannel implements AMQSe
else
{
final ServerMessage msg = rejectedQueueEntry.getMessage();
+ final Consumer sub = rejectedQueueEntry.getDeliveredConsumer();
- int requeues = rejectedQueueEntry.routeToAlternate(new BaseQueue.PostEnqueueAction()
+ int requeues = rejectedQueueEntry.routeToAlternate(new Action<MessageInstance>()
{
@Override
- public void onEnqueue(final QueueEntry requeueEntry)
+ public void performAction(final MessageInstance requeueEntry)
{
_actor.message( _logSubject, ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(),
- requeueEntry.getQueue().getName()));
+ requeueEntry.getOwningResource().getName()));
}
}, null);
if(requeues == 0)
{
- final AMQQueue queue = rejectedQueueEntry.getQueue();
- final Exchange altExchange = queue.getAlternateExchange();
-
- if (altExchange == null)
+ final TransactionLogResource owningResource = rejectedQueueEntry.getOwningResource();
+ if(owningResource instanceof AMQQueue)
{
- _logger.debug("No alternate exchange configured for queue, must discard the message as unable to DLQ: delivery tag: " + deliveryTag);
- _actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), queue.getName(), msg.getRoutingKey()));
+ final AMQQueue queue = (AMQQueue) owningResource;
- }
- else
- {
- _logger.debug(
- "Routing process provided no queues to enqueue the message on, must discard message as unable to DLQ: delivery tag: "
- + deliveryTag);
- _actor.message(_logSubject,
- ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), altExchange.getName()));
+ final Exchange altExchange = queue.getAlternateExchange();
+
+ if (altExchange == null)
+ {
+ _logger.debug("No alternate exchange configured for queue, must discard the message as unable to DLQ: delivery tag: " + deliveryTag);
+ _actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), queue.getName(), msg.getRoutingKey()));
+
+ }
+ else
+ {
+ _logger.debug(
+ "Routing process provided no queues to enqueue the message on, must discard message as unable to DLQ: delivery tag: "
+ + deliveryTag);
+ _actor.message(_logSubject,
+ ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), altExchange.getName()));
+ }
}
}
@@ -1665,6 +1596,6 @@ public class AMQChannel implements AMQSe
@Override
public int getConsumerCount()
{
- return _tag2SubscriptionMap.size();
+ return _tag2SubscriptionTargetMap.size();
}
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java Wed Feb 12 13:27:51 2014
@@ -94,8 +94,7 @@ import org.apache.qpid.server.security.a
import org.apache.qpid.server.protocol.v0_8.state.AMQState;
import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.stats.StatisticsCounter;
-import org.apache.qpid.server.subscription.ClientDeliveryMethod;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.TransportException;
@@ -1669,7 +1668,7 @@ public class AMQProtocolEngine implement
}
@Override
- public void deliverToClient(final Subscription sub, final ServerMessage message,
+ public void deliverToClient(final Consumer sub, final ServerMessage message,
final InstanceProperties props, final long deliveryTag)
throws AMQException
{
@@ -1678,7 +1677,7 @@ public class AMQProtocolEngine implement
props,
_channelId,
deliveryTag,
- ((SubscriptionImpl)sub).getConsumerTag());
+ new AMQShortString(sub.getName()));
}
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java Wed Feb 12 13:27:51 2014
@@ -39,7 +39,6 @@ import org.apache.qpid.server.logging.Lo
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
import org.apache.qpid.server.security.AuthorizationHolder;
-import org.apache.qpid.server.subscription.ClientDeliveryMethod;
import org.apache.qpid.server.virtualhost.VirtualHost;
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java Wed Feb 12 13:27:51 2014
@@ -23,11 +23,8 @@ package org.apache.qpid.server.protocol.
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.txn.AutoCommitTransaction;
-import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.consumer.Consumer;
import java.util.Map;
@@ -35,34 +32,28 @@ public class ExtractResendAndRequeue imp
{
private static final Logger _log = Logger.getLogger(ExtractResendAndRequeue.class);
- private final Map<Long, QueueEntry> _msgToRequeue;
- private final Map<Long, QueueEntry> _msgToResend;
- private final boolean _requeueIfUnableToResend;
+ private final Map<Long, MessageInstance> _msgToRequeue;
+ private final Map<Long, MessageInstance> _msgToResend;
private final UnacknowledgedMessageMap _unacknowledgedMessageMap;
- private final MessageStore _transactionLog;
public ExtractResendAndRequeue(UnacknowledgedMessageMap unacknowledgedMessageMap,
- Map<Long, QueueEntry> msgToRequeue,
- Map<Long, QueueEntry> msgToResend,
- boolean requeueIfUnableToResend,
- MessageStore txnLog)
+ Map<Long, MessageInstance> msgToRequeue,
+ Map<Long, MessageInstance> msgToResend)
{
_unacknowledgedMessageMap = unacknowledgedMessageMap;
_msgToRequeue = msgToRequeue;
_msgToResend = msgToResend;
- _requeueIfUnableToResend = requeueIfUnableToResend;
- _transactionLog = txnLog;
}
- public boolean callback(final long deliveryTag, QueueEntry message) throws AMQException
+ public boolean callback(final long deliveryTag, MessageInstance message) throws AMQException
{
message.setRedelivered();
- final Subscription subscription = message.getDeliveredSubscription();
- if (subscription != null)
+ final Consumer consumer = message.getDeliveredConsumer();
+ if (consumer != null)
{
// Consumer exists
- if (!subscription.isClosed())
+ if (!consumer.isClosed())
{
_msgToResend.put(deliveryTag, message);
}
@@ -73,58 +64,13 @@ public class ExtractResendAndRequeue imp
}
else
{
- // Message has no consumer tag, so was "delivered" to a GET
- // or consumer no longer registered
- // cannot resend, so re-queue.
- if (!message.isQueueDeleted())
- {
- if (_requeueIfUnableToResend)
- {
- _msgToRequeue.put(deliveryTag, message);
- }
- else
- {
-
- dequeueEntry(message);
- _log.info("No DeadLetter Queue and requeue not requested so dropping message:" + message);
- }
- }
- else
- {
- dequeueEntry(message);
- _log.warn("Message.queue is null and no DeadLetter Queue so dropping message:" + message);
- }
+ _log.info("No DeadLetter Queue and requeue not requested so dropping message:" + message);
}
// false means continue processing
return false;
}
-
- private void dequeueEntry(final QueueEntry node)
- {
- ServerTransaction txn = new AutoCommitTransaction(_transactionLog);
- dequeueEntry(node, txn);
- }
-
- private void dequeueEntry(final QueueEntry node, ServerTransaction txn)
- {
- txn.dequeue(node.getQueue(), node.getMessage(),
- new ServerTransaction.Action()
- {
-
- public void postCommit()
- {
- node.delete();
- }
-
- public void onRollback()
- {
-
- }
- });
- }
-
public void visitComplete()
{
_unacknowledgedMessageMap.clear();
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java Wed Feb 12 13:27:51 2014
@@ -20,15 +20,12 @@
*/
package org.apache.qpid.server.protocol.v0_8;
-import org.apache.log4j.Logger;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.message.MessageDestination;
import java.util.ArrayList;
import java.util.List;
@@ -38,7 +35,7 @@ public class IncomingMessage
private final MessagePublishInfo _messagePublishInfo;
private ContentHeaderBody _contentHeaderBody;
- private Exchange _exchange;
+ private MessageDestination _messageDestination;
/**
* Keeps a track of how many bytes we have received in body frames
@@ -77,9 +74,9 @@ public class IncomingMessage
return _messagePublishInfo.getExchange();
}
- public Exchange getExchange()
+ public MessageDestination getDestination()
{
- return _exchange;
+ return _messageDestination;
}
public ContentHeaderBody getContentHeader()
@@ -92,9 +89,9 @@ public class IncomingMessage
return getContentHeader().getBodySize();
}
- public void setExchange(final Exchange e)
+ public void setMessageDestination(final MessageDestination e)
{
- _exchange = e;
+ _messageDestination = e;
}
public int getBodyCount() throws AMQException
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java Wed Feb 12 13:27:51 2014
@@ -105,7 +105,7 @@ public class MessageMetaData implements
}
- public int writeToBuffer(int offset, ByteBuffer dest)
+ public int writeToBuffer(ByteBuffer dest)
{
int oldPosition = dest.position();
try
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java Wed Feb 12 13:27:51 2014
@@ -21,6 +21,7 @@
package org.apache.qpid.server.protocol.v0_8;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.queue.QueueEntry;
import java.util.Collection;
@@ -36,24 +37,24 @@ public interface UnacknowledgedMessageMa
*@param message the message being iterated over @return true to stop iteration, false to continue
* @throws AMQException
*/
- boolean callback(final long deliveryTag, QueueEntry message) throws AMQException;
+ boolean callback(final long deliveryTag, MessageInstance message) throws AMQException;
void visitComplete();
}
void visit(Visitor visitor) throws AMQException;
- void add(long deliveryTag, QueueEntry message);
+ void add(long deliveryTag, MessageInstance message);
- QueueEntry remove(long deliveryTag);
+ MessageInstance remove(long deliveryTag);
- Collection<QueueEntry> cancelAllMessages();
+ Collection<MessageInstance> cancelAllMessages();
int size();
void clear();
- QueueEntry get(long deliveryTag);
+ MessageInstance get(long deliveryTag);
/**
* Get the set of delivery tags that are outstanding.
@@ -62,7 +63,7 @@ public interface UnacknowledgedMessageMa
*/
Set<Long> getDeliveryTags();
- Collection<QueueEntry> acknowledge(long deliveryTag, boolean multiple);
+ Collection<MessageInstance> acknowledge(long deliveryTag, boolean multiple);
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java Wed Feb 12 13:27:51 2014
@@ -21,6 +21,7 @@
package org.apache.qpid.server.protocol.v0_8;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.queue.QueueEntry;
import java.util.Collection;
@@ -34,7 +35,7 @@ public class UnacknowledgedMessageMapImp
private long _unackedSize;
- private Map<Long, QueueEntry> _map;
+ private Map<Long, MessageInstance> _map;
private long _lastDeliveryTag;
@@ -43,10 +44,10 @@ public class UnacknowledgedMessageMapImp
public UnacknowledgedMessageMapImpl(int prefetchLimit)
{
_prefetchLimit = prefetchLimit;
- _map = new LinkedHashMap<Long, QueueEntry>(prefetchLimit);
+ _map = new LinkedHashMap<Long, MessageInstance>(prefetchLimit);
}
- public void collect(long deliveryTag, boolean multiple, Map<Long, QueueEntry> msgs)
+ public void collect(long deliveryTag, boolean multiple, Map<Long, MessageInstance> msgs)
{
if (multiple)
{
@@ -54,7 +55,7 @@ public class UnacknowledgedMessageMapImp
}
else
{
- final QueueEntry entry = get(deliveryTag);
+ final MessageInstance entry = get(deliveryTag);
if(entry != null)
{
msgs.put(deliveryTag, entry);
@@ -63,7 +64,7 @@ public class UnacknowledgedMessageMapImp
}
- public void remove(Map<Long,QueueEntry> msgs)
+ public void remove(Map<Long,MessageInstance> msgs)
{
synchronized (_lock)
{
@@ -74,12 +75,12 @@ public class UnacknowledgedMessageMapImp
}
}
- public QueueEntry remove(long deliveryTag)
+ public MessageInstance remove(long deliveryTag)
{
synchronized (_lock)
{
- QueueEntry message = _map.remove(deliveryTag);
+ MessageInstance message = _map.remove(deliveryTag);
if(message != null)
{
_unackedSize -= message.getMessage().getSize();
@@ -94,8 +95,8 @@ public class UnacknowledgedMessageMapImp
{
synchronized (_lock)
{
- Set<Map.Entry<Long, QueueEntry>> currentEntries = _map.entrySet();
- for (Map.Entry<Long, QueueEntry> entry : currentEntries)
+ Set<Map.Entry<Long, MessageInstance>> currentEntries = _map.entrySet();
+ for (Map.Entry<Long, MessageInstance> entry : currentEntries)
{
visitor.callback(entry.getKey().longValue(), entry.getValue());
}
@@ -103,7 +104,7 @@ public class UnacknowledgedMessageMapImp
}
}
- public void add(long deliveryTag, QueueEntry message)
+ public void add(long deliveryTag, MessageInstance message)
{
synchronized (_lock)
{
@@ -113,12 +114,12 @@ public class UnacknowledgedMessageMapImp
}
}
- public Collection<QueueEntry> cancelAllMessages()
+ public Collection<MessageInstance> cancelAllMessages()
{
synchronized (_lock)
{
- Collection<QueueEntry> currentEntries = _map.values();
- _map = new LinkedHashMap<Long, QueueEntry>(_prefetchLimit);
+ Collection<MessageInstance> currentEntries = _map.values();
+ _map = new LinkedHashMap<Long, MessageInstance>(_prefetchLimit);
_unackedSize = 0l;
return currentEntries;
}
@@ -141,7 +142,7 @@ public class UnacknowledgedMessageMapImp
}
}
- public QueueEntry get(long key)
+ public MessageInstance get(long key)
{
synchronized (_lock)
{
@@ -157,19 +158,19 @@ public class UnacknowledgedMessageMapImp
}
}
- public Collection<QueueEntry> acknowledge(long deliveryTag, boolean multiple)
+ public Collection<MessageInstance> acknowledge(long deliveryTag, boolean multiple)
{
- Map<Long, QueueEntry> ackedMessageMap = new LinkedHashMap<Long,QueueEntry>();
+ Map<Long, MessageInstance> ackedMessageMap = new LinkedHashMap<Long,MessageInstance>();
collect(deliveryTag, multiple, ackedMessageMap);
remove(ackedMessageMap);
return ackedMessageMap.values();
}
- private void collect(long key, Map<Long, QueueEntry> msgs)
+ private void collect(long key, Map<Long, MessageInstance> msgs)
{
synchronized (_lock)
{
- for (Map.Entry<Long, QueueEntry> entry : _map.entrySet())
+ for (Map.Entry<Long, MessageInstance> entry : _map.entrySet())
{
msgs.put(entry.getKey(),entry.getValue());
if (entry.getKey() == key)
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java Wed Feb 12 13:27:51 2014
@@ -28,6 +28,7 @@ import org.apache.qpid.framing.AMQShortS
import org.apache.qpid.framing.BasicConsumeBody;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -73,7 +74,7 @@ public class BasicConsumeMethodHandler i
" args:" + body.getArguments());
}
- AMQQueue queue = body.getQueue() == null ? channel.getDefaultQueue() : vHost.getQueue(body.getQueue().intern().toString());
+ MessageSource queue = body.getQueue() == null ? channel.getDefaultQueue() : vHost.getQueue(body.getQueue().intern().toString());
if (queue == null)
{
@@ -120,8 +121,12 @@ public class BasicConsumeMethodHandler i
if(consumerTagName == null || channel.getSubscription(consumerTagName) == null)
{
- AMQShortString consumerTag = channel.subscribeToQueue(consumerTagName, queue, !body.getNoAck(),
- body.getArguments(), body.getNoLocal(), body.getExclusive());
+ AMQShortString consumerTag = channel.consumeFromSource(consumerTagName,
+ queue,
+ !body.getNoAck(),
+ body.getArguments(),
+ body.getExclusive(),
+ body.getNoLocal());
if (!body.getNowait())
{
MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
@@ -156,14 +161,14 @@ public class BasicConsumeMethodHandler i
}
- catch (AMQQueue.ExistingExclusiveSubscription e)
+ catch (AMQQueue.ExistingExclusiveConsumer e)
{
throw body.getChannelException(AMQConstant.ACCESS_REFUSED,
"Cannot subscribe to queue "
+ queue.getName()
+ " as it already has an existing exclusive consumer");
}
- catch (AMQQueue.ExistingSubscriptionPreventsExclusive e)
+ catch (AMQQueue.ExistingConsumerPreventsExclusive e)
{
throw body.getChannelException(AMQConstant.ACCESS_REFUSED,
"Cannot subscribe to queue "
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java Wed Feb 12 13:27:51 2014
@@ -24,27 +24,31 @@ package org.apache.qpid.server.protocol.
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicGetBody;
import org.apache.qpid.framing.BasicGetEmptyBody;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.flow.MessageOnlyCreditManager;
+import org.apache.qpid.server.protocol.v0_8.AMQMessage;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.v0_8.ConsumerTarget_0_8;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
-import org.apache.qpid.server.subscription.ClientDeliveryMethod;
-import org.apache.qpid.server.subscription.RecordDeliveryMethod;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.protocol.v0_8.SubscriptionFactoryImpl;
+import org.apache.qpid.server.protocol.v0_8.ClientDeliveryMethod;
+import org.apache.qpid.server.protocol.v0_8.RecordDeliveryMethod;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import java.util.EnumSet;
+
public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetBody>
{
private static final Logger _log = Logger.getLogger(BasicGetMethodHandler.class);
@@ -124,50 +128,79 @@ public class BasicGetMethodHandler imple
final FlowCreditManager singleMessageCredit = new MessageOnlyCreditManager(1L);
- final ClientDeliveryMethod getDeliveryMethod = new ClientDeliveryMethod()
- {
-
- @Override
- public void deliverToClient(final Subscription sub, final ServerMessage message, final
- InstanceProperties props, final long deliveryTag)
- throws AMQException
- {
- singleMessageCredit.useCreditForMessage(message.getSize());
- session.getProtocolOutputConverter().writeGetOk(message,
- props,
- channel.getChannelId(),
- deliveryTag,
- queue.getMessageCount());
-
-
- }
- };
+ final GetDeliveryMethod getDeliveryMethod =
+ new GetDeliveryMethod(singleMessageCredit, session, channel, queue);
final RecordDeliveryMethod getRecordMethod = new RecordDeliveryMethod()
{
- public void recordMessageDelivery(final Subscription sub, final QueueEntry entry, final long deliveryTag)
+ public void recordMessageDelivery(final Consumer sub, final MessageInstance entry, final long deliveryTag)
{
channel.addUnacknowledgedMessage(entry, deliveryTag, null);
}
};
- Subscription sub;
+ ConsumerTarget_0_8 target;
+ EnumSet<Consumer.Option> options = EnumSet.of(Consumer.Option.TRANSIENT, Consumer.Option.ACQUIRES,
+ Consumer.Option.SEES_REQUEUES);
if(acks)
{
- sub = SubscriptionFactoryImpl.INSTANCE.createSubscription(channel, session, null, acks, null, false, singleMessageCredit, getDeliveryMethod, getRecordMethod);
+
+ target = ConsumerTarget_0_8.createAckTarget(channel,
+ AMQShortString.EMPTY_STRING, null,
+ singleMessageCredit, getDeliveryMethod, getRecordMethod);
}
else
{
- sub = SubscriptionFactoryImpl.INSTANCE.createBasicGetNoAckSubscription(channel, session, null, null, false, singleMessageCredit, getDeliveryMethod, getRecordMethod);
+ target = ConsumerTarget_0_8.createGetNoAckTarget(channel,
+ AMQShortString.EMPTY_STRING, null,
+ singleMessageCredit, getDeliveryMethod, getRecordMethod);
}
- queue.registerSubscription(sub,false);
- queue.flushSubscription(sub);
- queue.unregisterSubscription(sub);
- return(!singleMessageCredit.hasCredit());
+ Consumer sub = queue.addConsumer(target, null, AMQMessage.class, "", options);
+ sub.flush();
+ sub.close();
+ return(getDeliveryMethod.hasDeliveredMessage());
}
+ private static class GetDeliveryMethod implements ClientDeliveryMethod
+ {
+
+ private final FlowCreditManager _singleMessageCredit;
+ private final AMQProtocolSession _session;
+ private final AMQChannel _channel;
+ private final AMQQueue _queue;
+ private boolean _deliveredMessage;
+
+ public GetDeliveryMethod(final FlowCreditManager singleMessageCredit,
+ final AMQProtocolSession session,
+ final AMQChannel channel, final AMQQueue queue)
+ {
+ _singleMessageCredit = singleMessageCredit;
+ _session = session;
+ _channel = channel;
+ _queue = queue;
+ }
+
+ @Override
+ public void deliverToClient(final Consumer sub, final ServerMessage message,
+ final InstanceProperties props, final long deliveryTag) throws AMQException
+ {
+ _singleMessageCredit.useCreditForMessage(message.getSize());
+ _session.getProtocolOutputConverter().writeGetOk(message,
+ props,
+ _channel.getChannelId(),
+ deliveryTag,
+ _queue.getMessageCount());
+
+ _deliveredMessage = true;
+ }
+
+ public boolean hasDeliveredMessage()
+ {
+ return _deliveredMessage;
+ }
+ }
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java Wed Feb 12 13:27:51 2014
@@ -28,6 +28,7 @@ import org.apache.qpid.framing.AMQShortS
import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
@@ -67,7 +68,7 @@ public class BasicPublishMethodHandler i
}
VirtualHost vHost = session.getVirtualHost();
- Exchange exch = vHost.getExchange(exchangeName.toString());
+ MessageDestination exch = vHost.getMessageDestination(exchangeName.toString());
// if the exchange does not exist we raise a channel exception
if (exch == null)
{
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverMethodHandler.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverMethodHandler.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverMethodHandler.java Wed Feb 12 13:27:51 2014
@@ -56,7 +56,7 @@ public class BasicRecoverMethodHandler i
throw body.getChannelNotFoundException(channelId);
}
- channel.resend(body.getRequeue());
+ channel.resend();
// Qpid 0-8 hacks a synchronous -ok onto recover.
// In Qpid 0-9 we create a separate sync-recover, sync-recover-ok pair to be "more" compliant
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverSyncMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverSyncMethodHandler.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverSyncMethodHandler.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverSyncMethodHandler.java Wed Feb 12 13:27:51 2014
@@ -58,7 +58,7 @@ public class BasicRecoverSyncMethodHandl
throw body.getChannelNotFoundException(channelId);
}
channel.sync();
- channel.resend(body.getRequeue());
+ channel.resend();
// Qpid 0-8 hacks a synchronous -ok onto recover.
// In Qpid 0-9 we create a separate sync-recover, sync-recover-ok pair to be "more" compliant
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java Wed Feb 12 13:27:51 2014
@@ -24,6 +24,7 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.BasicRejectBody;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueEntry;
@@ -65,7 +66,7 @@ public class BasicRejectMethodHandler im
long deliveryTag = body.getDeliveryTag();
- QueueEntry message = channel.getUnacknowledgedMessageMap().get(deliveryTag);
+ MessageInstance message = channel.getUnacknowledgedMessageMap().get(deliveryTag);
if (message == null)
{
@@ -73,16 +74,6 @@ public class BasicRejectMethodHandler im
}
else
{
- if (message.isQueueDeleted())
- {
- _logger.warn("Message's Queue has already been purged, dropping message");
- message = channel.getUnacknowledgedMessageMap().remove(deliveryTag);
- if(message != null)
- {
- message.delete();
- }
- return;
- }
if (message.getMessage() == null)
{
@@ -98,41 +89,43 @@ public class BasicRejectMethodHandler im
" on channel:" + channel.debugIdentity());
}
- message.reject();
-
if (body.getRequeue())
{
- channel.requeue(deliveryTag);
-
//this requeue represents a message rejected from the pre-dispatch queue
//therefore we need to amend the delivery counter.
message.decrementDeliveryCount();
+
+ channel.requeue(deliveryTag);
}
else
{
- final boolean maxDeliveryCountEnabled = channel.isMaxDeliveryCountEnabled(deliveryTag);
- _logger.debug("maxDeliveryCountEnabled: " + maxDeliveryCountEnabled + " deliveryTag " + deliveryTag);
- if (maxDeliveryCountEnabled)
- {
- final boolean deliveredTooManyTimes = channel.isDeliveredTooManyTimes(deliveryTag);
- _logger.debug("deliveredTooManyTimes: " + deliveredTooManyTimes + " deliveryTag " + deliveryTag);
- if (deliveredTooManyTimes)
- {
- channel.deadLetter(body.getDeliveryTag());
- }
- else
- {
- //this requeue represents a message rejected because of a recover/rollback that we
- //are not ready to DLQ. We rely on the reject command to resend from the unacked map
- //and therefore need to increment the delivery counter so we cancel out the effect
- //of the AMQChannel#resend() decrement.
- message.incrementDeliveryCount();
- }
- }
- else
- {
- channel.deadLetter(body.getDeliveryTag());
- }
+ // Since the Java client abuses the reject flag for requeuing after rollback, we won't set reject here
+ // as it would prevent redelivery
+ // message.reject();
+
+ final boolean maxDeliveryCountEnabled = channel.isMaxDeliveryCountEnabled(deliveryTag);
+ _logger.debug("maxDeliveryCountEnabled: " + maxDeliveryCountEnabled + " deliveryTag " + deliveryTag);
+ if (maxDeliveryCountEnabled)
+ {
+ final boolean deliveredTooManyTimes = channel.isDeliveredTooManyTimes(deliveryTag);
+ _logger.debug("deliveredTooManyTimes: " + deliveredTooManyTimes + " deliveryTag " + deliveryTag);
+ if (deliveredTooManyTimes)
+ {
+ channel.deadLetter(body.getDeliveryTag());
+ }
+ else
+ {
+ //this requeue represents a message rejected because of a recover/rollback that we
+ //are not ready to DLQ. We rely on the reject command to resend from the unacked map
+ //and therefore need to increment the delivery counter so we cancel out the effect
+ //of the AMQChannel#resend() decrement.
+ message.incrementDeliveryCount();
+ }
+ }
+ else
+ {
+ channel.requeue(deliveryTag);
+ }
}
}
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java Wed Feb 12 13:27:51 2014
@@ -41,6 +41,7 @@ import org.apache.qpid.server.protocol.v
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Map;
@@ -134,8 +135,8 @@ public class QueueDeclareHandler impleme
}
};
protocolConnection.addSessionCloseTask(sessionCloseTask);
- queue.addQueueDeleteTask(new AMQQueue.Task() {
- public void doTask(AMQQueue queue) throws AMQException
+ queue.addQueueDeleteTask(new Action<AMQQueue>() {
+ public void performAction(AMQQueue queue)
{
protocolConnection.removeSessionCloseTask(sessionCloseTask);
}
@@ -245,9 +246,9 @@ public class QueueDeclareHandler impleme
session.addSessionCloseTask(deleteQueueTask);
- queue.addQueueDeleteTask(new AMQQueue.Task()
+ queue.addQueueDeleteTask(new Action<AMQQueue>()
{
- public void doTask(AMQQueue queue)
+ public void performAction(AMQQueue queue)
{
session.removeSessionCloseTask(deleteQueueTask);
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxRollbackHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxRollbackHandler.java?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxRollbackHandler.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxRollbackHandler.java Wed Feb 12 13:27:51 2014
@@ -74,7 +74,7 @@ public class TxRollbackHandler implement
//Now resend all the unacknowledged messages back to the original subscribers.
//(Must be done after the TxnRollback-ok response).
// Why, are we not allowed to send messages back to client before the ok method?
- channel.resend(false);
+ channel.resend();
}
catch (AMQException e)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org