You are viewing a plain-text version of this content. The canonical link to the original message was not preserved in this copy.
Posted to commits@qpid.apache.org by rg...@apache.org on 2008/05/11 18:01:10 UTC

svn commit: r655330 - /incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java

Author: rgodfrey
Date: Sun May 11 09:01:10 2008
New Revision: 655330

URL: http://svn.apache.org/viewvc?rev=655330&view=rev
Log:
Copy over QPID-926

Modified:
    incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java

Modified: incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java?rev=655330&r1=655329&r2=655330&view=diff
==============================================================================
--- incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java (original)
+++ incubator/qpid/branches/broker-queue-refactor/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java Sun May 11 09:01:10 2008
@@ -110,6 +110,9 @@
                                    boolean multiple, final UnacknowledgedMessageMap unacknowledgedMessageMap)
             throws AMQException
     {
+
+        final boolean debug = _log.isDebugEnabled();
+        ;
         if (multiple)
         {
             if (deliveryTag == 0)
@@ -123,11 +126,14 @@
                 {
                     public boolean callback(final long deliveryTag, QueueEntry message) throws AMQException
                     {
-                        if (_log.isDebugEnabled())
+                        if (debug)
                         {
                             _log.debug("Discarding message: " + message.getMessage().getMessageId());
                         }
-
+                        if(message.getMessage().isPersistent())
+                        {
+                            beginTranIfNecessary();
+                        }
                         message.restoreCredit();
                         //Message has been ack so discard it. This will dequeue and decrement the reference.
                         message.discard(_storeContext);
@@ -152,11 +158,14 @@
                 unacknowledgedMessageMap.drainTo(acked, deliveryTag);
                 for (QueueEntry msg : acked)
                 {
-                        if (_log.isDebugEnabled())
+                        if (debug)
                         {
                             _log.debug("Discarding message: " + msg.getMessage().getMessageId());
                         }
-
+                        if(msg.getMessage().isPersistent())
+                        {
+                            beginTranIfNecessary();
+                        }
 
                         //Message has been ack so discard it. This will dequeue and decrement the reference.
                         msg.discard(_storeContext);
@@ -176,20 +185,29 @@
                                        _channel.getChannelId());
             }
 
-            if (_log.isDebugEnabled())
+            if (debug)
             {
                 _log.debug("Discarding message: " + msg.getMessage().getMessageId());
             }
+            if(msg.getMessage().isPersistent())
+            {
+                beginTranIfNecessary();
+            }
 
             //Message has been ack so discard it. This will dequeue and decrement the reference.
             msg.discard(_storeContext);
 
-            if (_log.isDebugEnabled())
+            if (debug)
             {
                 _log.debug("Received non-multiple ack for messaging with delivery tag " + deliveryTag + " msg id " +
                            msg.getMessage().getMessageId());
             }
         }
+        if(_inTran)
+        {
+            _messageStore.commitTran(_storeContext);
+            _inTran = false;
+        }
     }
 
     public void messageFullyReceived(boolean persistent) throws AMQException