You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/02/07 17:57:52 UTC
svn commit: r1565726 [4/6] - in /qpid/trunk/qpid/java: ./
amqp-1-0-client-jms/ amqp-1-0-client/ amqp-1-0-common/
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrad...
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java Fri Feb 7 16:57:49 2014
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.txn;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MockAMQQueue;
@@ -47,7 +48,7 @@ public class AutoCommitTransactionTest e
private MessageStore _transactionLog;
private AMQQueue _queue;
private List<AMQQueue> _queues;
- private Collection<QueueEntry> _queueEntries;
+ private Collection<MessageInstance> _queueEntries;
private ServerMessage _message;
private MockAction _action;
private MockStoreTransaction _storeTransaction;
@@ -373,9 +374,9 @@ public class AutoCommitTransactionTest e
assertFalse("Rollback action must be fired", _action.isRollbackActionFired());
}
- private Collection<QueueEntry> createTestQueueEntries(boolean[] queueDurableFlags, boolean[] messagePersistentFlags)
+ private Collection<MessageInstance> createTestQueueEntries(boolean[] queueDurableFlags, boolean[] messagePersistentFlags)
{
- Collection<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
+ Collection<MessageInstance> queueEntries = new ArrayList<MessageInstance>();
assertTrue("Boolean arrays must be the same length", queueDurableFlags.length == messagePersistentFlags.length);
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java Fri Feb 7 16:57:49 2014
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.txn;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MockAMQQueue;
@@ -46,7 +47,7 @@ public class LocalTransactionTest extend
private AMQQueue _queue;
private List<AMQQueue> _queues;
- private Collection<QueueEntry> _queueEntries;
+ private Collection<MessageInstance> _queueEntries;
private ServerMessage _message;
private MockAction _action1;
private MockAction _action2;
@@ -597,9 +598,9 @@ public class LocalTransactionTest extend
assertEquals("Transaction update time should be reset after rollback", 0, _transaction.getTransactionUpdateTime());
}
- private Collection<QueueEntry> createTestQueueEntries(boolean[] queueDurableFlags, boolean[] messagePersistentFlags)
+ private Collection<MessageInstance> createTestQueueEntries(boolean[] queueDurableFlags, boolean[] messagePersistentFlags)
{
- Collection<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
+ Collection<MessageInstance> queueEntries = new ArrayList<MessageInstance>();
assertTrue("Boolean arrays must be the same length", queueDurableFlags.length == messagePersistentFlags.length);
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java Fri Feb 7 16:57:49 2014
@@ -27,6 +27,8 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.connection.IConnectionRegistry;
import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.protocol.LinkRegistry;
import org.apache.qpid.server.queue.AMQQueue;
@@ -127,6 +129,12 @@ public class MockVirtualHost implements
}
@Override
+ public MessageSource getMessageSource(final String name)
+ {
+ return null;
+ }
+
+ @Override
public AMQQueue getQueue(UUID id)
{
return null;
@@ -174,6 +182,12 @@ public class MockVirtualHost implements
}
@Override
+ public MessageDestination getMessageDestination(final String name)
+ {
+ return null;
+ }
+
+ @Override
public Exchange getExchange(String name)
{
return null;
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java Fri Feb 7 16:57:49 2014
@@ -22,7 +22,7 @@ package org.apache.qpid.server.protocol.
import org.apache.log4j.Logger;
-import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.message.MessageInstance;
class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDispositionChangeListener
@@ -30,21 +30,20 @@ class ExplicitAcceptDispositionChangeLis
private static final Logger _logger = Logger.getLogger(ExplicitAcceptDispositionChangeListener.class);
- private final QueueEntry _entry;
- private final Subscription_0_10 _sub;
+ private final MessageInstance _entry;
+ private final ConsumerTarget_0_10 _target;
- public ExplicitAcceptDispositionChangeListener(QueueEntry entry, Subscription_0_10 subscription_0_10)
+ public ExplicitAcceptDispositionChangeListener(MessageInstance entry, ConsumerTarget_0_10 target)
{
_entry = entry;
- _sub = subscription_0_10;
+ _target = target;
}
public void onAccept()
{
- final Subscription_0_10 subscription = getSubscription();
- if(subscription != null && _entry.isAcquiredBy(_sub))
+ if(_target != null && _entry.isAcquiredBy(_target.getConsumer()))
{
- subscription.getSessionModel().acknowledge(subscription, _entry);
+ _target.getSessionModel().acknowledge(_target, _entry);
}
else
{
@@ -55,10 +54,9 @@ class ExplicitAcceptDispositionChangeLis
public void onRelease(boolean setRedelivered)
{
- final Subscription_0_10 subscription = getSubscription();
- if(subscription != null && _entry.isAcquiredBy(_sub))
+ if(_target != null && _entry.isAcquiredBy(_target.getConsumer()))
{
- subscription.release(_entry, setRedelivered);
+ _target.release(_entry, setRedelivered);
}
else
{
@@ -68,10 +66,9 @@ class ExplicitAcceptDispositionChangeLis
public void onReject()
{
- final Subscription_0_10 subscription = getSubscription();
- if(subscription != null && _entry.isAcquiredBy(_sub))
+ if(_target != null && _entry.isAcquiredBy(_target.getConsumer()))
{
- subscription.reject(_entry);
+ _target.reject(_entry);
}
else
{
@@ -82,12 +79,8 @@ class ExplicitAcceptDispositionChangeLis
public boolean acquire()
{
- return _entry.acquire(getSubscription());
+ return _entry.acquire(_target.getConsumer());
}
- private Subscription_0_10 getSubscription()
- {
- return _sub;
- }
}
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java Fri Feb 7 16:57:49 2014
@@ -22,20 +22,20 @@ package org.apache.qpid.server.protocol.
import org.apache.log4j.Logger;
-import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.message.MessageInstance;
class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDispositionChangeListener
{
private static final Logger _logger = Logger.getLogger(ImplicitAcceptDispositionChangeListener.class);
- private final QueueEntry _entry;
- private Subscription_0_10 _sub;
+ private final MessageInstance _entry;
+ private ConsumerTarget_0_10 _target;
- public ImplicitAcceptDispositionChangeListener(QueueEntry entry, Subscription_0_10 subscription_0_10)
+ public ImplicitAcceptDispositionChangeListener(MessageInstance entry, ConsumerTarget_0_10 target)
{
_entry = entry;
- _sub = subscription_0_10;
+ _target = target;
}
public void onAccept()
@@ -45,9 +45,9 @@ class ImplicitAcceptDispositionChangeLis
public void onRelease(boolean setRedelivered)
{
- if(_entry.isAcquiredBy(_sub))
+ if(_entry.isAcquiredBy(_target.getConsumer()))
{
- getSubscription().release(_entry, setRedelivered);
+ _target.release(_entry, setRedelivered);
}
else
{
@@ -57,9 +57,9 @@ class ImplicitAcceptDispositionChangeLis
public void onReject()
{
- if(_entry.isAcquiredBy(_sub))
+ if(_entry.isAcquiredBy(_target.getConsumer()))
{
- getSubscription().reject(_entry);
+ _target.reject(_entry);
}
else
{
@@ -70,19 +70,15 @@ class ImplicitAcceptDispositionChangeLis
public boolean acquire()
{
- boolean acquired = _entry.acquire(getSubscription());
+ boolean acquired = _entry.acquire(_target.getConsumer());
if(acquired)
{
- getSubscription().recordUnacknowledged(_entry);
+ _target.recordUnacknowledged(_entry);
}
return acquired;
}
- public Subscription_0_10 getSubscription()
- {
- return _sub;
- }
}
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java Fri Feb 7 16:57:49 2014
@@ -21,17 +21,17 @@
package org.apache.qpid.server.protocol.v0_10;
-import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.transport.Method;
public class MessageAcceptCompletionListener implements Method.CompletionListener
{
- private final Subscription_0_10 _sub;
- private final QueueEntry _entry;
+ private final ConsumerTarget_0_10 _sub;
+ private final MessageInstance _entry;
private final ServerSession _session;
private boolean _restoreCredit;
- public MessageAcceptCompletionListener(Subscription_0_10 sub, ServerSession session, QueueEntry entry, boolean restoreCredit)
+ public MessageAcceptCompletionListener(ConsumerTarget_0_10 sub, ServerSession session, MessageInstance entry, boolean restoreCredit)
{
super();
_sub = sub;
@@ -44,9 +44,9 @@ public class MessageAcceptCompletionList
{
if(_restoreCredit)
{
- _sub.restoreCredit(_entry);
+ _sub.restoreCredit(_entry.getMessage());
}
- if(_entry.isAcquiredBy(_sub))
+ if(_entry.isAcquiredBy(_sub.getConsumer()))
{
_session.acknowledge(_sub, _entry);
}
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java Fri Feb 7 16:57:49 2014
@@ -141,7 +141,7 @@ public class MessageMetaData_0_10 implem
return buf;
}
- public int writeToBuffer(int offsetInMetaData, ByteBuffer dest)
+ public int writeToBuffer(ByteBuffer dest)
{
ByteBuffer buf = _encoded;
@@ -153,7 +153,7 @@ public class MessageMetaData_0_10 implem
buf = buf.duplicate();
- buf.position(offsetInMetaData);
+ buf.position(0);
if(dest.remaining() < buf.limit())
{
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java Fri Feb 7 16:57:49 2014
@@ -282,8 +282,8 @@ public class ServerConnectionDelegate ex
private void stopAllSubscriptions(Connection conn, SessionDetach dtc)
{
final ServerSession ssn = (ServerSession) conn.getSession(dtc.getChannel());
- final Collection<Subscription_0_10> subs = ssn.getSubscriptions();
- for (Subscription_0_10 subscription_0_10 : subs)
+ final Collection<ConsumerTarget_0_10> subs = ssn.getSubscriptions();
+ for (ConsumerTarget_0_10 subscription_0_10 : subs)
{
subscription_0_10.stop();
}
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java Fri Feb 7 16:57:49 2014
@@ -46,7 +46,7 @@ import org.apache.qpid.AMQStoreException
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.TransactionTimeoutHelper;
import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction;
-import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogMessage;
import org.apache.qpid.server.logging.LogSubject;
@@ -55,15 +55,16 @@ import org.apache.qpid.server.logging.ac
import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
import org.apache.qpid.server.message.InstanceProperties;
-import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.CapacityChecker;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.BaseQueue;
-import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.security.AuthorizationHolder;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.txn.AlreadyKnownDtxException;
import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
import org.apache.qpid.server.txn.DistributedTransaction;
@@ -77,6 +78,7 @@ import org.apache.qpid.server.txn.Server
import org.apache.qpid.server.txn.SuspendAndFailDtxException;
import org.apache.qpid.server.txn.TimeoutDtxException;
import org.apache.qpid.server.txn.UnknownDtxBranchException;
+import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.*;
import org.slf4j.Logger;
@@ -104,14 +106,7 @@ public class ServerSession extends Sessi
private final AtomicBoolean _blocking = new AtomicBoolean(false);
private ChannelLogSubject _logSubject;
private final AtomicInteger _outstandingCredit = new AtomicInteger(UNLIMITED_CREDIT);
- private final BaseQueue.PostEnqueueAction _checkCapacityAction = new BaseQueue.PostEnqueueAction()
- {
- @Override
- public void onEnqueue(final QueueEntry entry)
- {
- entry.getQueue().checkCapacity(ServerSession.this);
- }
- };
+ private final CheckCapacityAction _checkCapacityAction = new CheckCapacityAction();
public static interface MessageDispositionChangeListener
{
@@ -126,12 +121,6 @@ public class ServerSession extends Sessi
}
- public static interface Task
- {
- public void doTask(ServerSession session);
- }
-
-
private final SortedMap<Integer, MessageDispositionChangeListener> _messageDispositionListenerMap =
new ConcurrentSkipListMap<Integer, MessageDispositionChangeListener>();
@@ -142,9 +131,9 @@ public class ServerSession extends Sessi
private final AtomicLong _txnRejects = new AtomicLong(0);
private final AtomicLong _txnCount = new AtomicLong(0);
- private Map<String, Subscription_0_10> _subscriptions = new ConcurrentHashMap<String, Subscription_0_10>();
+ private Map<String, ConsumerTarget_0_10> _subscriptions = new ConcurrentHashMap<String, ConsumerTarget_0_10>();
- private final List<Task> _taskList = new CopyOnWriteArrayList<Task>();
+ private final List<Action<ServerSession>> _taskList = new CopyOnWriteArrayList<Action<ServerSession>>();
private final TransactionTimeoutHelper _transactionTimeoutHelper;
@@ -194,7 +183,7 @@ public class ServerSession extends Sessi
public int enqueue(final MessageTransferMessage message,
final InstanceProperties instanceProperties,
- final Exchange exchange)
+ final MessageDestination exchange)
{
if(_outstandingCredit.get() != UNLIMITED_CREDIT
&& _outstandingCredit.decrementAndGet() == (Integer.MAX_VALUE - PRODUCER_CREDIT_TOPUP_THRESHOLD))
@@ -386,9 +375,9 @@ public class ServerSession extends Sessi
}
_messageDispositionListenerMap.clear();
- for (Task task : _taskList)
+ for (Action<ServerSession> task : _taskList)
{
- task.doTask(this);
+ task.performAction(this);
}
LogMessage operationalLoggingMessage = _forcedCloseLogMessage.get();
@@ -405,9 +394,9 @@ public class ServerSession extends Sessi
// Broker shouldn't block awaiting close - thus do override this method to do nothing
}
- public void acknowledge(final Subscription_0_10 sub, final QueueEntry entry)
+ public void acknowledge(final ConsumerTarget_0_10 sub, final MessageInstance entry)
{
- _transaction.dequeue(entry.getQueue(), entry.getMessage(),
+ _transaction.dequeue(entry.getOwningResource(), entry.getMessage(),
new ServerTransaction.Action()
{
@@ -426,42 +415,26 @@ public class ServerSession extends Sessi
});
}
- public Collection<Subscription_0_10> getSubscriptions()
+ public Collection<ConsumerTarget_0_10> getSubscriptions()
{
return _subscriptions.values();
}
- public void register(String destination, Subscription_0_10 sub)
+ public void register(String destination, ConsumerTarget_0_10 sub)
{
_subscriptions.put(destination == null ? NULL_DESTINATION : destination, sub);
}
- public Subscription_0_10 getSubscription(String destination)
+ public ConsumerTarget_0_10 getSubscription(String destination)
{
return _subscriptions.get(destination == null ? NULL_DESTINATION : destination);
}
- public void unregister(Subscription_0_10 sub)
+ public void unregister(ConsumerTarget_0_10 sub)
{
_subscriptions.remove(sub.getName());
- try
- {
- sub.getSendLock();
- AMQQueue queue = sub.getQueue();
- if(queue != null)
- {
- queue.unregisterSubscription(sub);
- }
- }
- catch (AMQException e)
- {
- // TODO
- _logger.error("Failed to unregister subscription :" + e.getMessage(), e);
- }
- finally
- {
- sub.releaseSendLock();
- }
+ sub.close();
+
}
public boolean isTransactional()
@@ -638,12 +611,12 @@ public class ServerSession extends Sessi
return getConnection().getAuthorizedSubject();
}
- public void addSessionCloseTask(Task task)
+ public void addSessionCloseTask(Action<ServerSession> task)
{
_taskList.add(task);
}
- public void removeSessionCloseTask(Task task)
+ public void removeSessionCloseTask(Action<ServerSession> task)
{
_taskList.remove(task);
}
@@ -829,8 +802,8 @@ public class ServerSession extends Sessi
void unregisterSubscriptions()
{
- final Collection<Subscription_0_10> subscriptions = getSubscriptions();
- for (Subscription_0_10 subscription_0_10 : subscriptions)
+ final Collection<ConsumerTarget_0_10> subscriptions = getSubscriptions();
+ for (ConsumerTarget_0_10 subscription_0_10 : subscriptions)
{
unregister(subscription_0_10);
}
@@ -838,8 +811,8 @@ public class ServerSession extends Sessi
void stopSubscriptions()
{
- final Collection<Subscription_0_10> subscriptions = getSubscriptions();
- for (Subscription_0_10 subscription_0_10 : subscriptions)
+ final Collection<ConsumerTarget_0_10> subscriptions = getSubscriptions();
+ for (ConsumerTarget_0_10 subscription_0_10 : subscriptions)
{
subscription_0_10.stop();
}
@@ -848,8 +821,8 @@ public class ServerSession extends Sessi
public void receivedComplete()
{
- final Collection<Subscription_0_10> subscriptions = getSubscriptions();
- for (Subscription_0_10 subscription_0_10 : subscriptions)
+ final Collection<ConsumerTarget_0_10> subscriptions = getSubscriptions();
+ for (ConsumerTarget_0_10 subscription_0_10 : subscriptions)
{
subscription_0_10.flushCreditState(false);
}
@@ -955,4 +928,16 @@ public class ServerSession extends Sessi
return getId().compareTo(o.getId());
}
+ private class CheckCapacityAction<C extends Consumer> implements Action<MessageInstance<C>>
+ {
+ @Override
+ public void performAction(final MessageInstance<C> entry)
+ {
+ TransactionLogResource queue = entry.getOwningResource();
+ if(queue instanceof CapacityChecker)
+ {
+ ((CapacityChecker)queue).checkCapacity(ServerSession.this);
+ }
+ }
+ }
}
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java Fri Feb 7 16:57:49 2014
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.protocol.v0_10;
+import java.util.EnumSet;
import java.util.LinkedHashMap;
import java.util.UUID;
import org.apache.log4j.Logger;
@@ -34,7 +35,9 @@ import org.apache.qpid.server.filter.Fil
import org.apache.qpid.server.filter.FilterManagerFactory;
import org.apache.qpid.server.logging.messages.ExchangeMessages;
import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.plugin.ExchangeType;
@@ -45,6 +48,7 @@ import org.apache.qpid.server.store.Dura
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.txn.AlreadyKnownDtxException;
import org.apache.qpid.server.txn.DtxNotSelectedException;
import org.apache.qpid.server.txn.IncorrectDtxStateException;
@@ -55,6 +59,7 @@ import org.apache.qpid.server.txn.Server
import org.apache.qpid.server.txn.SuspendAndFailDtxException;
import org.apache.qpid.server.txn.TimeoutDtxException;
import org.apache.qpid.server.txn.UnknownDtxBranchException;
+import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.ExchangeExistsException;
import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException;
import org.apache.qpid.server.virtualhost.RequiredExchangeException;
@@ -193,7 +198,7 @@ public class ServerSessionDelegate exten
String queueName = method.getQueue();
VirtualHost vhost = getVirtualHost(session);
- final AMQQueue queue = vhost.getQueue(queueName);
+ final MessageSource queue = vhost.getMessageSource(queueName);
if(queue == null)
{
@@ -214,9 +219,9 @@ public class ServerSessionDelegate exten
ServerSession s = (ServerSession) session;
queue.setExclusiveOwningSession(s);
- ((ServerSession) session).addSessionCloseTask(new ServerSession.Task()
+ ((ServerSession) session).addSessionCloseTask(new Action<ServerSession>()
{
- public void doTask(ServerSession session)
+ public void performAction(ServerSession session)
{
if(queue.getExclusiveOwningSession() == session)
{
@@ -228,9 +233,9 @@ public class ServerSessionDelegate exten
if(queue.getAuthorizationHolder() == null)
{
queue.setAuthorizationHolder(s);
- ((ServerSession) session).addSessionCloseTask(new ServerSession.Task()
+ ((ServerSession) session).addSessionCloseTask(new Action<ServerSession>()
{
- public void doTask(ServerSession session)
+ public void performAction(ServerSession session)
{
if(queue.getAuthorizationHolder() == session)
{
@@ -254,25 +259,42 @@ public class ServerSessionDelegate exten
return;
}
- Subscription_0_10 sub = new Subscription_0_10((ServerSession)session,
- destination,
- method.getAcceptMode(),
- method.getAcquireMode(),
- MessageFlowMode.WINDOW,
- creditManager,
- filterManager,
- method.getArguments());
+ ConsumerTarget_0_10 target = new ConsumerTarget_0_10((ServerSession)session, destination,
+ method.getAcceptMode(),
+ method.getAcquireMode(),
+ MessageFlowMode.WINDOW,
+ creditManager,
+ method.getArguments()
+ );
- ((ServerSession)session).register(destination, sub);
+ ((ServerSession)session).register(destination, target);
try
{
- queue.registerSubscription(sub, method.getExclusive());
+ EnumSet<Consumer.Option> options = EnumSet.noneOf(Consumer.Option.class);
+ if(method.getAcquireMode() == MessageAcquireMode.PRE_ACQUIRED)
+ {
+ options.add(Consumer.Option.ACQUIRES);
+ }
+ if(method.getAcquireMode() != MessageAcquireMode.NOT_ACQUIRED || method.getAcceptMode() == MessageAcceptMode.EXPLICIT)
+ {
+ options.add(Consumer.Option.SEES_REQUEUES);
+ }
+ if(method.getExclusive())
+ {
+ options.add(Consumer.Option.EXCLUSIVE);
+ }
+ Consumer sub =
+ queue.addConsumer(target,
+ filterManager,
+ MessageTransferMessage.class,
+ destination,
+ options);
}
- catch (AMQQueue.ExistingExclusiveSubscription existing)
+ catch (AMQQueue.ExistingExclusiveConsumer existing)
{
exception(session, method, ExecutionErrorCode.RESOURCE_LOCKED, "Queue has an exclusive consumer");
}
- catch (AMQQueue.ExistingSubscriptionPreventsExclusive exclusive)
+ catch (AMQQueue.ExistingConsumerPreventsExclusive exclusive)
{
exception(session, method, ExecutionErrorCode.RESOURCE_LOCKED, "Queue has an existing consumer - can't subscribe exclusively");
}
@@ -288,7 +310,7 @@ public class ServerSessionDelegate exten
@Override
public void messageTransfer(Session ssn, final MessageTransfer xfr)
{
- final Exchange exchange = getExchangeForMessage(ssn, xfr);
+ final MessageDestination exchange = getDestinationForMessage(ssn, xfr);
final DeliveryProperties delvProps = xfr.getHeader() == null ? null : xfr.getHeader().getDeliveryProperties();
if(delvProps != null && delvProps.hasTtl() && !delvProps.hasExpiration())
@@ -307,7 +329,6 @@ public class ServerSessionDelegate exten
return;
}
- final Exchange exchangeInUse;
final MessageStore store = getVirtualHost(ssn).getMessageStore();
final StoredMessage<MessageMetaData_0_10> storeMessage = createStoreMessage(xfr, messageMetaData, store);
final ServerSession serverSession = (ServerSession) ssn;
@@ -385,7 +406,7 @@ public class ServerSessionDelegate exten
{
String destination = method.getDestination();
- Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination);
+ ConsumerTarget_0_10 sub = ((ServerSession)session).getSubscription(destination);
if(sub == null)
{
@@ -393,12 +414,7 @@ public class ServerSessionDelegate exten
}
else
{
- AMQQueue queue = sub.getQueue();
((ServerSession)session).unregister(sub);
- if(!queue.isDeleted() && queue.isExclusive() && queue.getConsumerCount() == 0)
- {
- queue.setAuthorizationHolder(null);
- }
}
}
@@ -407,7 +423,7 @@ public class ServerSessionDelegate exten
{
String destination = method.getDestination();
- Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination);
+ ConsumerTarget_0_10 sub = ((ServerSession)session).getSubscription(destination);
if(sub == null)
{
@@ -814,24 +830,24 @@ public class ServerSessionDelegate exten
return getVirtualHost(session).getExchange(exchangeName);
}
- private Exchange getExchangeForMessage(Session ssn, MessageTransfer xfr)
+ private MessageDestination getDestinationForMessage(Session ssn, MessageTransfer xfr)
{
VirtualHost virtualHost = getVirtualHost(ssn);
- Exchange exchange;
+ MessageDestination destination;
if(xfr.hasDestination())
{
- exchange = virtualHost.getExchange(xfr.getDestination());
- if(exchange == null)
+ destination = virtualHost.getMessageDestination(xfr.getDestination());
+ if(destination == null)
{
- exchange = virtualHost.getDefaultExchange();
+ destination = virtualHost.getDefaultExchange();
}
}
else
{
- exchange = virtualHost.getDefaultExchange();
+ destination = virtualHost.getDefaultExchange();
}
- return exchange;
+ return destination;
}
private VirtualHost getVirtualHost(Session session)
@@ -1249,9 +1265,9 @@ public class ServerSessionDelegate exten
if (autoDelete && exclusive)
{
final AMQQueue q = queue;
- final ServerSession.Task deleteQueueTask = new ServerSession.Task()
+ final Action<ServerSession> deleteQueueTask = new Action<ServerSession>()
{
- public void doTask(ServerSession session)
+ public void performAction(ServerSession session)
{
try
{
@@ -1265,9 +1281,9 @@ public class ServerSessionDelegate exten
};
final ServerSession s = (ServerSession) session;
s.addSessionCloseTask(deleteQueueTask);
- queue.addQueueDeleteTask(new AMQQueue.Task()
+ queue.addQueueDeleteTask(new Action<AMQQueue>()
{
- public void doTask(AMQQueue queue) throws AMQException
+ public void performAction(AMQQueue queue)
{
s.removeSessionCloseTask(deleteQueueTask);
}
@@ -1276,9 +1292,9 @@ public class ServerSessionDelegate exten
if (exclusive)
{
final AMQQueue q = queue;
- final ServerSession.Task removeExclusive = new ServerSession.Task()
+ final Action<ServerSession> removeExclusive = new Action<ServerSession>()
{
- public void doTask(ServerSession session)
+ public void performAction(ServerSession session)
{
q.setAuthorizationHolder(null);
q.setExclusiveOwningSession(null);
@@ -1287,9 +1303,9 @@ public class ServerSessionDelegate exten
final ServerSession s = (ServerSession) session;
q.setExclusiveOwningSession(s);
s.addSessionCloseTask(removeExclusive);
- queue.addQueueDeleteTask(new AMQQueue.Task()
+ queue.addQueueDeleteTask(new Action<AMQQueue>()
{
- public void doTask(AMQQueue queue) throws AMQException
+ public void performAction(AMQQueue queue)
{
s.removeSessionCloseTask(removeExclusive);
}
@@ -1461,7 +1477,7 @@ public class ServerSessionDelegate exten
{
String destination = sfm.getDestination();
- Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination);
+ ConsumerTarget_0_10 sub = ((ServerSession)session).getSubscription(destination);
if(sub == null)
{
@@ -1478,7 +1494,7 @@ public class ServerSessionDelegate exten
{
String destination = stop.getDestination();
- Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination);
+ ConsumerTarget_0_10 sub = ((ServerSession)session).getSubscription(destination);
if(sub == null)
{
@@ -1496,7 +1512,7 @@ public class ServerSessionDelegate exten
{
String destination = flow.getDestination();
- Subscription_0_10 sub = ((ServerSession)session).getSubscription(destination);
+ ConsumerTarget_0_10 sub = ((ServerSession)session).getSubscription(destination);
if(sub == null)
{
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Fri Feb 7 16:57:49 2014
@@ -21,19 +21,7 @@
package org.apache.qpid.server.protocol.v0_8;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
-import java.util.UUID;
+import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
@@ -42,6 +30,7 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQSecurityException;
+import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
@@ -55,6 +44,7 @@ import org.apache.qpid.server.Transactio
import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction;
import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.filter.FilterManagerFactory;
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.flow.Pre0_10CreditManager;
import org.apache.qpid.server.logging.LogActor;
@@ -66,25 +56,28 @@ import org.apache.qpid.server.logging.me
import org.apache.qpid.server.logging.messages.ExchangeMessages;
import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.protocol.CapacityChecker;
import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.subscription.ClientDeliveryMethod;
-import org.apache.qpid.server.subscription.RecordDeliveryMethod;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.LocalTransaction.ActivityTimeAccessor;
import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.TransportException;
@@ -122,7 +115,7 @@ public class AMQChannel implements AMQSe
private IncomingMessage _currentMessage;
/** Maps from consumer tag to subscription instance. Allows us to unsubscribe from a queue. */
- private final Map<AMQShortString, Subscription> _tag2SubscriptionMap = new HashMap<AMQShortString, Subscription>();
+ private final Map<AMQShortString, ConsumerTarget_0_8> _tag2SubscriptionTargetMap = new HashMap<AMQShortString, ConsumerTarget_0_8>();
private final MessageStore _messageStore;
@@ -155,7 +148,7 @@ public class AMQChannel implements AMQSe
private volatile boolean _rollingBack;
private static final Runnable NULL_TASK = new Runnable() { public void run() {} };
- private List<QueueEntry> _resendList = new ArrayList<QueueEntry>();
+ private List<MessageInstance> _resendList = new ArrayList<MessageInstance>();
private static final
AMQShortString IMMEDIATE_DELIVERY_REPLY_TEXT = new AMQShortString("Immediate delivery is not possible.");
private long _createTime = System.currentTimeMillis();
@@ -266,7 +259,7 @@ public class AMQChannel implements AMQSe
return _channelId;
}
- public void setPublishFrame(MessagePublishInfo info, final Exchange e) throws AMQSecurityException
+ public void setPublishFrame(MessagePublishInfo info, final MessageDestination e) throws AMQSecurityException
{
String routingKey = info.getRoutingKey() == null ? null : info.getRoutingKey().asString();
SecurityManager securityManager = getVirtualHost().getSecurityManager();
@@ -275,7 +268,7 @@ public class AMQChannel implements AMQSe
throw new AMQSecurityException("Permission denied: " + e.getName());
}
_currentMessage = new IncomingMessage(info);
- _currentMessage.setExchange(e);
+ _currentMessage.setMessageDestination(e);
}
public void publishContentHeader(ContentHeaderBody contentHeaderBody)
@@ -360,7 +353,7 @@ public class AMQChannel implements AMQSe
}
};
- int enqueues = _currentMessage.getExchange().send(amqMessage, instanceProperties, _transaction,
+ int enqueues = _currentMessage.getDestination().send(amqMessage, instanceProperties, _transaction,
immediate ? _immediateAction : _capacityCheckAction);
if(enqueues == 0)
{
@@ -497,62 +490,89 @@ public class AMQChannel implements AMQSe
}
- public Subscription getSubscription(AMQShortString subscription)
+ public Consumer getSubscription(AMQShortString tag)
{
- return _tag2SubscriptionMap.get(subscription);
+ final ConsumerTarget_0_8 target = _tag2SubscriptionTargetMap.get(tag);
+ return target == null ? null : target.getConsumer();
}
/**
* Subscribe to a queue. We register all subscriptions in the channel so that if the channel is closed we can clean
* up all subscriptions, even if the client does not explicitly unsubscribe from all queues.
*
+ *
* @param tag the tag chosen by the client (if null, server will generate one)
- * @param queue the queue to subscribe to
+ * @param source the queue to subscribe to
* @param acks Are acks enabled for this subscriber
* @param filters Filters to apply to this subscriber
*
- * @param noLocal Flag stopping own messages being received.
* @param exclusive Flag requesting exclusive access to the queue
* @return the consumer tag. This is returned to the subscriber and used in subsequent unsubscribe requests
*
* @throws AMQException if something goes wrong
*/
- public AMQShortString subscribeToQueue(AMQShortString tag, AMQQueue queue, boolean acks,
- FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException
+ public AMQShortString consumeFromSource(AMQShortString tag, MessageSource source, boolean acks,
+ FieldTable filters, boolean exclusive) throws AMQException
{
if (tag == null)
{
tag = new AMQShortString("sgen_" + getNextConsumerTag());
}
- if (_tag2SubscriptionMap.containsKey(tag))
+ if (_tag2SubscriptionTargetMap.containsKey(tag))
{
throw new AMQException("Consumer already exists with same tag: " + tag);
}
- Subscription subscription =
- SubscriptionFactoryImpl.INSTANCE.createSubscription(_channelId, _session, tag, acks, filters, noLocal, _creditManager);
+ ConsumerTarget_0_8 target;
+ EnumSet<Consumer.Option> options = EnumSet.noneOf(Consumer.Option.class);
+
+ if(filters != null && Boolean.TRUE.equals(filters.get(AMQPFilterTypes.NO_CONSUME.getValue())))
+ {
+ target = ConsumerTarget_0_8.createBrowserTarget(this, tag, filters, _creditManager);
+ }
+ else if(acks)
+ {
+ target = ConsumerTarget_0_8.createAckTarget(this, tag, filters, _creditManager);
+ options.add(Consumer.Option.ACQUIRES);
+ options.add(Consumer.Option.SEES_REQUEUES);
+ }
+ else
+ {
+ target = ConsumerTarget_0_8.createNoAckTarget(this, tag, filters, _creditManager);
+ options.add(Consumer.Option.ACQUIRES);
+ options.add(Consumer.Option.SEES_REQUEUES);
+ }
+ if(exclusive)
+ {
+ options.add(Consumer.Option.EXCLUSIVE);
+ }
// So to keep things straight we put before the call and catch all exceptions from the register and tidy up.
// We add before we register as the Async Delivery process may AutoClose the subscriber
// so calling _cT2QM.remove before we have done put which was after the register succeeded.
// So to keep things straight we put before the call and catch all exceptions from the register and tidy up.
- _tag2SubscriptionMap.put(tag, subscription);
+ _tag2SubscriptionTargetMap.put(tag, target);
try
{
- queue.registerSubscription(subscription, exclusive);
+ Consumer sub =
+ source.addConsumer(target,
+ FilterManagerFactory.createManager(FieldTable.convertToMap(filters)),
+ AMQMessage.class,
+ AMQShortString.toString(tag),
+ options);
}
catch (AMQException e)
{
- _tag2SubscriptionMap.remove(tag);
+ _tag2SubscriptionTargetMap.remove(tag);
throw e;
}
catch (RuntimeException e)
{
- _tag2SubscriptionMap.remove(tag);
+ _tag2SubscriptionTargetMap.remove(tag);
throw e;
}
return tag;
@@ -567,18 +587,11 @@ public class AMQChannel implements AMQSe
public boolean unsubscribeConsumer(AMQShortString consumerTag) throws AMQException
{
- Subscription sub = _tag2SubscriptionMap.remove(consumerTag);
+ ConsumerTarget_0_8 target = _tag2SubscriptionTargetMap.remove(consumerTag);
+ Consumer sub = target == null ? null : target.getConsumer();
if (sub != null)
{
- try
- {
- sub.getSendLock();
- sub.getQueue().unregisterSubscription(sub);
- }
- finally
- {
- sub.releaseSendLock();
- }
+ sub.close();
return true;
}
else
@@ -633,7 +646,7 @@ public class AMQChannel implements AMQSe
{
if (_logger.isInfoEnabled())
{
- if (!_tag2SubscriptionMap.isEmpty())
+ if (!_tag2SubscriptionTargetMap.isEmpty())
{
_logger.info("Unsubscribing all consumers on channel " + toString());
}
@@ -643,28 +656,21 @@ public class AMQChannel implements AMQSe
}
}
- for (Map.Entry<AMQShortString, Subscription> me : _tag2SubscriptionMap.entrySet())
+ for (Map.Entry<AMQShortString, ConsumerTarget_0_8> me : _tag2SubscriptionTargetMap.entrySet())
{
if (_logger.isInfoEnabled())
{
_logger.info("Unsubscribing consumer '" + me.getKey() + "' on channel " + toString());
}
- Subscription sub = me.getValue();
+ Consumer sub = me.getValue().getConsumer();
- try
- {
- sub.getSendLock();
- sub.getQueue().unregisterSubscription(sub);
- }
- finally
- {
- sub.releaseSendLock();
- }
+
+ sub.close();
}
- _tag2SubscriptionMap.clear();
+ _tag2SubscriptionTargetMap.clear();
}
/**
@@ -673,24 +679,15 @@ public class AMQChannel implements AMQSe
* @param entry the record of the message on the queue that was delivered
* @param deliveryTag the delivery tag used when delivering the message (see protocol spec for description of the
* delivery tag)
- * @param subscription The consumer that is to acknowledge this message.
+ * @param consumer The consumer that is to acknowledge this message.
*/
- public void addUnacknowledgedMessage(QueueEntry entry, long deliveryTag, Subscription subscription)
+ public void addUnacknowledgedMessage(MessageInstance entry, long deliveryTag, Consumer consumer)
{
if (_logger.isDebugEnabled())
{
- if (entry.getQueue() == null)
- {
- _logger.debug("Adding unacked message with a null queue:" + entry);
- }
- else
- {
- if (_logger.isDebugEnabled())
- {
_logger.debug(debugIdentity() + " Adding unacked message(" + entry.getMessage().toString() + " DT:" + deliveryTag
- + ") with a queue(" + entry.getQueue() + ") for " + subscription);
- }
- }
+ + ") for " + consumer + " on " + entry.getOwningResource().getName());
+
}
_unacknowledgedMessageMap.add(deliveryTag, entry);
@@ -713,7 +710,7 @@ public class AMQChannel implements AMQSe
public void requeue() throws AMQException
{
// we must create a new map since all the messages will get a new delivery tag when they are redelivered
- Collection<QueueEntry> messagesToBeDelivered = _unacknowledgedMessageMap.cancelAllMessages();
+ Collection<MessageInstance> messagesToBeDelivered = _unacknowledgedMessageMap.cancelAllMessages();
if (!messagesToBeDelivered.isEmpty())
{
@@ -724,21 +721,13 @@ public class AMQChannel implements AMQSe
}
- for (QueueEntry unacked : messagesToBeDelivered)
+ for (MessageInstance unacked : messagesToBeDelivered)
{
- if (!unacked.isQueueDeleted())
- {
- // Mark message redelivered
- unacked.setRedelivered();
-
- // Ensure message is released for redelivery
- unacked.release();
+ // Mark message redelivered
+ unacked.setRedelivered();
- }
- else
- {
- unacked.delete();
- }
+ // Ensure message is released for redelivery
+ unacked.release();
}
}
@@ -752,7 +741,7 @@ public class AMQChannel implements AMQSe
*/
public void requeue(long deliveryTag) throws AMQException
{
- QueueEntry unacked = _unacknowledgedMessageMap.remove(deliveryTag);
+ MessageInstance unacked = _unacknowledgedMessageMap.remove(deliveryTag);
if (unacked != null)
{
@@ -760,20 +749,8 @@ public class AMQChannel implements AMQSe
unacked.setRedelivered();
// Ensure message is released for redelivery
- if (!unacked.isQueueDeleted())
- {
-
- // Ensure message is released for redelivery
- unacked.release();
-
- }
- else
- {
- _logger.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked
- + "):" + deliveryTag + " but no queue defined and no DeadLetter queue so DROPPING message.");
+ unacked.release();
- unacked.delete();
- }
}
else
{
@@ -786,10 +763,10 @@ public class AMQChannel implements AMQSe
public boolean isMaxDeliveryCountEnabled(final long deliveryTag)
{
- final QueueEntry queueEntry = _unacknowledgedMessageMap.get(deliveryTag);
+ final MessageInstance queueEntry = _unacknowledgedMessageMap.get(deliveryTag);
if (queueEntry != null)
{
- final int maximumDeliveryCount = queueEntry.getQueue().getMaximumDeliveryCount();
+ final int maximumDeliveryCount = queueEntry.getMaximumDeliveryCount();
return maximumDeliveryCount > 0;
}
@@ -798,10 +775,10 @@ public class AMQChannel implements AMQSe
public boolean isDeliveredTooManyTimes(final long deliveryTag)
{
- final QueueEntry queueEntry = _unacknowledgedMessageMap.get(deliveryTag);
+ final MessageInstance queueEntry = _unacknowledgedMessageMap.get(deliveryTag);
if (queueEntry != null)
{
- final int maximumDeliveryCount = queueEntry.getQueue().getMaximumDeliveryCount();
+ final int maximumDeliveryCount = queueEntry.getMaximumDeliveryCount();
final int numDeliveries = queueEntry.getDeliveryCount();
return maximumDeliveryCount != 0 && numDeliveries >= maximumDeliveryCount;
}
@@ -812,16 +789,14 @@ public class AMQChannel implements AMQSe
/**
* Called to resend all outstanding unacknowledged messages to this same channel.
*
- * @param requeue Are the messages to be requeued or dropped.
- *
* @throws AMQException When something goes wrong.
*/
- public void resend(final boolean requeue) throws AMQException
+ public void resend() throws AMQException
{
- final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>();
- final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>();
+ final Map<Long, MessageInstance> msgToRequeue = new LinkedHashMap<Long, MessageInstance>();
+ final Map<Long, MessageInstance> msgToResend = new LinkedHashMap<Long, MessageInstance>();
if (_logger.isDebugEnabled())
{
@@ -833,9 +808,8 @@ public class AMQChannel implements AMQSe
// and those that don't to be requeued.
_unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap,
msgToRequeue,
- msgToResend,
- requeue,
- _messageStore));
+ msgToResend
+ ));
// Process Messages to Resend
@@ -851,39 +825,20 @@ public class AMQChannel implements AMQSe
}
}
- for (Map.Entry<Long, QueueEntry> entry : msgToResend.entrySet())
+ for (Map.Entry<Long, MessageInstance> entry : msgToResend.entrySet())
{
- QueueEntry message = entry.getValue();
+ MessageInstance message = entry.getValue();
long deliveryTag = entry.getKey();
//Amend the delivery counter as the client hasn't seen these messages yet.
message.decrementDeliveryCount();
- AMQQueue queue = message.getQueue();
-
// Without any details from the client about what has been processed we have to mark
// all messages in the unacked map as redelivered.
message.setRedelivered();
- Subscription sub = message.getDeliveredSubscription();
-
- if (sub != null)
- {
-
- if(!queue.resend(message,sub))
- {
- msgToRequeue.put(deliveryTag, message);
- }
- }
- else
+ if (!message.resend())
{
-
- if (_logger.isInfoEnabled())
- {
- _logger.info("DeliveredSubscription not recorded so just requeueing(" + message.toString()
- + ")to prevent loss");
- }
- // move this message to requeue
msgToRequeue.put(deliveryTag, message);
}
} // for all messages
@@ -898,9 +853,9 @@ public class AMQChannel implements AMQSe
}
// Process Messages to Requeue at the front of the queue
- for (Map.Entry<Long, QueueEntry> entry : msgToRequeue.entrySet())
+ for (Map.Entry<Long, MessageInstance> entry : msgToRequeue.entrySet())
{
- QueueEntry message = entry.getValue();
+ MessageInstance message = entry.getValue();
long deliveryTag = entry.getKey();
//Amend the delivery counter as the client hasn't seen these messages yet.
@@ -926,11 +881,11 @@ public class AMQChannel implements AMQSe
*/
public void acknowledgeMessage(long deliveryTag, boolean multiple) throws AMQException
{
- Collection<QueueEntry> ackedMessages = getAckedMessages(deliveryTag, multiple);
+ Collection<MessageInstance> ackedMessages = getAckedMessages(deliveryTag, multiple);
_transaction.dequeue(ackedMessages, new MessageAcknowledgeAction(ackedMessages));
}
- private Collection<QueueEntry> getAckedMessages(long deliveryTag, boolean multiple)
+ private Collection<MessageInstance> getAckedMessages(long deliveryTag, boolean multiple)
{
return _unacknowledgedMessageMap.acknowledge(deliveryTag, multiple);
@@ -976,9 +931,9 @@ public class AMQChannel implements AMQSe
if (wasSuspended)
{
// may need to deliver queued messages
- for (Subscription s : _tag2SubscriptionMap.values())
+ for (ConsumerTarget_0_8 s : _tag2SubscriptionTargetMap.values())
{
- s.getQueue().deliverAsync(s);
+ s.getConsumer().externalStateChange();
}
}
@@ -992,15 +947,15 @@ public class AMQChannel implements AMQSe
if (!wasSuspended)
{
// may need to deliver queued messages
- for (Subscription s : _tag2SubscriptionMap.values())
+ for (ConsumerTarget_0_8 s : _tag2SubscriptionTargetMap.values())
{
try
{
- s.getSendLock();
+ s.getConsumer().getSendLock();
}
finally
{
- s.releaseSendLock();
+ s.getConsumer().releaseSendLock();
}
}
}
@@ -1077,10 +1032,10 @@ public class AMQChannel implements AMQSe
boolean requiresSuspend = _suspended.compareAndSet(false,true);
// ensure all subscriptions have seen the change to the channel state
- for(Subscription sub : _tag2SubscriptionMap.values())
+ for(ConsumerTarget_0_8 sub : _tag2SubscriptionTargetMap.values())
{
- sub.getSendLock();
- sub.releaseSendLock();
+ sub.getConsumer().getSendLock();
+ sub.getConsumer().releaseSendLock();
}
try
@@ -1098,16 +1053,16 @@ public class AMQChannel implements AMQSe
postRollbackTask.run();
- for(QueueEntry entry : _resendList)
+ for(MessageInstance entry : _resendList)
{
- Subscription sub = entry.getDeliveredSubscription();
+ Consumer sub = entry.getDeliveredConsumer();
if(sub == null || sub.isClosed())
{
entry.release();
}
else
{
- sub.getQueue().resend(entry, sub);
+ entry.resend();
}
}
_resendList.clear();
@@ -1115,9 +1070,9 @@ public class AMQChannel implements AMQSe
if(requiresSuspend)
{
_suspended.set(false);
- for(Subscription sub : _tag2SubscriptionMap.values())
+ for(ConsumerTarget_0_8 sub : _tag2SubscriptionTargetMap.values())
{
- sub.getQueue().deliverAsync(sub);
+ sub.getConsumer().externalStateChange();
}
}
@@ -1173,7 +1128,7 @@ public class AMQChannel implements AMQSe
private final RecordDeliveryMethod _recordDeliveryMethod = new RecordDeliveryMethod()
{
- public void recordMessageDelivery(final Subscription sub, final QueueEntry entry, final long deliveryTag)
+ public void recordMessageDelivery(final Consumer sub, final MessageInstance entry, final long deliveryTag)
{
addUnacknowledgedMessage(entry, deliveryTag, sub);
}
@@ -1234,78 +1189,96 @@ public class AMQChannel implements AMQSe
}
- private class ImmediateAction implements BaseQueue.PostEnqueueAction
+ private class ImmediateAction<C extends Consumer> implements Action<MessageInstance<C>>
{
public ImmediateAction()
{
}
- public void onEnqueue(QueueEntry entry)
+ public void performAction(MessageInstance<C> entry)
{
- AMQQueue queue = entry.getQueue();
+ TransactionLogResource queue = entry.getOwningResource();
if (!entry.getDeliveredToConsumer() && entry.acquire())
{
ServerTransaction txn = new LocalTransaction(_messageStore);
- Collection<QueueEntry> entries = new ArrayList<QueueEntry>(1);
- entries.add(entry);
final AMQMessage message = (AMQMessage) entry.getMessage();
- txn.dequeue(queue, entry.getMessage(),
- new MessageAcknowledgeAction(entries)
- {
- @Override
- public void postCommit()
+ MessageReference ref = message.newReference();
+ try
+ {
+ entry.delete();
+ txn.dequeue(queue, message,
+ new ServerTransaction.Action()
{
- try
+ @Override
+ public void postCommit()
{
- final
- ProtocolOutputConverter outputConverter =
- _session.getProtocolOutputConverter();
-
- outputConverter.writeReturn(message.getMessagePublishInfo(),
- message.getContentHeaderBody(),
- message,
- _channelId,
- AMQConstant.NO_CONSUMERS.getCode(),
- IMMEDIATE_DELIVERY_REPLY_TEXT);
+ try
+ {
+ final
+ ProtocolOutputConverter outputConverter =
+ _session.getProtocolOutputConverter();
+
+ outputConverter.writeReturn(message.getMessagePublishInfo(),
+ message.getContentHeaderBody(),
+ message,
+ _channelId,
+ AMQConstant.NO_CONSUMERS.getCode(),
+ IMMEDIATE_DELIVERY_REPLY_TEXT);
+ }
+ catch (AMQException e)
+ {
+ throw new RuntimeException(e);
+ }
}
- catch (AMQException e)
+
+ @Override
+ public void onRollback()
{
- throw new RuntimeException(e);
+
}
- super.postCommit();
}
- }
- );
- txn.commit();
+ );
+ txn.commit();
+ }
+ finally
+ {
+ ref.release();
+ }
}
else
{
- queue.checkCapacity(AMQChannel.this);
+ if(queue instanceof CapacityChecker)
+ {
+ ((CapacityChecker)queue).checkCapacity(AMQChannel.this);
+ }
}
}
}
- private final class CapacityCheckAction implements BaseQueue.PostEnqueueAction
+ private final class CapacityCheckAction<C extends Consumer> implements Action<MessageInstance<C>>
{
@Override
- public void onEnqueue(final QueueEntry entry)
+ public void performAction(final MessageInstance<C> entry)
{
- AMQQueue queue = entry.getQueue();
- queue.checkCapacity(AMQChannel.this);
+ TransactionLogResource queue = entry.getOwningResource();
+ if(queue instanceof CapacityChecker)
+ {
+ ((CapacityChecker)queue).checkCapacity(AMQChannel.this);
+ }
}
}
private class MessageAcknowledgeAction implements ServerTransaction.Action
{
- private final Collection<QueueEntry> _ackedMessages;
+ private final Collection<MessageInstance> _ackedMessages;
- public MessageAcknowledgeAction(Collection<QueueEntry> ackedMessages)
+ public MessageAcknowledgeAction(Collection<MessageInstance> ackedMessages)
{
_ackedMessages = ackedMessages;
}
@@ -1314,7 +1287,7 @@ public class AMQChannel implements AMQSe
{
try
{
- for(QueueEntry entry : _ackedMessages)
+ for(MessageInstance entry : _ackedMessages)
{
entry.delete();
}
@@ -1337,10 +1310,10 @@ public class AMQChannel implements AMQSe
{
try
{
- for(QueueEntry entry : _ackedMessages)
- {
- entry.release();
- }
+ for(MessageInstance entry : _ackedMessages)
+ {
+ entry.release();
+ }
}
finally
{
@@ -1505,7 +1478,7 @@ public class AMQChannel implements AMQSe
public void deadLetter(long deliveryTag) throws AMQException
{
final UnacknowledgedMessageMap unackedMap = getUnacknowledgedMessageMap();
- final QueueEntry rejectedQueueEntry = unackedMap.remove(deliveryTag);
+ final MessageInstance rejectedQueueEntry = unackedMap.remove(deliveryTag);
if (rejectedQueueEntry == null)
{
@@ -1514,36 +1487,42 @@ public class AMQChannel implements AMQSe
else
{
final ServerMessage msg = rejectedQueueEntry.getMessage();
+ final Consumer sub = rejectedQueueEntry.getDeliveredConsumer();
- int requeues = rejectedQueueEntry.routeToAlternate(new BaseQueue.PostEnqueueAction()
+ int requeues = rejectedQueueEntry.routeToAlternate(new Action<MessageInstance>()
{
@Override
- public void onEnqueue(final QueueEntry requeueEntry)
+ public void performAction(final MessageInstance requeueEntry)
{
_actor.message( _logSubject, ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(),
- requeueEntry.getQueue().getName()));
+ requeueEntry.getOwningResource().getName()));
}
}, null);
if(requeues == 0)
{
- final AMQQueue queue = rejectedQueueEntry.getQueue();
-
- final Exchange altExchange = queue.getAlternateExchange();
- if (altExchange == null)
+ final TransactionLogResource owningResource = rejectedQueueEntry.getOwningResource();
+ if(owningResource instanceof AMQQueue)
{
- _logger.debug("No alternate exchange configured for queue, must discard the message as unable to DLQ: delivery tag: " + deliveryTag);
- _actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), queue.getName(), msg.getRoutingKey()));
+ final AMQQueue queue = (AMQQueue) owningResource;
- }
- else
- {
- _logger.debug(
- "Routing process provided no queues to enqueue the message on, must discard message as unable to DLQ: delivery tag: "
- + deliveryTag);
- _actor.message(_logSubject,
- ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), altExchange.getName()));
+ final Exchange altExchange = queue.getAlternateExchange();
+
+ if (altExchange == null)
+ {
+ _logger.debug("No alternate exchange configured for queue, must discard the message as unable to DLQ: delivery tag: " + deliveryTag);
+ _actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), queue.getName(), msg.getRoutingKey()));
+
+ }
+ else
+ {
+ _logger.debug(
+ "Routing process provided no queues to enqueue the message on, must discard message as unable to DLQ: delivery tag: "
+ + deliveryTag);
+ _actor.message(_logSubject,
+ ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), altExchange.getName()));
+ }
}
}
@@ -1604,6 +1583,6 @@ public class AMQChannel implements AMQSe
@Override
public int getConsumerCount()
{
- return _tag2SubscriptionMap.size();
+ return _tag2SubscriptionTargetMap.size();
}
}
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java Fri Feb 7 16:57:49 2014
@@ -94,8 +94,7 @@ import org.apache.qpid.server.security.a
import org.apache.qpid.server.protocol.v0_8.state.AMQState;
import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.stats.StatisticsCounter;
-import org.apache.qpid.server.subscription.ClientDeliveryMethod;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.TransportException;
@@ -1669,7 +1668,7 @@ public class AMQProtocolEngine implement
}
@Override
- public void deliverToClient(final Subscription sub, final ServerMessage message,
+ public void deliverToClient(final Consumer sub, final ServerMessage message,
final InstanceProperties props, final long deliveryTag)
throws AMQException
{
@@ -1678,7 +1677,7 @@ public class AMQProtocolEngine implement
props,
_channelId,
deliveryTag,
- ((SubscriptionImpl)sub).getConsumerTag());
+ new AMQShortString(sub.getName()));
}
}
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java Fri Feb 7 16:57:49 2014
@@ -39,7 +39,6 @@ import org.apache.qpid.server.logging.Lo
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
import org.apache.qpid.server.security.AuthorizationHolder;
-import org.apache.qpid.server.subscription.ClientDeliveryMethod;
import org.apache.qpid.server.virtualhost.VirtualHost;
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java Fri Feb 7 16:57:49 2014
@@ -23,11 +23,8 @@ package org.apache.qpid.server.protocol.
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.txn.AutoCommitTransaction;
-import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.consumer.Consumer;
import java.util.Map;
@@ -35,34 +32,28 @@ public class ExtractResendAndRequeue imp
{
private static final Logger _log = Logger.getLogger(ExtractResendAndRequeue.class);
- private final Map<Long, QueueEntry> _msgToRequeue;
- private final Map<Long, QueueEntry> _msgToResend;
- private final boolean _requeueIfUnableToResend;
+ private final Map<Long, MessageInstance> _msgToRequeue;
+ private final Map<Long, MessageInstance> _msgToResend;
private final UnacknowledgedMessageMap _unacknowledgedMessageMap;
- private final MessageStore _transactionLog;
public ExtractResendAndRequeue(UnacknowledgedMessageMap unacknowledgedMessageMap,
- Map<Long, QueueEntry> msgToRequeue,
- Map<Long, QueueEntry> msgToResend,
- boolean requeueIfUnableToResend,
- MessageStore txnLog)
+ Map<Long, MessageInstance> msgToRequeue,
+ Map<Long, MessageInstance> msgToResend)
{
_unacknowledgedMessageMap = unacknowledgedMessageMap;
_msgToRequeue = msgToRequeue;
_msgToResend = msgToResend;
- _requeueIfUnableToResend = requeueIfUnableToResend;
- _transactionLog = txnLog;
}
- public boolean callback(final long deliveryTag, QueueEntry message) throws AMQException
+ public boolean callback(final long deliveryTag, MessageInstance message) throws AMQException
{
message.setRedelivered();
- final Subscription subscription = message.getDeliveredSubscription();
- if (subscription != null)
+ final Consumer consumer = message.getDeliveredConsumer();
+ if (consumer != null)
{
// Consumer exists
- if (!subscription.isClosed())
+ if (!consumer.isClosed())
{
_msgToResend.put(deliveryTag, message);
}
@@ -73,58 +64,13 @@ public class ExtractResendAndRequeue imp
}
else
{
- // Message has no consumer tag, so was "delivered" to a GET
- // or consumer no longer registered
- // cannot resend, so re-queue.
- if (!message.isQueueDeleted())
- {
- if (_requeueIfUnableToResend)
- {
- _msgToRequeue.put(deliveryTag, message);
- }
- else
- {
-
- dequeueEntry(message);
- _log.info("No DeadLetter Queue and requeue not requested so dropping message:" + message);
- }
- }
- else
- {
- dequeueEntry(message);
- _log.warn("Message.queue is null and no DeadLetter Queue so dropping message:" + message);
- }
+ _log.info("No DeadLetter Queue and requeue not requested so dropping message:" + message);
}
// false means continue processing
return false;
}
-
- private void dequeueEntry(final QueueEntry node)
- {
- ServerTransaction txn = new AutoCommitTransaction(_transactionLog);
- dequeueEntry(node, txn);
- }
-
- private void dequeueEntry(final QueueEntry node, ServerTransaction txn)
- {
- txn.dequeue(node.getQueue(), node.getMessage(),
- new ServerTransaction.Action()
- {
-
- public void postCommit()
- {
- node.delete();
- }
-
- public void onRollback()
- {
-
- }
- });
- }
-
public void visitComplete()
{
_unacknowledgedMessageMap.clear();
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java Fri Feb 7 16:57:49 2014
@@ -20,15 +20,12 @@
*/
package org.apache.qpid.server.protocol.v0_8;
-import org.apache.log4j.Logger;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.message.MessageDestination;
import java.util.ArrayList;
import java.util.List;
@@ -38,7 +35,7 @@ public class IncomingMessage
private final MessagePublishInfo _messagePublishInfo;
private ContentHeaderBody _contentHeaderBody;
- private Exchange _exchange;
+ private MessageDestination _messageDestination;
/**
* Keeps a track of how many bytes we have received in body frames
@@ -77,9 +74,9 @@ public class IncomingMessage
return _messagePublishInfo.getExchange();
}
- public Exchange getExchange()
+ public MessageDestination getDestination()
{
- return _exchange;
+ return _messageDestination;
}
public ContentHeaderBody getContentHeader()
@@ -92,9 +89,9 @@ public class IncomingMessage
return getContentHeader().getBodySize();
}
- public void setExchange(final Exchange e)
+ public void setMessageDestination(final MessageDestination e)
{
- _exchange = e;
+ _messageDestination = e;
}
public int getBodyCount() throws AMQException
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java Fri Feb 7 16:57:49 2014
@@ -105,7 +105,7 @@ public class MessageMetaData implements
}
- public int writeToBuffer(int offset, ByteBuffer dest)
+ public int writeToBuffer(ByteBuffer dest)
{
int oldPosition = dest.position();
try
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org