You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2016/04/07 13:15:40 UTC
svn commit: r1738119 - in /qpid/java/trunk:
broker-core/src/main/java/org/apache/qpid/server/virtualhost/
broker-core/src/test/java/org/apache/qpid/server/virtualhost/
broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/
Author: kwall
Date: Thu Apr 7 11:15:40 2016
New Revision: 1738119
URL: http://svn.apache.org/viewvc?rev=1738119&view=rev
Log:
QPID-7154: [Java Broker] On AMQP 1.0, when processing delivery state, only dequeue if the acquisition was successfully locked
* Guards against the case where a management actor has deleted an entry from a queue whilst the message was with the consumer.
Separately, ensure that recoverers successfully take acquisition during DTX recovery.
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java?rev=1738119&r1=1738118&r2=1738119&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java Thu Apr 7 11:15:40 2016
@@ -60,6 +60,7 @@ import org.apache.qpid.server.txn.DtxBra
import org.apache.qpid.server.txn.DtxRegistry;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.transport.Xid;
import org.apache.qpid.transport.util.Functions;
@@ -359,23 +360,33 @@ public class AsynchronousMessageStoreRec
{
final QueueEntry entry = queue.getMessageOnTheQueue(messageId);
- entry.acquire();
-
- branch.dequeue(entry.getEnqueueRecord());
-
- branch.addPostTransactionAction(new ServerTransaction.Action()
+ if (entry.acquire())
{
+ branch.dequeue(entry.getEnqueueRecord());
- public void postCommit()
+ branch.addPostTransactionAction(new ServerTransaction.Action()
{
- entry.delete();
- }
- public void onRollback()
- {
- entry.release();
- }
- });
+ public void postCommit()
+ {
+ entry.delete();
+ }
+
+ public void onRollback()
+ {
+ entry.release();
+ }
+ });
+ }
+ else
+ {
+ // Should never happen - dtx recovery is always synchronous and occurs before
+ // any other message actors are allowed to act on the virtualhost.
+ throw new ServerScopedRuntimeException(
+ "Distributed transaction dequeue handler failed to acquire " + entry +
+ " during recovery of queue " + queue);
+
+ }
}
else
{
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java?rev=1738119&r1=1738118&r2=1738119&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java Thu Apr 7 11:15:40 2016
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.virtualhost;
-import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
@@ -54,6 +53,7 @@ import org.apache.qpid.server.txn.DtxBra
import org.apache.qpid.server.txn.DtxRegistry;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.transport.Xid;
import org.apache.qpid.transport.util.Functions;
@@ -330,23 +330,33 @@ public class SynchronousMessageStoreReco
{
final QueueEntry entry = queue.getMessageOnTheQueue(messageId);
- entry.acquire();
-
- branch.dequeue(entry.getEnqueueRecord());
-
- branch.addPostTransactionAction(new ServerTransaction.Action()
+ if (entry.acquire())
{
+ branch.dequeue(entry.getEnqueueRecord());
- public void postCommit()
+ branch.addPostTransactionAction(new ServerTransaction.Action()
{
- entry.delete();
- }
- public void onRollback()
- {
- entry.release();
- }
- });
+ public void postCommit()
+ {
+ entry.delete();
+ }
+
+ public void onRollback()
+ {
+ entry.release();
+ }
+ });
+ }
+ else
+ {
+ // Should never happen - dtx recovery is always synchronous and occurs before
+ // any other message actors are allowed to act on the virtualhost.
+ throw new ServerScopedRuntimeException(
+ "Distributed transaction dequeue handler failed to acquire " + entry +
+ " during recovery of queue " + queue);
+ }
+
}
else
{
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java?rev=1738119&r1=1738118&r2=1738119&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java Thu Apr 7 11:15:40 2016
@@ -312,8 +312,10 @@ public class SynchronousMessageStoreReco
Transaction.DequeueRecord dequeueRecord = createMockDequeueRecord(queueId, messageId);
QueueEntry queueEntry = mock(QueueEntry.class);
+ when(queueEntry.acquire()).thenReturn(true);
when(queue.getMessageOnTheQueue(messageId)).thenReturn(queueEntry);
+
final long format = 1;
final byte[] globalId = new byte[] {0};
final byte[] branchId = new byte[] {0};
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1738119&r1=1738118&r2=1738119&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java Thu Apr 7 11:15:40 2016
@@ -388,24 +388,26 @@ class ConsumerTarget_1_0 extends Abstrac
if(outcome instanceof Accepted)
{
- _queueEntry.lockAcquisition(getConsumer());
- txn.dequeue(_queueEntry.getEnqueueRecord(),
- new ServerTransaction.Action()
- {
-
- public void postCommit()
- {
- if(_queueEntry.isAcquiredBy(getConsumer()))
+ if (_queueEntry.lockAcquisition(getConsumer()))
+ {
+ txn.dequeue(_queueEntry.getEnqueueRecord(),
+ new ServerTransaction.Action()
{
- _queueEntry.delete();
- }
- }
- public void onRollback()
- {
+ public void postCommit()
+ {
+ if (_queueEntry.isAcquiredBy(getConsumer()))
+ {
+ _queueEntry.delete();
+ }
+ }
+
+ public void onRollback()
+ {
- }
- });
+ }
+ });
+ }
txn.addPostTransactionAction(new ServerTransaction.Action()
{
public void postCommit()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org