You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/10/28 10:02:56 UTC
svn commit: r1766995 -
/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
Author: rgodfrey
Date: Fri Oct 28 10:02:56 2016
New Revision: 1766995
URL: http://svn.apache.org/viewvc?rev=1766995&view=rev
Log:
QPID-7473 : Ensure orphaned messages are removed where there are no durable queues
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java?rev=1766995&r1=1766994&r2=1766995&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java Fri Oct 28 10:02:56 2016
@@ -122,20 +122,27 @@ public class AsynchronousMessageStoreRec
getStoreReader().visitDistributedTransactions(new DistributedTransactionVisitor());
List<ListenableFuture<Void>> queueRecoveryFutures = new ArrayList<>();
- for(Queue<?> queue : _recoveringQueues)
+ if(_recoveringQueues.isEmpty())
{
- ListenableFuture<Void> result = _queueRecoveryExecutor.submit(new QueueRecoveringTask(queue), null);
- queueRecoveryFutures.add(result);
+ return _queueRecoveryExecutor.submit(new RemoveOrphanedMessagesTask(), null);
}
- ListenableFuture<List<Void>> combinedFuture = Futures.allAsList(queueRecoveryFutures);
- return Futures.transform(combinedFuture, new Function<List<?>, Void>()
+ else
{
- @Override
- public Void apply(List<?> voids)
+ for (Queue<?> queue : _recoveringQueues)
{
- return null;
+ ListenableFuture<Void> result = _queueRecoveryExecutor.submit(new QueueRecoveringTask(queue), null);
+ queueRecoveryFutures.add(result);
}
- });
+ ListenableFuture<List<Void>> combinedFuture = Futures.allAsList(queueRecoveryFutures);
+ return Futures.transform(combinedFuture, new Function<List<?>, Void>()
+ {
+ @Override
+ public Void apply(List<?> voids)
+ {
+ return null;
+ }
+ });
+ }
}
public VirtualHost<?> getVirtualHost()
@@ -459,6 +466,32 @@ public class AsynchronousMessageStoreRec
}
+
+ private class RemoveOrphanedMessagesTask implements Runnable
+ {
+ public RemoveOrphanedMessagesTask()
+ {
+ }
+
+ @Override
+ public void run()
+ {
+ String originalThreadName = Thread.currentThread().getName();
+ Thread.currentThread().setName("Orphaned message removal");
+
+ try
+ {
+ completeRecovery();
+ }
+ finally
+ {
+ Thread.currentThread().setName(originalThreadName);
+ }
+ }
+
+ }
+
+
private class MessageInstanceVisitor implements MessageInstanceHandler
{
private final Queue<?> _queue;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org