You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2009/10/25 19:50:44 UTC

svn commit: r829626 - /qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java

Author: rgodfrey
Date: Sun Oct 25 18:50:44 2009
New Revision: 829626

URL: http://svn.apache.org/viewvc?rev=829626&view=rev
Log:
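Fixed dequeuing of immediate messages that could not be delivered to a consumer: each such queue entry is now dequeued and committed in its own transaction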

Modified:
    qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java

Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=829626&r1=829625&r2=829626&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Sun Oct 25 18:50:44 2009
@@ -981,7 +981,7 @@
             {
                 final boolean immediate = _incommingMessage.isImmediate();
 
-                ServerTransaction txn = null;
+
 
                 for(AMQQueue queue : _destinationQueues)
                 {
@@ -991,31 +991,24 @@
 
                     if(immediate && !entry.getDeliveredToConsumer() && entry.acquire())
                     {
-                          if(txn == null)
-                          {
-                              txn = new LocalTransaction(_messageStore);
-                              Collection<QueueEntry> entries = new ArrayList<QueueEntry>(1);
-                              entries.add(entry);
-                              txn.dequeue(queue, entry.getMessage(), new MessageAcknowledgeAction(entries));
-                          }
-
+                        ServerTransaction txn = new LocalTransaction(_messageStore);
+                        Collection<QueueEntry> entries = new ArrayList<QueueEntry>(1);
+                        entries.add(entry);
+                        txn.dequeue(queue, entry.getMessage(), new MessageAcknowledgeAction(entries));
+                        txn.commit();
 
                         AMQMessage message = (AMQMessage) entry.getMessage();
-                                        _session.getProtocolOutputConverter().writeReturn(message.getMessagePublishInfo(),
-                                                              message.getContentHeaderBody(),
-                                                              message,
-                                                              _channelId,
-                                                              AMQConstant.NO_CONSUMERS.getCode(),
-                                                             new AMQShortString("Immediate delivery is not possible."));
+                        _session.getProtocolOutputConverter().writeReturn(message.getMessagePublishInfo(),
+                                              message.getContentHeaderBody(),
+                                              message,
+                                              _channelId,
+                                              AMQConstant.NO_CONSUMERS.getCode(),
+                                             new AMQShortString("Immediate delivery is not possible."));
 
 
                     }
 
                 }
-                if(txn != null)
-                {
-                    txn.commit();
-                }
             }
             catch (AMQException e)
             {



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org