You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2009/10/25 19:50:44 UTC
svn commit: r829626 -
/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
Author: rgodfrey
Date: Sun Oct 25 18:50:44 2009
New Revision: 829626
URL: http://svn.apache.org/viewvc?rev=829626&view=rev
Log:
Fixed dequeuing of undeliverable immediate messages: each such queue entry is now dequeued and committed in its own transaction
Modified:
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=829626&r1=829625&r2=829626&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Sun Oct 25 18:50:44 2009
@@ -981,7 +981,7 @@
{
final boolean immediate = _incommingMessage.isImmediate();
- ServerTransaction txn = null;
+
for(AMQQueue queue : _destinationQueues)
{
@@ -991,31 +991,24 @@
if(immediate && !entry.getDeliveredToConsumer() && entry.acquire())
{
- if(txn == null)
- {
- txn = new LocalTransaction(_messageStore);
- Collection<QueueEntry> entries = new ArrayList<QueueEntry>(1);
- entries.add(entry);
- txn.dequeue(queue, entry.getMessage(), new MessageAcknowledgeAction(entries));
- }
-
+ ServerTransaction txn = new LocalTransaction(_messageStore);
+ Collection<QueueEntry> entries = new ArrayList<QueueEntry>(1);
+ entries.add(entry);
+ txn.dequeue(queue, entry.getMessage(), new MessageAcknowledgeAction(entries));
+ txn.commit();
AMQMessage message = (AMQMessage) entry.getMessage();
- _session.getProtocolOutputConverter().writeReturn(message.getMessagePublishInfo(),
- message.getContentHeaderBody(),
- message,
- _channelId,
- AMQConstant.NO_CONSUMERS.getCode(),
- new AMQShortString("Immediate delivery is not possible."));
+ _session.getProtocolOutputConverter().writeReturn(message.getMessagePublishInfo(),
+ message.getContentHeaderBody(),
+ message,
+ _channelId,
+ AMQConstant.NO_CONSUMERS.getCode(),
+ new AMQShortString("Immediate delivery is not possible."));
}
}
- if(txn != null)
- {
- txn.commit();
- }
}
catch (AMQException e)
{
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org