You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2009/04/11 01:53:11 UTC

svn commit: r764111 - /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java

Author: ritchiem
Date: Fri Apr 10 23:53:11 2009
New Revision: 764111

URL: http://svn.apache.org/viewvc?rev=764111&view=rev
Log:
QPID-1793 : Update AMQChannel to remember any AMQException that occurs during requeue and then throw it after it has attempted to requeue all the messages from the unackedMap.

Modified:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=764111&r1=764110&r2=764111&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Fri Apr 10 23:53:11 2009
@@ -490,26 +490,45 @@
             }
         }
 
+        // Place to hold any error that occurred during the requeueing.
+        AMQException error = null;
         for (QueueEntry unacked : messagesToBeDelivered)
         {
-            if (!unacked.isQueueDeleted())
+            try
             {
-                // Mark message redelivered
-                unacked.setRedelivered(true);
+                if (!unacked.isQueueDeleted())
+                {
+                    // Mark message redelivered
+                    unacked.setRedelivered(true);
 
-                // Ensure message is released for redelivery
-                unacked.release();
+                    // Ensure message is released for redelivery
+                    unacked.release();
 
-                // Deliver Message
-                deliveryContext.requeue(unacked);
+                    // Deliver Message
+                    deliveryContext.requeue(unacked);
 
+                }
+                else
+                {
+                    unacked.dequeueAndDelete(_storeContext);
+                }
             }
-            else
+            catch (AMQException e)
             {
-                unacked.dequeueAndDelete(_storeContext);
+                //Log the error and store it
+                _log.error(e.getMessage(),e);
+                // We store the last seen exception for rethrowing after
+                // attempting to process all the entries.
+                error = e;
             }
         }
 
+        // If we had an error during the requeue process throw it now.
+        if (error != null)
+        {
+            throw error;
+        }
+
     }
 
     /**



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org