You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2016/04/07 13:15:40 UTC

svn commit: r1738119 - in /qpid/java/trunk: broker-core/src/main/java/org/apache/qpid/server/virtualhost/ broker-core/src/test/java/org/apache/qpid/server/virtualhost/ broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/

Author: kwall
Date: Thu Apr  7 11:15:40 2016
New Revision: 1738119

URL: http://svn.apache.org/viewvc?rev=1738119&view=rev
Log:
QPID-7154: [Java Broker] On AMQP 1.0, when processing delivery state only dequeue if the acquisition was successfully locked

* Guards case where management actor has deleted an entry from a queue whilst the message was with the consumer.

Separately, ensure that recoverers successfully take acquisition during DTX recovery.

Modified:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java?rev=1738119&r1=1738118&r2=1738119&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java Thu Apr  7 11:15:40 2016
@@ -60,6 +60,7 @@ import org.apache.qpid.server.txn.DtxBra
 import org.apache.qpid.server.txn.DtxRegistry;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.ServerScopedRuntimeException;
 import org.apache.qpid.transport.Xid;
 import org.apache.qpid.transport.util.Functions;
 
@@ -359,23 +360,33 @@ public class AsynchronousMessageStoreRec
                         {
                             final QueueEntry entry = queue.getMessageOnTheQueue(messageId);
 
-                            entry.acquire();
-
-                            branch.dequeue(entry.getEnqueueRecord());
-
-                            branch.addPostTransactionAction(new ServerTransaction.Action()
+                            if (entry.acquire())
                             {
+                                branch.dequeue(entry.getEnqueueRecord());
 
-                                public void postCommit()
+                                branch.addPostTransactionAction(new ServerTransaction.Action()
                                 {
-                                    entry.delete();
-                                }
 
-                                public void onRollback()
-                                {
-                                    entry.release();
-                                }
-                            });
+                                    public void postCommit()
+                                    {
+                                        entry.delete();
+                                    }
+
+                                    public void onRollback()
+                                    {
+                                        entry.release();
+                                    }
+                                });
+                            }
+                            else
+                            {
+                                // Should never happen - dtx recovery is always synchronous and occurs before
+                                // any other message actors are allowed to act on the virtualhost.
+                                throw new ServerScopedRuntimeException(
+                                        "Distributed transaction dequeue handler failed to acquire " + entry +
+                                        " during recovery of queue " + queue);
+
+                            }
                         }
                         else
                         {

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java?rev=1738119&r1=1738118&r2=1738119&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java Thu Apr  7 11:15:40 2016
@@ -20,7 +20,6 @@
  */
 package org.apache.qpid.server.virtualhost;
 
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.TreeMap;
@@ -54,6 +53,7 @@ import org.apache.qpid.server.txn.DtxBra
 import org.apache.qpid.server.txn.DtxRegistry;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.ServerScopedRuntimeException;
 import org.apache.qpid.transport.Xid;
 import org.apache.qpid.transport.util.Functions;
 
@@ -330,23 +330,33 @@ public class SynchronousMessageStoreReco
                     {
                         final QueueEntry entry = queue.getMessageOnTheQueue(messageId);
 
-                        entry.acquire();
-
-                        branch.dequeue(entry.getEnqueueRecord());
-
-                        branch.addPostTransactionAction(new ServerTransaction.Action()
+                        if (entry.acquire())
                         {
+                            branch.dequeue(entry.getEnqueueRecord());
 
-                            public void postCommit()
+                            branch.addPostTransactionAction(new ServerTransaction.Action()
                             {
-                                entry.delete();
-                            }
 
-                            public void onRollback()
-                            {
-                                entry.release();
-                            }
-                        });
+                                public void postCommit()
+                                {
+                                    entry.delete();
+                                }
+
+                                public void onRollback()
+                                {
+                                    entry.release();
+                                }
+                            });
+                        }
+                        else
+                        {
+                            // Should never happen - dtx recovery is always synchronous and occurs before
+                            // any other message actors are allowed to act on the virtualhost.
+                            throw new ServerScopedRuntimeException(
+                                    "Distributed transaction dequeue handler failed to acquire " + entry +
+                                    " during recovery of queue " + queue);
+                        }
+
                     }
                     else
                     {

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java?rev=1738119&r1=1738118&r2=1738119&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java Thu Apr  7 11:15:40 2016
@@ -312,8 +312,10 @@ public class SynchronousMessageStoreReco
         Transaction.DequeueRecord dequeueRecord = createMockDequeueRecord(queueId, messageId);
 
         QueueEntry queueEntry = mock(QueueEntry.class);
+        when(queueEntry.acquire()).thenReturn(true);
         when(queue.getMessageOnTheQueue(messageId)).thenReturn(queueEntry);
 
+
         final long format = 1;
         final byte[] globalId = new byte[] {0};
         final byte[] branchId = new byte[] {0};

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1738119&r1=1738118&r2=1738119&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java Thu Apr  7 11:15:40 2016
@@ -388,24 +388,26 @@ class ConsumerTarget_1_0 extends Abstrac
 
             if(outcome instanceof Accepted)
             {
-                _queueEntry.lockAcquisition(getConsumer());
-                txn.dequeue(_queueEntry.getEnqueueRecord(),
-                        new ServerTransaction.Action()
-                        {
-
-                            public void postCommit()
-                            {
-                                if(_queueEntry.isAcquiredBy(getConsumer()))
+                if (_queueEntry.lockAcquisition(getConsumer()))
+                {
+                    txn.dequeue(_queueEntry.getEnqueueRecord(),
+                                new ServerTransaction.Action()
                                 {
-                                    _queueEntry.delete();
-                                }
-                            }
 
-                            public void onRollback()
-                            {
+                                    public void postCommit()
+                                    {
+                                        if (_queueEntry.isAcquiredBy(getConsumer()))
+                                        {
+                                            _queueEntry.delete();
+                                        }
+                                    }
+
+                                    public void onRollback()
+                                    {
 
-                            }
-                        });
+                                    }
+                                });
+                }
                 txn.addPostTransactionAction(new ServerTransaction.Action()
                     {
                         public void postCommit()



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org