You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2015/10/26 18:52:18 UTC

svn commit: r1710665 - in /qpid/java/trunk/broker-core/src: main/java/org/apache/qpid/server/virtualhost/ test/java/org/apache/qpid/server/virtualhost/

Author: lquack
Date: Mon Oct 26 17:52:18 2015
New Revision: 1710665

URL: http://svn.apache.org/viewvc?rev=1710665&view=rev
Log:
QPID-6808: [Java Broker] Shift responsibility to handle recovery exceptions to VirtualHost

Modified:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/MessageStoreRecoverer.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecovererTest.java

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1710665&r1=1710664&r2=1710665&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java Mon Oct 26 17:52:18 2015
@@ -51,13 +51,13 @@ import com.google.common.util.concurrent
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
-import org.apache.qpid.server.configuration.updater.Task;
-import org.apache.qpid.server.model.Connection;
+import com.google.common.util.concurrent.UncheckedExecutionException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.pool.SuppressingInheritedAccessControlContextThreadFactory;
+import org.apache.qpid.server.configuration.updater.Task;
 import org.apache.qpid.server.configuration.BrokerProperties;
 import org.apache.qpid.server.configuration.IllegalConfigurationException;
 import org.apache.qpid.server.exchange.DefaultDestination;
@@ -101,6 +101,7 @@ import org.apache.qpid.server.txn.Server
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 import org.apache.qpid.server.util.MapValueConverter;
+import org.apache.qpid.server.util.ServerScopedRuntimeException;
 
 public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> extends AbstractConfiguredObject<X>
         implements VirtualHostImpl<X, AMQQueue<?>, ExchangeImpl<?>>, EventListener
@@ -1858,7 +1859,7 @@ public abstract class AbstractVirtualHos
                         _logger.debug("Housekeeping task got cancelled");
                         // Ignore cancellation of task
                     }
-                    catch (ExecutionException ee)
+                    catch (ExecutionException | UncheckedExecutionException ee)
                     {
                         t = ee.getCause();
                     }
@@ -1866,6 +1867,10 @@ public abstract class AbstractVirtualHos
                     {
                         Thread.currentThread().interrupt(); // ignore/reset
                     }
+                    catch (Throwable t1)
+                    {
+                        t = t1;
+                    }
                 }
                 if (t != null)
                 {
@@ -1940,8 +1945,17 @@ public abstract class AbstractVirtualHos
         {
            _messageStoreRecoverer = new SynchronousMessageStoreRecoverer();
         }
-        _messageStoreRecoverer.recover(this);
 
+        // propagate any exception thrown during recovery into HouseKeepingTaskExecutor to handle them accordingly
+        final ListenableFuture<Void> recoveryResult = _messageStoreRecoverer.recover(this);
+        recoveryResult.addListener(new Runnable()
+        {
+            @Override
+            public void run()
+            {
+                Futures.getUnchecked(recoveryResult);
+            }
+        }, _houseKeepingTaskExecutor);
 
         State finalState = State.ERRORED;
         try

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java?rev=1710665&r1=1710664&r2=1710665&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java Mon Oct 26 17:52:18 2015
@@ -26,11 +26,15 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import com.google.common.base.Function;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -63,11 +67,11 @@ public class AsynchronousMessageStoreRec
     private AsynchronousRecoverer _asynchronousRecoverer;
 
     @Override
-    public void recover(final VirtualHostImpl virtualHost)
+    public ListenableFuture<Void> recover(final VirtualHostImpl virtualHost)
     {
         _asynchronousRecoverer = new AsynchronousRecoverer(virtualHost);
 
-        _asynchronousRecoverer.recover();
+        return _asynchronousRecoverer.recover();
     }
 
     @Override
@@ -92,7 +96,7 @@ public class AsynchronousMessageStoreRec
         private final Set<AMQQueue<?>> _recoveringQueues = new CopyOnWriteArraySet<>();
         private final AtomicBoolean _recoveryComplete = new AtomicBoolean();
         private final Map<Long, MessageReference<? extends ServerMessage<?>>> _recoveredMessages = new HashMap<>();
-        private final ExecutorService _queueRecoveryExecutor = Executors.newCachedThreadPool();
+        private final ListeningExecutorService _queueRecoveryExecutor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
         private final MessageStore.MessageStoreReader _storeReader;
         private AtomicBoolean _continueRecovery = new AtomicBoolean(true);
 
@@ -109,14 +113,25 @@ public class AsynchronousMessageStoreRec
 
         }
 
-        public void recover()
+        public ListenableFuture<Void> recover()
         {
             getStoreReader().visitDistributedTransactions(new DistributedTransactionVisitor());
 
+            List<ListenableFuture<Void>> queueRecoveryFutures = new ArrayList<>();
             for(AMQQueue<?> queue : _recoveringQueues)
             {
-                _queueRecoveryExecutor.submit(new QueueRecoveringTask(queue));
+                ListenableFuture<Void> result = _queueRecoveryExecutor.submit(new QueueRecoveringTask(queue), null);
+                queueRecoveryFutures.add(result);
             }
+            ListenableFuture<List<Void>> combinedFuture = Futures.allAsList(queueRecoveryFutures);
+            return Futures.transform(combinedFuture, new Function<List<?>, Void>()
+            {
+                @Override
+                public Void apply(List<?> voids)
+                {
+                    return null;
+                }
+            });
         }
 
         public VirtualHostImpl<?, ?, ?> getVirtualHost()
@@ -422,31 +437,12 @@ public class AsynchronousMessageStoreRec
                 {
                     recoverQueue(_queue);
                 }
-                catch (Throwable e)
-                {
-                    handleUncaughtException(e);
-                }
                 finally
                 {
                     Thread.currentThread().setName(originalThreadName);
                 }
             }
 
-            private void handleUncaughtException(Throwable e)
-            {
-                LOGGER.error("Unexpected exception", e);
-                Thread.UncaughtExceptionHandler uncaughtExceptionHandler = Thread.getDefaultUncaughtExceptionHandler();
-                if (uncaughtExceptionHandler != null)
-                {
-                    uncaughtExceptionHandler.uncaughtException(Thread.currentThread(), e);
-                }
-                else
-                {
-                    // it should never happen as we set default UncaughtExceptionHandler in main
-                    e.printStackTrace();
-                    Runtime.getRuntime().halt(1);
-                }
-            }
         }
 
         private class MessageInstanceVisitor implements MessageInstanceHandler

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/MessageStoreRecoverer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/MessageStoreRecoverer.java?rev=1710665&r1=1710664&r2=1710665&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/MessageStoreRecoverer.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/MessageStoreRecoverer.java Mon Oct 26 17:52:18 2015
@@ -20,9 +20,11 @@
  */
 package org.apache.qpid.server.virtualhost;
 
+import com.google.common.util.concurrent.ListenableFuture;
+
 public interface MessageStoreRecoverer
 {
-    void recover(VirtualHostImpl virtualHost);
+    ListenableFuture<Void> recover(VirtualHostImpl virtualHost);
 
     /**
      * Cancels any in-progress message store recovery.  If message store recovery has already

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java?rev=1710665&r1=1710664&r2=1710665&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java Mon Oct 26 17:52:18 2015
@@ -26,6 +26,8 @@ import java.util.Map;
 import java.util.TreeMap;
 import java.util.UUID;
 
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -59,7 +61,7 @@ public class SynchronousMessageStoreReco
     private static final Logger _logger = LoggerFactory.getLogger(SynchronousMessageStoreRecoverer.class);
 
     @Override
-    public void recover(VirtualHostImpl virtualHost)
+    public ListenableFuture<Void> recover(VirtualHostImpl virtualHost)
     {
         EventLogger eventLogger = virtualHost.getEventLogger();
         MessageStore store = virtualHost.getMessageStore();
@@ -111,7 +113,7 @@ public class SynchronousMessageStoreReco
                              MessageStoreMessages.RECOVERED(recoveredMessages.size() - unusedMessages.size()));
         eventLogger.message(logSubject, MessageStoreMessages.RECOVERY_COMPLETE());
 
-
+        return Futures.immediateFuture(null);
     }
 
     @Override

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecovererTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecovererTest.java?rev=1710665&r1=1710664&r2=1710665&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecovererTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecovererTest.java Mon Oct 26 17:52:18 2015
@@ -27,8 +27,12 @@ import static org.mockito.Mockito.when;
 
 import java.util.Collections;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
 import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.store.MessageStore;
@@ -57,24 +61,34 @@ public class AsynchronousMessageStoreRec
         when(_store.newMessageStoreReader()).thenReturn(_storeReader);
     }
 
-    public void testExceptionDuringRecoveryShutsDownBroker() throws Exception
+    public void testExceptionOnRecovery() throws Exception
     {
-        final CountDownLatch uncaughtExceptionHandlerCalledLatch = new CountDownLatch(1);
-        Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler()
-        {
-            @Override
-            public void uncaughtException(final Thread t, final Throwable e)
-            {
-                uncaughtExceptionHandlerCalledLatch.countDown();
-            }
-        });
-        doThrow(ServerScopedRuntimeException.class).when(_storeReader).visitMessageInstances(any(TransactionLogResource.class),
+        ServerScopedRuntimeException exception = new ServerScopedRuntimeException("test");
+        doThrow(exception).when(_storeReader).visitMessageInstances(any(TransactionLogResource.class),
                                                                                              any(MessageInstanceHandler.class));
         AMQQueue queue = mock(AMQQueue.class);
         when(_virtualHost.getQueues()).thenReturn(Collections.singleton(queue));
 
         AsynchronousMessageStoreRecoverer recoverer = new AsynchronousMessageStoreRecoverer();
-        recoverer.recover(_virtualHost);
-        assertTrue("UncaughtExceptionHandler was not called", uncaughtExceptionHandlerCalledLatch.await(1000, TimeUnit.MILLISECONDS));
+        ListenableFuture<Void> result = recoverer.recover(_virtualHost);
+        try
+        {
+            result.get();
+            fail("ServerScopedRuntimeException should be rethrown");
+        }
+        catch(ExecutionException e)
+        {
+            assertEquals("Unexpected cause", exception, e.getCause());
+        }
+    }
+
+    public void testRecoveryEmptyQueue() throws Exception
+    {
+        AMQQueue queue = mock(AMQQueue.class);
+        when(_virtualHost.getQueues()).thenReturn(Collections.singleton(queue));
+
+        AsynchronousMessageStoreRecoverer recoverer = new AsynchronousMessageStoreRecoverer();
+        ListenableFuture<Void> result = recoverer.recover(_virtualHost);
+        assertNull(result.get());
     }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org