You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/02/07 17:57:52 UTC
svn commit: r1565726 [2/6] - in /qpid/trunk/qpid/java: ./
amqp-1-0-client-jms/ amqp-1-0-client/ amqp-1-0-common/
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrad...
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Fri Feb 7 16:57:49 2014
@@ -26,11 +26,15 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.StateChangeListener;
import java.util.EnumMap;
import java.util.HashSet;
@@ -46,7 +50,7 @@ public abstract class QueueEntryImpl imp
private final QueueEntryList _queueEntryList;
- private MessageReference _message;
+ private final MessageReference _message;
private Set<Long> _rejectedBy = null;
@@ -59,7 +63,7 @@ public abstract class QueueEntryImpl imp
(QueueEntryImpl.class, EntryState.class, "_state");
- private volatile Set<StateChangeListener> _stateChangeListeners;
+ private volatile Set<StateChangeListener<MessageInstance<QueueConsumer>, State>> _stateChangeListeners;
private static final
AtomicReferenceFieldUpdater<QueueEntryImpl, Set>
@@ -134,7 +138,7 @@ public abstract class QueueEntryImpl imp
return _entryId;
}
- public AMQQueue getQueue()
+ public AMQQueue<QueueConsumer> getQueue()
{
return _queueEntryList.getQueue();
}
@@ -183,7 +187,7 @@ public abstract class QueueEntryImpl imp
public boolean acquire()
{
- return acquire(NON_SUBSCRIPTION_ACQUIRED_STATE);
+ return acquire(NON_CONSUMER_ACQUIRED_STATE);
}
private boolean acquire(final EntryState state)
@@ -198,7 +202,7 @@ public abstract class QueueEntryImpl imp
return acquired;
}
- public boolean acquire(Subscription sub)
+ public boolean acquire(QueueConsumer sub)
{
final boolean acquired = acquire(sub.getOwningState());
if(acquired)
@@ -208,17 +212,17 @@ public abstract class QueueEntryImpl imp
return acquired;
}
- public boolean acquiredBySubscription()
+ public boolean acquiredByConsumer()
{
- return (_state instanceof SubscriptionAcquiredState);
+ return (_state instanceof ConsumerAcquiredState);
}
- public boolean isAcquiredBy(Subscription subscription)
+ public boolean isAcquiredBy(QueueConsumer consumer)
{
EntryState state = _state;
- return state instanceof SubscriptionAcquiredState
- && ((SubscriptionAcquiredState)state).getSubscription() == subscription;
+ return state instanceof ConsumerAcquiredState
+ && ((ConsumerAcquiredState)state).getConsumer() == consumer;
}
public void release()
@@ -228,14 +232,9 @@ public abstract class QueueEntryImpl imp
if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, AVAILABLE_STATE))
{
- if(state instanceof SubscriptionAcquiredState)
+ if(state instanceof ConsumerAcquiredState)
{
getQueue().decrementUnackedMsgCount(this);
- Subscription subscription = ((SubscriptionAcquiredState)state).getSubscription();
- if (subscription != null)
- {
- subscription.releaseQueueEntry(this);
- }
}
if(!getQueue().isDeleted())
@@ -265,12 +264,12 @@ public abstract class QueueEntryImpl imp
return Boolean.TRUE.equals(_instanceProperties.getProperty(InstanceProperties.Property.REDELIVERED));
}
- public Subscription getDeliveredSubscription()
+ public QueueConsumer getDeliveredConsumer()
{
EntryState state = _state;
- if (state instanceof SubscriptionAcquiredState)
+ if (state instanceof ConsumerAcquiredState)
{
- return ((SubscriptionAcquiredState) state).getSubscription();
+ return (QueueConsumer) ((ConsumerAcquiredState) state).getConsumer();
}
else
{
@@ -280,16 +279,16 @@ public abstract class QueueEntryImpl imp
public void reject()
{
- Subscription subscription = getDeliveredSubscription();
+ QueueConsumer consumer = getDeliveredConsumer();
- if (subscription != null)
+ if (consumer != null)
{
if (_rejectedBy == null)
{
_rejectedBy = new HashSet<Long>();
}
- _rejectedBy.add(subscription.getSubscriptionID());
+ _rejectedBy.add(consumer.getId());
}
else
{
@@ -297,12 +296,12 @@ public abstract class QueueEntryImpl imp
}
}
- public boolean isRejectedBy(long subscriptionId)
+ public boolean isRejectedBy(QueueConsumer consumer)
{
- if (_rejectedBy != null) // We have subscriptions that rejected this message
+ if (_rejectedBy != null) // We have consumers that rejected this message
{
- return _rejectedBy.contains(subscriptionId);
+ return _rejectedBy.contains(consumer.getId());
}
else // This message hasn't been rejected yet.
{
@@ -316,12 +315,10 @@ public abstract class QueueEntryImpl imp
if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, DEQUEUED_STATE))
{
- Subscription s = null;
- if (state instanceof SubscriptionAcquiredState)
+ Consumer s = null;
+ if (state instanceof ConsumerAcquiredState)
{
getQueue().decrementUnackedMsgCount(this);
- s = ((SubscriptionAcquiredState) state).getSubscription();
- s.onDequeue(this);
}
getQueue().dequeue(this,s);
@@ -336,7 +333,7 @@ public abstract class QueueEntryImpl imp
private void notifyStateChange(final State oldState, final State newState)
{
- for(StateChangeListener l : _stateChangeListeners)
+ for(StateChangeListener<MessageInstance<QueueConsumer>, State> l : _stateChangeListeners)
{
l.stateChanged(this, oldState, newState);
}
@@ -367,7 +364,7 @@ public abstract class QueueEntryImpl imp
dispose();
}
- public int routeToAlternate(final BaseQueue.PostEnqueueAction action, ServerTransaction txn)
+ public int routeToAlternate(final Action<MessageInstance<? extends Consumer>> action, ServerTransaction txn)
{
final AMQQueue currentQueue = getQueue();
Exchange alternateExchange = currentQueue.getAlternateExchange();
@@ -379,7 +376,10 @@ public abstract class QueueEntryImpl imp
txn = new LocalTransaction(getQueue().getVirtualHost().getMessageStore());
}
- int enqueues = alternateExchange.send(getMessage(), getInstanceProperties(), txn, action);
+ int enqueues = alternateExchange.send(getMessage(),
+ getInstanceProperties(),
+ txn,
+ action);
txn.dequeue(currentQueue, getMessage(), new ServerTransaction.Action()
{
@@ -412,21 +412,21 @@ public abstract class QueueEntryImpl imp
return getQueue().isDeleted();
}
- public void addStateChangeListener(StateChangeListener listener)
+ public void addStateChangeListener(StateChangeListener<MessageInstance<QueueConsumer>, State> listener)
{
- Set<StateChangeListener> listeners = _stateChangeListeners;
+ Set<StateChangeListener<MessageInstance<QueueConsumer>, State>> listeners = _stateChangeListeners;
if(listeners == null)
{
- _listenersUpdater.compareAndSet(this, null, new CopyOnWriteArraySet<StateChangeListener>());
+ _listenersUpdater.compareAndSet(this, null, new CopyOnWriteArraySet<StateChangeListener<MessageInstance<QueueConsumer>, State>>());
listeners = _stateChangeListeners;
}
listeners.add(listener);
}
- public boolean removeStateChangeListener(StateChangeListener listener)
+ public boolean removeStateChangeListener(StateChangeListener<MessageInstance<QueueConsumer>, State> listener)
{
- Set<StateChangeListener> listeners = _stateChangeListeners;
+ Set<StateChangeListener<MessageInstance<QueueConsumer>, State>> listeners = _stateChangeListeners;
if(listeners != null)
{
return listeners.remove(listener);
@@ -461,6 +461,12 @@ public abstract class QueueEntryImpl imp
return _deliveryCount;
}
+ @Override
+ public int getMaximumDeliveryCount()
+ {
+ return getQueue().getMaximumDeliveryCount();
+ }
+
public void incrementDeliveryCount()
{
_deliveryCountUpdater.incrementAndGet(this);
@@ -485,6 +491,23 @@ public abstract class QueueEntryImpl imp
'}';
}
+ @Override
+ public boolean resend() throws AMQException
+ {
+ QueueConsumer sub = getDeliveredConsumer();
+ if(sub != null)
+ {
+ return sub.resend(this);
+ }
+ return false;
+ }
+
+ @Override
+ public TransactionLogResource getOwningResource()
+ {
+ return getQueue();
+ }
+
private static class EntryInstanceProperties implements InstanceProperties
{
private final EnumMap<Property, Object> _properties = new EnumMap<Property, Object>(Property.class);
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java Fri Feb 7 16:57:49 2014
@@ -24,7 +24,7 @@ import org.apache.qpid.server.message.Se
public interface QueueEntryList<Q extends QueueEntry>
{
- AMQQueue getQueue();
+ AMQQueue<QueueConsumer> getQueue();
Q add(ServerMessage message);
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java Fri Feb 7 16:57:49 2014
@@ -32,8 +32,8 @@ import org.apache.qpid.transport.Transpo
/**
* QueueRunners are Runnables used to process a queue when requiring
- * asynchronous message delivery to subscriptions, which is necessary
- * when straight-through delivery of a message to a subscription isn't
+ * asynchronous message delivery to consumers, which is necessary
+ * when straight-through delivery of a message to a consumer isn't
* possible during the enqueue operation.
*/
public class QueueRunner implements Runnable
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Fri Feb 7 16:57:49 2014
@@ -18,15 +18,7 @@
*/
package org.apache.qpid.server.queue;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.EnumSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
+import java.util.*;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
@@ -43,27 +35,33 @@ import org.apache.qpid.server.binding.Bi
import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.configuration.QueueConfiguration;
import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.QueueActor;
import org.apache.qpid.server.logging.messages.QueueMessages;
import org.apache.qpid.server.logging.subjects.QueueLogSubject;
+import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.security.AuthorizationHolder;
-import org.apache.qpid.server.subscription.AssignedSubscriptionMessageGroupManager;
-import org.apache.qpid.server.subscription.DefinedGroupMessageGroupManager;
-import org.apache.qpid.server.subscription.MessageGroupManager;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.subscription.SubscriptionList;
+import org.apache.qpid.server.consumer.Consumer;
+import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.StateChangeListener;
import org.apache.qpid.server.virtualhost.VirtualHost;
-public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, MessageGroupManager.SubscriptionResetHelper
+public class SimpleAMQQueue implements AMQQueue<QueueConsumer>,
+ StateChangeListener<QueueConsumer, QueueConsumer.State>,
+ MessageGroupManager.ConsumerResetHelper
{
private static final Logger _logger = Logger.getLogger(SimpleAMQQueue.class);
@@ -98,9 +96,9 @@ public class SimpleAMQQueue implements A
private final QueueEntryList<QueueEntry> _entries;
- private final SubscriptionList _subscriptionList = new SubscriptionList();
+ private final QueueConsumerList _consumerList = new QueueConsumerList();
- private volatile Subscription _exclusiveSubscriber;
+ private volatile QueueConsumer _exclusiveSubscriber;
@@ -120,13 +118,7 @@ public class SimpleAMQQueue implements A
private final AtomicLong _persistentMessageDequeueSize = new AtomicLong();
private final AtomicLong _persistentMessageEnqueueCount = new AtomicLong();
private final AtomicLong _persistentMessageDequeueCount = new AtomicLong();
- private final AtomicInteger _consumerCountHigh = new AtomicInteger(0);
- private final AtomicLong _msgTxnEnqueues = new AtomicLong(0);
- private final AtomicLong _byteTxnEnqueues = new AtomicLong(0);
- private final AtomicLong _msgTxnDequeues = new AtomicLong(0);
- private final AtomicLong _byteTxnDequeues = new AtomicLong(0);
private final AtomicLong _unackedMsgCount = new AtomicLong(0);
- private final AtomicLong _unackedMsgCountHigh = new AtomicLong(0);
private final AtomicLong _unackedMsgBytes = new AtomicLong();
private final AtomicInteger _bindingCountHigh = new AtomicInteger();
@@ -165,7 +157,7 @@ public class SimpleAMQQueue implements A
private final Set<AMQSessionModel> _blockedChannels = new ConcurrentSkipListSet<AMQSessionModel>();
private final AtomicBoolean _deleted = new AtomicBoolean(false);
- private final List<Task> _deleteTaskList = new CopyOnWriteArrayList<Task>();
+ private final List<Action<AMQQueue>> _deleteTaskList = new CopyOnWriteArrayList<Action<AMQQueue>>();
private LogSubject _logSubject;
@@ -187,8 +179,8 @@ public class SimpleAMQQueue implements A
private int _maximumDeliveryCount;
private final MessageGroupManager _messageGroupManager;
- private final Collection<SubscriptionRegistrationListener> _subscriptionListeners =
- new ArrayList<SubscriptionRegistrationListener>();
+ private final Collection<ConsumerRegistrationListener> _consumerListeners =
+ new ArrayList<ConsumerRegistrationListener>();
private AMQQueue.NotificationListener _notificationListener;
private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length];
@@ -257,7 +249,7 @@ public class SimpleAMQQueue implements A
}
else
{
- _messageGroupManager = new AssignedSubscriptionMessageGroupManager(String.valueOf(arguments.get(
+ _messageGroupManager = new AssignedConsumerMessageGroupManager(String.valueOf(arguments.get(
Queue.MESSAGE_GROUP_KEY)), DEFAULT_MAX_GROUPS);
}
}
@@ -388,11 +380,17 @@ public class SimpleAMQQueue implements A
return _name;
}
- // ------ Manage Subscriptions
+ // ------ Manage Consumers
- public synchronized void registerSubscription(final Subscription subscription, final boolean exclusive)
- throws AMQSecurityException, ExistingExclusiveSubscription, ExistingSubscriptionPreventsExclusive
+
+ @Override
+ public synchronized QueueConsumer addConsumer(final ConsumerTarget target,
+ final FilterManager filters,
+ final Class<? extends ServerMessage> messageClass,
+ final String consumerName,
+ EnumSet<Consumer.Option> optionSet) throws AMQException
{
+
// Access control
if (!getVirtualHost().getSecurityManager().authoriseConsume(this))
{
@@ -400,58 +398,61 @@ public class SimpleAMQQueue implements A
}
- if (hasExclusiveSubscriber())
+ if (hasExclusiveConsumer())
{
- throw new ExistingExclusiveSubscription();
+ throw new ExistingExclusiveConsumer();
}
- if (exclusive && !subscription.isTransient())
+
+ boolean exclusive = optionSet.contains(Consumer.Option.EXCLUSIVE);
+ boolean isTransient = optionSet.contains(Consumer.Option.TRANSIENT);
+
+ if (exclusive && !isTransient && getConsumerCount() != 0)
{
- if (getConsumerCount() != 0)
- {
- throw new ExistingSubscriptionPreventsExclusive();
- }
- else
- {
- _exclusiveSubscriber = subscription;
- }
+ throw new ExistingConsumerPreventsExclusive();
+ }
+
+ QueueConsumer consumer = new QueueConsumer(filters, messageClass,
+ optionSet.contains(Consumer.Option.ACQUIRES),
+ optionSet.contains(Consumer.Option.SEES_REQUEUES),
+ consumerName, optionSet.contains(Consumer.Option.TRANSIENT), target);
+ target.consumerAdded(consumer);
+
+
+ if (exclusive && !isTransient)
+ {
+ _exclusiveSubscriber = consumer;
}
- if(subscription.isActive())
+ if(consumer.isActive())
{
_activeSubscriberCount.incrementAndGet();
}
- subscription.setStateListener(this);
- subscription.setQueueContext(new QueueContext(_entries.getHead()));
+
+ consumer.setStateListener(this);
+ consumer.setQueueContext(new QueueContext(_entries.getHead()));
if (!isDeleted())
{
- subscription.setQueue(this, exclusive);
+ consumer.setQueue(this, exclusive);
if(_nolocal)
{
- subscription.setNoLocal(_nolocal);
+ consumer.setNoLocal(_nolocal);
}
- synchronized (_subscriptionListeners)
+ synchronized (_consumerListeners)
{
- for(SubscriptionRegistrationListener listener : _subscriptionListeners)
+ for(ConsumerRegistrationListener listener : _consumerListeners)
{
- listener.subscriptionRegistered(this, subscription);
+ listener.consumerAdded(this, consumer);
}
}
- _subscriptionList.add(subscription);
-
- //Increment consumerCountHigh if necessary. (un)registerSubscription are both
- //synchronized methods so we don't need additional synchronization here
- if(_consumerCountHigh.get() < getConsumerCount())
- {
- _consumerCountHigh.incrementAndGet();
- }
+ _consumerList.add(consumer);
if (isDeleted())
{
- subscription.queueDeleted(this);
+ consumer.queueDeleted();
}
}
else
@@ -459,42 +460,49 @@ public class SimpleAMQQueue implements A
// TODO
}
- deliverAsync(subscription);
+ deliverAsync(consumer);
+
+ return consumer;
}
- public synchronized void unregisterSubscription(final Subscription subscription) throws AMQException
+ synchronized void unregisterConsumer(final QueueConsumer consumer) throws AMQException
{
- if (subscription == null)
+ if (consumer == null)
{
- throw new NullPointerException("subscription argument is null");
+ throw new NullPointerException("consumer argument is null");
}
- boolean removed = _subscriptionList.remove(subscription);
+ boolean removed = _consumerList.remove(consumer);
if (removed)
{
- subscription.close();
+ consumer.close();
// No longer can the queue have an exclusive consumer
setExclusiveSubscriber(null);
- subscription.setQueueContext(null);
+ consumer.setQueueContext(null);
+
+ if(!isDeleted() && isExclusive() && getConsumerCount() == 0)
+ {
+ setAuthorizationHolder(null);
+ }
if(_messageGroupManager != null)
{
- resetSubPointersForGroups(subscription, true);
+ resetSubPointersForGroups(consumer, true);
}
- synchronized (_subscriptionListeners)
+ synchronized (_consumerListeners)
{
- for(SubscriptionRegistrationListener listener : _subscriptionListeners)
+ for(ConsumerRegistrationListener listener : _consumerListeners)
{
- listener.subscriptionUnregistered(this, subscription);
+ listener.consumerRemoved(this, consumer);
}
}
// auto-delete queues must be deleted if there are no remaining subscribers
- if (_autoDelete && getDeleteOnNoConsumers() && !subscription.isTransient() && getConsumerCount() == 0 )
+ if (_autoDelete && getDeleteOnNoConsumers() && !consumer.isTransient() && getConsumerCount() == 0 )
{
if (_logger.isInfoEnabled())
{
@@ -503,57 +511,57 @@ public class SimpleAMQQueue implements A
getVirtualHost().removeQueue(this);
- // we need to manually fire the event to the removed subscription (which was the last one left for this
- // queue. This is because the delete method uses the subscription set which has just been cleared
- subscription.queueDeleted(this);
+ // we need to manually fire the event to the removed consumer (which was the last one left for this
+ // queue. This is because the delete method uses the consumer set which has just been cleared
+ consumer.queueDeleted();
}
}
}
- public Collection<Subscription> getConsumers()
+ public Collection<QueueConsumer> getConsumers()
{
- List<Subscription> consumers = new ArrayList<Subscription>();
- SubscriptionList.SubscriptionNodeIterator iter = _subscriptionList.iterator();
+ List<QueueConsumer> consumers = new ArrayList<QueueConsumer>();
+ QueueConsumerList.ConsumerNodeIterator iter = _consumerList.iterator();
while(iter.advance())
{
- consumers.add(iter.getNode().getSubscription());
+ consumers.add(iter.getNode().getConsumer());
}
return consumers;
}
- public void addSubscriptionRegistrationListener(final SubscriptionRegistrationListener listener)
+ public void addConsumerRegistrationListener(final ConsumerRegistrationListener listener)
{
- synchronized (_subscriptionListeners)
+ synchronized (_consumerListeners)
{
- _subscriptionListeners.add(listener);
+ _consumerListeners.add(listener);
}
}
- public void removeSubscriptionRegistrationListener(final SubscriptionRegistrationListener listener)
+ public void removeConsumerRegistrationListener(final ConsumerRegistrationListener listener)
{
- synchronized (_subscriptionListeners)
+ synchronized (_consumerListeners)
{
- _subscriptionListeners.remove(listener);
+ _consumerListeners.remove(listener);
}
}
- public void resetSubPointersForGroups(Subscription subscription, boolean clearAssignments)
+ public void resetSubPointersForGroups(QueueConsumer consumer, boolean clearAssignments)
{
- QueueEntry entry = _messageGroupManager.findEarliestAssignedAvailableEntry(subscription);
+ QueueEntry entry = _messageGroupManager.findEarliestAssignedAvailableEntry(consumer);
if(clearAssignments)
{
- _messageGroupManager.clearAssignments(subscription);
+ _messageGroupManager.clearAssignments(consumer);
}
if(entry != null)
{
- SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator();
+ QueueConsumerList.ConsumerNodeIterator subscriberIter = _consumerList.iterator();
// iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards
while (subscriberIter.advance())
{
- Subscription sub = subscriberIter.getNode().getSubscription();
+ QueueConsumer sub = subscriberIter.getNode().getConsumer();
// we don't make browsers send the same stuff twice
if (sub.seesRequeues())
@@ -617,23 +625,9 @@ public class SimpleAMQQueue implements A
}
// ------ Enqueue / Dequeue
- public void enqueue(ServerMessage message) throws AMQException
- {
- enqueue(message, null);
- }
-
- public void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException
- {
- enqueue(message, false, action);
- }
- public void enqueue(ServerMessage message, boolean transactional, PostEnqueueAction action) throws AMQException
+ public void enqueue(ServerMessage message, Action<MessageInstance<? extends Consumer>> action) throws AMQException
{
-
- if(transactional)
- {
- incrementTxnEnqueueStats(message);
- }
incrementQueueCount();
incrementQueueSize(message);
@@ -641,35 +635,35 @@ public class SimpleAMQQueue implements A
QueueEntry entry;
- final Subscription exclusiveSub = _exclusiveSubscriber;
+ final QueueConsumer exclusiveSub = _exclusiveSubscriber;
entry = _entries.add(message);
if(action != null || (exclusiveSub == null && _queueRunner.isIdle()))
{
/*
- iterate over subscriptions and if any is at the end of the queue and can deliver this message, then deliver the message
+ iterate over consumers and if any is at the end of the queue and can deliver this message, then deliver the message
*/
- SubscriptionList.SubscriptionNode node = _subscriptionList.getMarkedNode();
- SubscriptionList.SubscriptionNode nextNode = node.findNext();
+ QueueConsumerList.ConsumerNode node = _consumerList.getMarkedNode();
+ QueueConsumerList.ConsumerNode nextNode = node.findNext();
if (nextNode == null)
{
- nextNode = _subscriptionList.getHead().findNext();
+ nextNode = _consumerList.getHead().findNext();
}
while (nextNode != null)
{
- if (_subscriptionList.updateMarkedNode(node, nextNode))
+ if (_consumerList.updateMarkedNode(node, nextNode))
{
break;
}
else
{
- node = _subscriptionList.getMarkedNode();
+ node = _consumerList.getMarkedNode();
nextNode = node.findNext();
if (nextNode == null)
{
- nextNode = _subscriptionList.getHead().findNext();
+ nextNode = _consumerList.getHead().findNext();
}
}
}
@@ -683,13 +677,13 @@ public class SimpleAMQQueue implements A
if (nextNode == null)
{
loops--;
- nextNode = _subscriptionList.getHead();
+ nextNode = _consumerList.getHead();
}
else
{
- // if subscription at end, and active, offer
- Subscription sub = nextNode.getSubscription();
- deliverToSubscription(sub, entry);
+ // if consumer at end, and active, offer
+ QueueConsumer sub = nextNode.getConsumer();
+ deliverToConsumer(sub, entry);
}
nextNode = nextNode.findNext();
@@ -699,7 +693,7 @@ public class SimpleAMQQueue implements A
if (entry.isAvailable())
{
- checkSubscriptionsNotAheadOfDelivery(entry);
+ checkConsumersNotAheadOfDelivery(entry);
if (exclusiveSub != null)
{
@@ -715,12 +709,12 @@ public class SimpleAMQQueue implements A
if(action != null)
{
- action.onEnqueue(entry);
+ action.performAction(entry);
}
}
- private void deliverToSubscription(final Subscription sub, final QueueEntry entry)
+ private void deliverToConsumer(final QueueConsumer sub, final QueueEntry entry)
throws AMQException
{
@@ -729,14 +723,14 @@ public class SimpleAMQQueue implements A
try
{
if (!sub.isSuspended()
- && subscriptionReadyAndHasInterest(sub, entry)
+ && consumerReadyAndHasInterest(sub, entry)
&& mightAssign(sub, entry)
&& !sub.wouldSuspend(entry))
{
if (sub.acquires() && !assign(sub, entry))
{
// restore credit here that would have been taken away by wouldSuspend since we didn't manage
- // to acquire the entry for this subscription
+ // to acquire the entry for this consumer
sub.restoreCredit(entry);
}
else
@@ -752,7 +746,7 @@ public class SimpleAMQQueue implements A
}
}
- private boolean assign(final Subscription sub, final QueueEntry entry)
+ private boolean assign(final QueueConsumer sub, final QueueEntry entry)
{
if(_messageGroupManager == null)
{
@@ -766,17 +760,17 @@ public class SimpleAMQQueue implements A
}
}
- private boolean mightAssign(final Subscription sub, final QueueEntry entry)
+ private boolean mightAssign(final QueueConsumer sub, final QueueEntry entry)
{
if(_messageGroupManager == null || !sub.acquires())
{
return true;
}
- Subscription assigned = _messageGroupManager.getAssignedSubscription(entry);
+ QueueConsumer assigned = _messageGroupManager.getAssignedConsumer(entry);
return (assigned == null) || (assigned == sub);
}
- protected void checkSubscriptionsNotAheadOfDelivery(final QueueEntry entry)
+ protected void checkConsumersNotAheadOfDelivery(final QueueEntry entry)
{
// This method is only required for queues which mess with ordering
// Simple Queues don't :-)
@@ -810,19 +804,7 @@ public class SimpleAMQQueue implements A
getAtomicQueueCount().incrementAndGet();
}
- private void incrementTxnEnqueueStats(final ServerMessage message)
- {
- _msgTxnEnqueues.incrementAndGet();
- _byteTxnEnqueues.addAndGet(message.getSize());
- }
-
- private void incrementTxnDequeueStats(QueueEntry entry)
- {
- _msgTxnDequeues.incrementAndGet();
- _byteTxnDequeues.addAndGet(entry.getSize());
- }
-
- private void deliverMessage(final Subscription sub, final QueueEntry entry, boolean batch)
+ private void deliverMessage(final QueueConsumer sub, final QueueEntry entry, boolean batch)
throws AMQException
{
setLastSeenEntry(sub, entry);
@@ -833,15 +815,15 @@ public class SimpleAMQQueue implements A
sub.send(entry, batch);
}
- private boolean subscriptionReadyAndHasInterest(final Subscription sub, final QueueEntry entry) throws AMQException
+ private boolean consumerReadyAndHasInterest(final QueueConsumer sub, final QueueEntry entry) throws AMQException
{
return sub.hasInterest(entry) && (getNextAvailableEntry(sub) == entry);
}
- private void setLastSeenEntry(final Subscription sub, final QueueEntry entry)
+ private void setLastSeenEntry(final QueueConsumer sub, final QueueEntry entry)
{
- QueueContext subContext = (QueueContext) sub.getQueueContext();
+ QueueContext subContext = sub.getQueueContext();
if (subContext != null)
{
QueueEntry releasedEntry = subContext.getReleasedEntry();
@@ -854,10 +836,10 @@ public class SimpleAMQQueue implements A
}
}
- private void updateSubRequeueEntry(final Subscription sub, final QueueEntry entry)
+ private void updateSubRequeueEntry(final QueueConsumer sub, final QueueEntry entry)
{
- QueueContext subContext = (QueueContext) sub.getQueueContext();
+ QueueContext subContext = sub.getQueueContext();
if(subContext != null)
{
QueueEntry oldEntry;
@@ -874,11 +856,11 @@ public class SimpleAMQQueue implements A
public void requeue(QueueEntry entry)
{
- SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator();
+ QueueConsumerList.ConsumerNodeIterator subscriberIter = _consumerList.iterator();
// iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards
while (subscriberIter.advance() && entry.isAvailable())
{
- Subscription sub = subscriberIter.getNode().getSubscription();
+ QueueConsumer sub = subscriberIter.getNode().getConsumer();
// we don't make browsers send the same stuff twice
if (sub.seesRequeues())
@@ -891,20 +873,15 @@ public class SimpleAMQQueue implements A
}
- public void dequeue(QueueEntry entry, Subscription sub)
+ public void dequeue(QueueEntry entry, Consumer sub)
{
decrementQueueCount();
decrementQueueSize(entry);
- if (entry.acquiredBySubscription())
+ if (entry.acquiredByConsumer())
{
_deliveredMessages.decrementAndGet();
}
- if(sub != null && sub.isSessionTransactional())
- {
- incrementTxnDequeueStats(entry);
- }
-
checkCapacity();
}
@@ -928,17 +905,17 @@ public class SimpleAMQQueue implements A
_dequeueCount.incrementAndGet();
}
- public boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException
+ public boolean resend(final QueueEntry entry, final Consumer consumer) throws AMQException
{
- /* TODO : This is wrong as the subscription may be suspended, we should instead change the state of the message
- entry to resend and move back the subscription pointer. */
+ /* TODO : This is wrong as the consumer may be suspended, we should instead change the state of the message
+ entry to resend and move back the consumer pointer. */
- subscription.getSendLock();
+ consumer.getSendLock();
try
{
- if (!subscription.isClosed())
+ if (!consumer.isClosed())
{
- deliverMessage(subscription, entry, false);
+ deliverMessage((QueueConsumer) consumer, entry, false);
return true;
}
else
@@ -948,7 +925,7 @@ public class SimpleAMQQueue implements A
}
finally
{
- subscription.releaseSendLock();
+ consumer.releaseSendLock();
}
}
@@ -956,12 +933,7 @@ public class SimpleAMQQueue implements A
public int getConsumerCount()
{
- return _subscriptionList.size();
- }
-
- public int getConsumerCountHigh()
- {
- return _consumerCountHigh.get();
+ return _consumerList.size();
}
public int getActiveConsumerCount()
@@ -1039,16 +1011,16 @@ public class SimpleAMQQueue implements A
}
- public void stateChange(Subscription sub, Subscription.State oldState, Subscription.State newState)
+ public void stateChanged(QueueConsumer sub, QueueConsumer.State oldState, QueueConsumer.State newState)
{
- if (oldState == Subscription.State.ACTIVE && newState != Subscription.State.ACTIVE)
+ if (oldState == QueueConsumer.State.ACTIVE && newState != QueueConsumer.State.ACTIVE)
{
_activeSubscriberCount.decrementAndGet();
}
- else if (newState == Subscription.State.ACTIVE)
+ else if (newState == QueueConsumer.State.ACTIVE)
{
- if (oldState != Subscription.State.ACTIVE)
+ if (oldState != QueueConsumer.State.ACTIVE)
{
_activeSubscriberCount.incrementAndGet();
@@ -1072,12 +1044,12 @@ public class SimpleAMQQueue implements A
return _atomicQueueSize;
}
- public boolean hasExclusiveSubscriber()
+ public boolean hasExclusiveConsumer()
{
return _exclusiveSubscriber != null;
}
- private void setExclusiveSubscriber(Subscription exclusiveSubscriber)
+ private void setExclusiveSubscriber(QueueConsumer exclusiveSubscriber)
{
_exclusiveSubscriber = exclusiveSubscriber;
}
@@ -1093,9 +1065,9 @@ public class SimpleAMQQueue implements A
return _entries;
}
- protected SubscriptionList getSubscriptionList()
+ protected QueueConsumerList getConsumerList()
{
- return _subscriptionList;
+ return _consumerList;
}
@@ -1300,12 +1272,12 @@ public class SimpleAMQQueue implements A
});
}
- public void addQueueDeleteTask(final Task task)
+ public void addQueueDeleteTask(final Action<AMQQueue> task)
{
_deleteTaskList.add(task);
}
- public void removeQueueDeleteTask(final Task task)
+ public void removeQueueDeleteTask(final Action<AMQQueue> task)
{
_deleteTaskList.remove(task);
}
@@ -1322,19 +1294,21 @@ public class SimpleAMQQueue implements A
if (!_deleted.getAndSet(true))
{
- for (Binding b : _bindings)
+ final ArrayList<Binding> bindingCopy = new ArrayList<Binding>(_bindings);
+
+ for (Binding b : bindingCopy)
{
b.getExchange().removeBinding(b);
}
- SubscriptionList.SubscriptionNodeIterator subscriptionIter = _subscriptionList.iterator();
+ QueueConsumerList.ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator();
- while (subscriptionIter.advance())
+ while (consumerNodeIterator.advance())
{
- Subscription s = subscriptionIter.getNode().getSubscription();
+ QueueConsumer s = consumerNodeIterator.getNode().getConsumer();
if (s != null)
{
- s.queueDeleted(this);
+ s.queueDeleted();
}
}
@@ -1375,9 +1349,9 @@ public class SimpleAMQQueue implements A
}
- for (Task task : _deleteTaskList)
+ for (Action<AMQQueue> task : _deleteTaskList)
{
- task.doTask(this);
+ task.performAction(this);
}
_deleteTaskList.clear();
@@ -1461,7 +1435,7 @@ public class SimpleAMQQueue implements A
}
- public void deliverAsync(Subscription sub)
+ public void deliverAsync(QueueConsumer sub)
{
if(_exclusiveSubscriber == null)
{
@@ -1469,28 +1443,23 @@ public class SimpleAMQQueue implements A
}
else
{
- SubFlushRunner flusher = (SubFlushRunner) sub.get(SUB_FLUSH_RUNNER);
- if(flusher == null)
- {
- flusher = new SubFlushRunner(sub);
- sub.set(SUB_FLUSH_RUNNER, flusher);
- }
+ SubFlushRunner flusher = sub.getRunner();
flusher.execute(_asyncDelivery);
}
}
- public void flushSubscription(Subscription sub) throws AMQException
+ void flushConsumer(QueueConsumer sub) throws AMQException
{
// Access control
if (!getVirtualHost().getSecurityManager().authoriseConsume(this))
{
throw new AMQSecurityException("Permission denied: " + getName());
}
- flushSubscription(sub, Long.MAX_VALUE);
+ flushConsumer(sub, Long.MAX_VALUE);
}
- public boolean flushSubscription(Subscription sub, long iterations) throws AMQException
+ boolean flushConsumer(QueueConsumer sub, long iterations) throws AMQException
{
boolean atTail = false;
final boolean keepSendLockHeld = iterations <= SimpleAMQQueue.MAX_ASYNC_DELIVERIES;
@@ -1511,8 +1480,8 @@ public class SimpleAMQQueue implements A
sub.getSendLock();
}
- atTail = attemptDelivery(sub, true);
- if (atTail && getNextAvailableEntry(sub) == null)
+ atTail = attemptDelivery((QueueConsumer)sub, true);
+ if (atTail && getNextAvailableEntry((QueueConsumer)sub) == null)
{
queueEmpty = true;
}
@@ -1546,21 +1515,21 @@ public class SimpleAMQQueue implements A
}
- // if there's (potentially) more than one subscription the others will potentially not have been advanced to the
+ // if there's (potentially) more than one consumer the others will potentially not have been advanced to the
// next entry they are interested in yet. This would lead to holding on to references to expired messages, etc
// which would give us memory "leak".
- if (!hasExclusiveSubscriber())
+ if (!hasExclusiveConsumer())
{
- advanceAllSubscriptions();
+ advanceAllConsumers();
}
return atTail;
}
/**
- * Attempt delivery for the given subscription.
+ * Attempt delivery for the given consumer.
*
- * Looks up the next node for the subscription and attempts to deliver it.
+ * Looks up the next node for the consumer and attempts to deliver it.
*
*
* @param sub
@@ -1568,7 +1537,7 @@ public class SimpleAMQQueue implements A
* @return true if we have completed all possible deliveries for this sub.
* @throws AMQException
*/
- private boolean attemptDelivery(Subscription sub, boolean batch) throws AMQException
+ private boolean attemptDelivery(QueueConsumer sub, boolean batch) throws AMQException
{
boolean atTail = false;
@@ -1587,7 +1556,7 @@ public class SimpleAMQQueue implements A
if (sub.acquires() && !assign(sub, node))
{
// restore credit here that would have been taken away by wouldSuspend since we didn't manage
- // to acquire the entry for this subscription
+ // to acquire the entry for this consumer
sub.restoreCredit(node);
}
else
@@ -1598,7 +1567,7 @@ public class SimpleAMQQueue implements A
}
else // Not enough Credit for message and wouldSuspend
{
- //QPID-1187 - Treat the subscription as suspended for this message
+ //QPID-1187 - Treat the consumer as suspended for this message
// and wait for the message to be removed to continue delivery.
subActive = false;
node.addStateChangeListener(new QueueEntryListener(sub));
@@ -1611,13 +1580,13 @@ public class SimpleAMQQueue implements A
return atTail || !subActive;
}
- protected void advanceAllSubscriptions() throws AMQException
+ protected void advanceAllConsumers() throws AMQException
{
- SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator();
- while (subscriberIter.advance())
+ QueueConsumerList.ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator();
+ while (consumerNodeIterator.advance())
{
- SubscriptionList.SubscriptionNode subNode = subscriberIter.getNode();
- Subscription sub = subNode.getSubscription();
+ QueueConsumerList.ConsumerNode subNode = consumerNodeIterator.getNode();
+ QueueConsumer sub = subNode.getConsumer();
if(sub.acquires())
{
getNextAvailableEntry(sub);
@@ -1629,10 +1598,10 @@ public class SimpleAMQQueue implements A
}
}
- private QueueEntry getNextAvailableEntry(final Subscription sub)
+ private QueueEntry getNextAvailableEntry(final QueueConsumer sub)
throws AMQException
{
- QueueContext context = (QueueContext) sub.getQueueContext();
+ QueueContext context = sub.getQueueContext();
if(context != null)
{
QueueEntry lastSeen = context.getLastSeenEntry();
@@ -1670,9 +1639,9 @@ public class SimpleAMQQueue implements A
}
}
- public boolean isEntryAheadOfSubscription(QueueEntry entry, Subscription sub)
+ public boolean isEntryAheadOfConsumer(QueueEntry entry, QueueConsumer sub)
{
- QueueContext context = (QueueContext) sub.getQueueContext();
+ QueueContext context = sub.getQueueContext();
if(context != null)
{
QueueEntry releasedNode = context.getReleasedEntry();
@@ -1689,14 +1658,14 @@ public class SimpleAMQQueue implements A
*
* A queue Runner is started whenever a state change occurs, e.g when a new
* message arrives on the queue and cannot be immediately delivered to a
- * subscription (i.e. asynchronous delivery is required). Unless there are
- * SubFlushRunners operating (due to subscriptions unsuspending) which are
+ * consumer (i.e. asynchronous delivery is required). Unless there are
+ * SubFlushRunners operating (due to consumers unsuspending) which are
* capable of accepting/delivering all messages then these messages would
* otherwise remain on the queue.
*
* processQueue should be running while there are messages on the queue AND
- * there are subscriptions that can deliver them. If there are no
- * subscriptions capable of delivering the remaining messages on the queue
+ * there are consumers that can deliver them. If there are no
+ * consumers capable of delivering the remaining messages on the queue
* then processQueue should stop to prevent spinning.
*
* Since processQueue is runs in a fixed size Executor, it should not run
@@ -1720,7 +1689,7 @@ public class SimpleAMQQueue implements A
boolean lastLoop = false;
int iterations = MAX_ASYNC_DELIVERIES;
- final int numSubs = _subscriptionList.size();
+ final int numSubs = _consumerList.size();
final int perSub = Math.max(iterations / Math.max(numSubs,1), 1);
@@ -1731,8 +1700,8 @@ public class SimpleAMQQueue implements A
// So whilst delivery/rejection is going on a processQueue thread will be running
while (iterations != 0 && ((previousStateChangeCount != (stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete))
{
- // we want to have one extra loop after every subscription has reached the point where it cannot move
- // further, just in case the advance of one subscription in the last loop allows a different subscription to
+ // we want to have one extra loop after every consumer has reached the point where it cannot move
+ // further, just in case the advance of one consumer in the last loop allows a different consumer to
// move forward in the next iteration
if (previousStateChangeCount != stateChangeCount)
@@ -1744,14 +1713,14 @@ public class SimpleAMQQueue implements A
}
previousStateChangeCount = stateChangeCount;
- boolean allSubscriptionsDone = true;
- boolean subscriptionDone;
+ boolean allConsumersDone = true;
+ boolean consumerDone;
- SubscriptionList.SubscriptionNodeIterator subscriptionIter = _subscriptionList.iterator();
+ QueueConsumerList.ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator();
//iterate over the subscribers and try to advance their pointer
- while (subscriptionIter.advance())
+ while (consumerNodeIterator.advance())
{
- Subscription sub = subscriptionIter.getNode().getSubscription();
+ QueueConsumer sub = consumerNodeIterator.getNode().getConsumer();
sub.getSendLock();
try
@@ -1759,8 +1728,8 @@ public class SimpleAMQQueue implements A
for(int i = 0 ; i < perSub; i++)
{
//attempt delivery. returns true if no further delivery currently possible to this sub
- subscriptionDone = attemptDelivery(sub, true);
- if (subscriptionDone)
+ consumerDone = attemptDelivery(sub, true);
+ if (consumerDone)
{
sub.flushBatched();
if (lastLoop && !sub.isSuspended())
@@ -1771,9 +1740,9 @@ public class SimpleAMQQueue implements A
}
else
{
- //this subscription can accept additional deliveries, so we must
+ //this consumer can accept additional deliveries, so we must
//keep going after this (if iteration slicing allows it)
- allSubscriptionsDone = false;
+ allConsumersDone = false;
lastLoop = false;
if(--iterations == 0)
{
@@ -1792,24 +1761,24 @@ public class SimpleAMQQueue implements A
}
}
- if(allSubscriptionsDone && lastLoop)
+ if(allConsumersDone && lastLoop)
{
//We have done an extra loop already and there are again
//again no further delivery attempts possible, only
//keep going if state change demands it.
deliveryIncomplete = false;
}
- else if(allSubscriptionsDone)
+ else if(allConsumersDone)
{
- //All subscriptions reported being done, but we have to do
+ //All consumers reported being done, but we have to do
//an extra loop if the iterations are not exhausted and
//there is still any work to be done
- deliveryIncomplete = _subscriptionList.size() != 0;
+ deliveryIncomplete = _consumerList.size() != 0;
lastLoop = true;
}
else
{
- //some subscriptions can still accept more messages,
+ //some consumers can still accept more messages,
//keep going if iteration count allows.
lastLoop = false;
deliveryIncomplete = true;
@@ -1984,12 +1953,12 @@ public class SimpleAMQQueue implements A
return _notificationChecks;
}
- private final class QueueEntryListener implements QueueEntry.StateChangeListener
+ private final class QueueEntryListener implements StateChangeListener<MessageInstance<QueueConsumer>, QueueEntry.State>
{
- private final Subscription _sub;
+ private final QueueConsumer _sub;
- public QueueEntryListener(final Subscription sub)
+ public QueueEntryListener(final QueueConsumer sub)
{
_sub = sub;
}
@@ -2005,7 +1974,7 @@ public class SimpleAMQQueue implements A
return System.identityHashCode(_sub);
}
- public void stateChanged(QueueEntry entry, QueueEntry.State oldSate, QueueEntry.State newState)
+ public void stateChanged(MessageInstance entry, QueueEntry.State oldSate, QueueEntry.State newState)
{
entry.removeStateChangeListener(this);
deliverAsync(_sub);
@@ -2076,26 +2045,6 @@ public class SimpleAMQQueue implements A
return _dequeueSize.get();
}
- public long getByteTxnEnqueues()
- {
- return _byteTxnEnqueues.get();
- }
-
- public long getByteTxnDequeues()
- {
- return _byteTxnDequeues.get();
- }
-
- public long getMsgTxnEnqueues()
- {
- return _msgTxnEnqueues.get();
- }
-
- public long getMsgTxnDequeues()
- {
- return _msgTxnDequeues.get();
- }
-
public long getPersistentByteEnqueues()
{
return _persistentMessageEnqueueSize.get();
@@ -2123,11 +2072,6 @@ public class SimpleAMQQueue implements A
return getName();
}
- public long getUnackedMessageCountHigh()
- {
- return _unackedMsgCountHigh.get();
- }
-
public long getUnackedMessageCount()
{
return _unackedMsgCount.get();
@@ -2146,17 +2090,8 @@ public class SimpleAMQQueue implements A
private void incrementUnackedMsgCount(QueueEntry entry)
{
- long unackedMsgCount = _unackedMsgCount.incrementAndGet();
+ _unackedMsgCount.incrementAndGet();
_unackedMsgBytes.addAndGet(entry.getSize());
-
- long unackedMsgCountHigh;
- while(unackedMsgCount > (unackedMsgCountHigh = _unackedMsgCountHigh.get()))
- {
- if(_unackedMsgCountHigh.compareAndSet(unackedMsgCountHigh, unackedMsgCount))
- {
- break;
- }
- }
}
public LogActor getLogActor()
@@ -2224,4 +2159,39 @@ public class SimpleAMQQueue implements A
return (String) _arguments.get(Queue.DESCRIPTION);
}
+ public final int send(final ServerMessage message,
+ final InstanceProperties instanceProperties,
+ final ServerTransaction txn,
+ final Action<MessageInstance<? extends Consumer>> postEnqueueAction)
+ {
+ txn.enqueue(this,message, new ServerTransaction.Action()
+ {
+ MessageReference _reference = message.newReference();
+
+ public void postCommit()
+ {
+ try
+ {
+ SimpleAMQQueue.this.enqueue(message, postEnqueueAction);
+ }
+ catch (AMQException e)
+ {
+ // TODO
+ throw new RuntimeException(e);
+ }
+ finally
+ {
+ _reference.release();
+ }
+ }
+
+ public void onRollback()
+ {
+ _reference.release();
+ }
+ });
+ return 1;
+
+ }
+
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java Fri Feb 7 16:57:49 2014
@@ -39,7 +39,7 @@ public class SimpleQueueEntryList implem
(SimpleQueueEntryList.class, SimpleQueueEntryImpl.class, "_tail");
- private final AMQQueue _queue;
+ private final AMQQueue<QueueConsumer> _queue;
static final AtomicReferenceFieldUpdater<SimpleQueueEntryImpl, SimpleQueueEntryImpl>
_nextUpdater = SimpleQueueEntryImpl._nextUpdater;
@@ -49,7 +49,7 @@ public class SimpleQueueEntryList implem
private final AtomicReference<SimpleQueueEntryImpl> _unscavengedHWM = new AtomicReference<SimpleQueueEntryImpl>();
- public SimpleQueueEntryList(AMQQueue queue)
+ public SimpleQueueEntryList(AMQQueue<QueueConsumer> queue)
{
_queue = queue;
_head = new SimpleQueueEntryImpl(this);
@@ -71,7 +71,7 @@ public class SimpleQueueEntryList implem
}
- public AMQQueue getQueue()
+ public AMQQueue<QueueConsumer> getQueue()
{
return _queue;
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java Fri Feb 7 16:57:49 2014
@@ -20,7 +20,10 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.consumer.Consumer;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Map;
@@ -29,7 +32,7 @@ import java.util.UUID;
public class SortedQueue extends OutOfOrderQueue
{
//Lock object to synchronize enqueue. Used instead of the object
- //monitor to prevent lock order issues with subscription sendLocks
+ //monitor to prevent lock order issues with consumer sendLocks
//and consumer updates in the super classes
private final Object _sortedQueueLock = new Object();
private final String _sortedPropertyName;
@@ -48,7 +51,7 @@ public class SortedQueue extends OutOfOr
return _sortedPropertyName;
}
- public void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException
+ public void enqueue(ServerMessage message, Action<MessageInstance<? extends Consumer>> action) throws AMQException
{
synchronized (_sortedQueueLock)
{
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java Fri Feb 7 16:57:49 2014
@@ -28,7 +28,7 @@ import org.apache.qpid.server.queue.Sort
* Uses the red/black tree algorithm specified in "Introduction to Algorithms".
* ISBN-10: 0262033844
* ISBN-13: 978-0262033848
- * @see http://en.wikipedia.org/wiki/Red-black_tree
+ * see http://en.wikipedia.org/wiki/Red-black_tree
*/
public class SortedQueueEntryList implements QueueEntryList<SortedQueueEntryImpl>
{
@@ -36,17 +36,17 @@ public class SortedQueueEntryList implem
private SortedQueueEntryImpl _root;
private long _entryId = Long.MIN_VALUE;
private final Object _lock = new Object();
- private final AMQQueue _queue;
+ private final AMQQueue<QueueConsumer> _queue;
private final String _propertyName;
- public SortedQueueEntryList(final AMQQueue queue, final String propertyName)
+ public SortedQueueEntryList(final AMQQueue<QueueConsumer> queue, final String propertyName)
{
_queue = queue;
_head = new SortedQueueEntryImpl(this);
_propertyName = propertyName;
}
- public AMQQueue getQueue()
+ public AMQQueue<QueueConsumer> getQueue()
{
return _queue;
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java Fri Feb 7 16:57:49 2014
@@ -25,7 +25,6 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.transport.TransportException;
import java.util.concurrent.Executor;
@@ -38,7 +37,7 @@ class SubFlushRunner implements Runnable
private static final Logger _logger = Logger.getLogger(SubFlushRunner.class);
- private final Subscription _sub;
+ private final QueueConsumer _sub;
private static int IDLE = 0;
private static int SCHEDULED = 1;
@@ -51,7 +50,7 @@ class SubFlushRunner implements Runnable
private static final long ITERATIONS = SimpleAMQQueue.MAX_ASYNC_DELIVERIES;
private final AtomicBoolean _stateChange = new AtomicBoolean();
- public SubFlushRunner(Subscription sub)
+ public SubFlushRunner(QueueConsumer sub)
{
_sub = sub;
}
@@ -65,7 +64,7 @@ class SubFlushRunner implements Runnable
try
{
CurrentActor.set(_sub.getLogActor());
- complete = getQueue().flushSubscription(_sub, ITERATIONS);
+ complete = getQueue().flushConsumer(_sub, ITERATIONS);
}
catch (AMQException e)
{
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java Fri Feb 7 16:57:49 2014
@@ -1065,7 +1065,7 @@ abstract public class AbstractJDBCMessag
stmt.setString(4, "E");
for(Transaction.Record record : enqueues)
{
- stmt.setString(5, record.getQueue().getId().toString());
+ stmt.setString(5, record.getResource().getId().toString());
stmt.setLong(6, record.getMessage().getMessageNumber());
stmt.executeUpdate();
}
@@ -1076,7 +1076,7 @@ abstract public class AbstractJDBCMessag
stmt.setString(4, "D");
for(Transaction.Record record : dequeues)
{
- stmt.setString(5, record.getQueue().getId().toString());
+ stmt.setString(5, record.getResource().getId().toString());
stmt.setLong(6, record.getMessage().getMessageNumber());
stmt.executeUpdate();
}
@@ -1199,7 +1199,7 @@ abstract public class AbstractJDBCMessag
buf.position(1);
buf = buf.slice();
- metaData.writeToBuffer(0, buf);
+ metaData.writeToBuffer(buf);
ByteArrayInputStream bis = new ByteArrayInputStream(underlying);
try
{
@@ -1371,7 +1371,7 @@ abstract public class AbstractJDBCMessag
}
@Override
- public TransactionLogResource getQueue()
+ public TransactionLogResource getResource()
{
return this;
}
@@ -1401,10 +1401,22 @@ abstract public class AbstractJDBCMessag
}
@Override
+ public String getName()
+ {
+ return _queueId.toString();
+ }
+
+ @Override
public UUID getId()
{
return _queueId;
}
+
+ @Override
+ public boolean isDurable()
+ {
+ return true;
+ }
}
protected void recoverXids(TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh) throws SQLException
Propchange: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
------------------------------------------------------------------------------
Merged /qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java:r1562456-1565710
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java Fri Feb 7 16:57:49 2014
@@ -29,6 +29,7 @@ import java.util.Map;
import java.util.Set;
import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.model.Binding;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.LifetimePolicy;
@@ -46,7 +47,7 @@ public class DurableConfigurationStoreHe
Queue.EXCLUSIVE,
Queue.ALTERNATE_EXCHANGE));
- public static void updateQueue(DurableConfigurationStore store, AMQQueue queue) throws AMQStoreException
+ public static void updateQueue(DurableConfigurationStore store, AMQQueue<? extends Consumer> queue) throws AMQStoreException
{
Map<String, Object> attributesMap = new LinkedHashMap<String, Object>();
attributesMap.put(Queue.NAME, queue.getName());
@@ -71,7 +72,7 @@ public class DurableConfigurationStoreHe
store.update(queue.getId(), QUEUE, attributesMap);
}
- public static void createQueue(DurableConfigurationStore store, AMQQueue queue)
+ public static void createQueue(DurableConfigurationStore store, AMQQueue<? extends Consumer> queue)
throws AMQStoreException
{
Map<String, Object> attributesMap = new HashMap<String, Object>();
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java Fri Feb 7 16:57:49 2014
@@ -29,7 +29,7 @@ public interface StorableMessageMetaData
int getStorableSize();
- int writeToBuffer(int offsetInMetaData, ByteBuffer dest);
+ int writeToBuffer(ByteBuffer dest);
int getContentSize();
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java Fri Feb 7 16:57:49 2014
@@ -70,7 +70,7 @@ public interface Transaction
public static interface Record
{
- TransactionLogResource getQueue();
+ TransactionLogResource getResource();
EnqueueableMessage getMessage();
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java Fri Feb 7 16:57:49 2014
@@ -24,5 +24,7 @@ import java.util.UUID;
public interface TransactionLogResource
{
+ String getName();
public UUID getId();
+ boolean isDurable();
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java Fri Feb 7 16:57:49 2014
@@ -25,12 +25,14 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQStoreException;
import org.apache.qpid.server.message.EnqueueableMessage;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.Transaction;
+import org.apache.qpid.server.store.TransactionLogResource;
import java.util.Collection;
import java.util.List;
@@ -88,7 +90,7 @@ public class AsyncAutoCommitTransaction
}
- public void dequeue(BaseQueue queue, EnqueueableMessage message, Action postTransactionAction)
+ public void dequeue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction)
{
Transaction txn = null;
try
@@ -158,15 +160,15 @@ public class AsyncAutoCommitTransaction
}
}
- public void dequeue(Collection<QueueEntry> queueEntries, Action postTransactionAction)
+ public void dequeue(Collection<MessageInstance> queueEntries, Action postTransactionAction)
{
Transaction txn = null;
try
{
- for(QueueEntry entry : queueEntries)
+ for(MessageInstance entry : queueEntries)
{
ServerMessage message = entry.getMessage();
- BaseQueue queue = entry.getQueue();
+ TransactionLogResource queue = entry.getOwningResource();
if(message.isPersistent() && queue.isDurable())
{
@@ -210,7 +212,7 @@ public class AsyncAutoCommitTransaction
}
- public void enqueue(BaseQueue queue, EnqueueableMessage message, Action postTransactionAction)
+ public void enqueue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction)
{
Transaction txn = null;
try
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java Fri Feb 7 16:57:49 2014
@@ -25,11 +25,13 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQStoreException;
import org.apache.qpid.server.message.EnqueueableMessage;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.Transaction;
+import org.apache.qpid.server.store.TransactionLogResource;
import java.util.Collection;
import java.util.List;
@@ -73,7 +75,7 @@ public class AutoCommitTransaction imple
immediateAction.postCommit();
}
- public void dequeue(BaseQueue queue, EnqueueableMessage message, Action postTransactionAction)
+ public void dequeue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction)
{
Transaction txn = null;
try
@@ -105,15 +107,15 @@ public class AutoCommitTransaction imple
}
- public void dequeue(Collection<QueueEntry> queueEntries, Action postTransactionAction)
+ public void dequeue(Collection<MessageInstance> queueEntries, Action postTransactionAction)
{
Transaction txn = null;
try
{
- for(QueueEntry entry : queueEntries)
+ for(MessageInstance entry : queueEntries)
{
ServerMessage message = entry.getMessage();
- BaseQueue queue = entry.getQueue();
+ TransactionLogResource queue = entry.getOwningResource();
if(message.isPersistent() && queue.isDurable())
{
@@ -152,7 +154,7 @@ public class AutoCommitTransaction imple
}
- public void enqueue(BaseQueue queue, EnqueueableMessage message, Action postTransactionAction)
+ public void enqueue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction)
{
Transaction txn = null;
try
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java Fri Feb 7 16:57:49 2014
@@ -22,10 +22,12 @@
package org.apache.qpid.server.txn;
import org.apache.qpid.server.message.EnqueueableMessage;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.Xid;
@@ -74,7 +76,7 @@ public class DistributedTransaction impl
}
}
- public void dequeue(BaseQueue queue, EnqueueableMessage message, Action postTransactionAction)
+ public void dequeue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction)
{
if(_branch != null)
{
@@ -87,13 +89,13 @@ public class DistributedTransaction impl
}
}
- public void dequeue(Collection<QueueEntry> messages, Action postTransactionAction)
+ public void dequeue(Collection<MessageInstance> messages, Action postTransactionAction)
{
if(_branch != null)
{
- for(QueueEntry entry : messages)
+ for(MessageInstance entry : messages)
{
- _branch.dequeue(entry.getQueue(), entry.getMessage());
+ _branch.dequeue(entry.getOwningResource(), entry.getMessage());
}
_branch.addPostTransactionAction(postTransactionAction);
}
@@ -103,7 +105,7 @@ public class DistributedTransaction impl
}
}
- public void enqueue(BaseQueue queue, EnqueueableMessage message, Action postTransactionAction)
+ public void enqueue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction)
{
if(_branch != null)
{
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java Fri Feb 7 16:57:49 2014
@@ -34,6 +34,7 @@ import org.apache.qpid.server.protocol.A
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.Transaction;
+import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.Xid;
@@ -335,7 +336,7 @@ public class DtxBranch
{
if(enqueue.isDurable())
{
- _transaction.enqueueMessage(enqueue.getQueue(), enqueue.getMessage());
+ _transaction.enqueueMessage(enqueue.getResource(), enqueue.getMessage());
}
}
@@ -344,7 +345,7 @@ public class DtxBranch
{
if(enqueue.isDurable())
{
- _transaction.dequeueMessage(enqueue.getQueue(), enqueue.getMessage());
+ _transaction.dequeueMessage(enqueue.getResource(), enqueue.getMessage());
}
}
}
@@ -356,31 +357,31 @@ public class DtxBranch
}
- public void dequeue(BaseQueue queue, EnqueueableMessage message)
+ public void dequeue(TransactionLogResource resource, EnqueueableMessage message)
{
- _dequeueRecords.add(new Record(queue, message));
+ _dequeueRecords.add(new Record(resource, message));
}
- public void enqueue(BaseQueue queue, EnqueueableMessage message)
+ public void enqueue(TransactionLogResource queue, EnqueueableMessage message)
{
_enqueueRecords.add(new Record(queue, message));
}
private static final class Record implements Transaction.Record
{
- private final BaseQueue _queue;
+ private final TransactionLogResource _resource;
private final EnqueueableMessage _message;
- public Record(BaseQueue queue, EnqueueableMessage message)
+ public Record(TransactionLogResource resource, EnqueueableMessage message)
{
- _queue = queue;
+ _resource = resource;
_message = message;
}
- public BaseQueue getQueue()
+ public TransactionLogResource getResource()
{
- return _queue;
+ return _resource;
}
public EnqueueableMessage getMessage()
@@ -390,7 +391,7 @@ public class DtxBranch
public boolean isDurable()
{
- return _message.isPersistent() && _queue.isDurable();
+ return _message.isPersistent() && _resource.isDurable();
}
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java Fri Feb 7 16:57:49 2014
@@ -21,7 +21,9 @@
package org.apache.qpid.server.txn;
import org.apache.qpid.server.message.EnqueueableMessage;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.store.TransactionLogResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -91,7 +93,7 @@ public class LocalTransaction implements
_postTransactionActions.add(postTransactionAction);
}
- public void dequeue(BaseQueue queue, EnqueueableMessage message, Action postTransactionAction)
+ public void dequeue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction)
{
sync();
_postTransactionActions.add(postTransactionAction);
@@ -118,7 +120,7 @@ public class LocalTransaction implements
}
}
- public void dequeue(Collection<QueueEntry> queueEntries, Action postTransactionAction)
+ public void dequeue(Collection<MessageInstance> queueEntries, Action postTransactionAction)
{
sync();
_postTransactionActions.add(postTransactionAction);
@@ -126,10 +128,10 @@ public class LocalTransaction implements
try
{
- for(QueueEntry entry : queueEntries)
+ for(MessageInstance entry : queueEntries)
{
ServerMessage message = entry.getMessage();
- BaseQueue queue = entry.getQueue();
+ TransactionLogResource queue = entry.getOwningResource();
if(message.isPersistent() && queue.isDurable())
{
@@ -195,7 +197,7 @@ public class LocalTransaction implements
}
}
- public void enqueue(BaseQueue queue, EnqueueableMessage message, Action postTransactionAction)
+ public void enqueue(TransactionLogResource queue, EnqueueableMessage message, Action postTransactionAction)
{
sync();
_postTransactionActions.add(postTransactionAction);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org