You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/10/28 10:02:56 UTC

svn commit: r1766995 - /qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java

Author: rgodfrey
Date: Fri Oct 28 10:02:56 2016
New Revision: 1766995

URL: http://svn.apache.org/viewvc?rev=1766995&view=rev
Log:
QPID-7473 : Ensure orphaned messages are removed where there are no durable queues

Modified:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java?rev=1766995&r1=1766994&r2=1766995&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java Fri Oct 28 10:02:56 2016
@@ -122,20 +122,27 @@ public class AsynchronousMessageStoreRec
             getStoreReader().visitDistributedTransactions(new DistributedTransactionVisitor());
 
             List<ListenableFuture<Void>> queueRecoveryFutures = new ArrayList<>();
-            for(Queue<?> queue : _recoveringQueues)
+            if(_recoveringQueues.isEmpty())
             {
-                ListenableFuture<Void> result = _queueRecoveryExecutor.submit(new QueueRecoveringTask(queue), null);
-                queueRecoveryFutures.add(result);
+                return _queueRecoveryExecutor.submit(new RemoveOrphanedMessagesTask(), null);
             }
-            ListenableFuture<List<Void>> combinedFuture = Futures.allAsList(queueRecoveryFutures);
-            return Futures.transform(combinedFuture, new Function<List<?>, Void>()
+            else
             {
-                @Override
-                public Void apply(List<?> voids)
+                for (Queue<?> queue : _recoveringQueues)
                 {
-                    return null;
+                    ListenableFuture<Void> result = _queueRecoveryExecutor.submit(new QueueRecoveringTask(queue), null);
+                    queueRecoveryFutures.add(result);
                 }
-            });
+                ListenableFuture<List<Void>> combinedFuture = Futures.allAsList(queueRecoveryFutures);
+                return Futures.transform(combinedFuture, new Function<List<?>, Void>()
+                {
+                    @Override
+                    public Void apply(List<?> voids)
+                    {
+                        return null;
+                    }
+                });
+            }
         }
 
         public VirtualHost<?> getVirtualHost()
@@ -459,6 +466,32 @@ public class AsynchronousMessageStoreRec
 
         }
 
+
+        private class RemoveOrphanedMessagesTask implements Runnable
+        {
+            public RemoveOrphanedMessagesTask()
+            {
+            }
+
+            @Override
+            public void run()
+            {
+                String originalThreadName = Thread.currentThread().getName();
+                Thread.currentThread().setName("Orphaned message removal");
+
+                try
+                {
+                    completeRecovery();
+                }
+                finally
+                {
+                    Thread.currentThread().setName(originalThreadName);
+                }
+            }
+
+        }
+
+
         private class MessageInstanceVisitor implements MessageInstanceHandler
         {
             private final Queue<?> _queue;



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org