You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2015/10/26 18:52:18 UTC
svn commit: r1710665 - in /qpid/java/trunk/broker-core/src: main/java/org/apache/qpid/server/virtualhost/ test/java/org/apache/qpid/server/virtualhost/
Author: lquack
Date: Mon Oct 26 17:52:18 2015
New Revision: 1710665
URL: http://svn.apache.org/viewvc?rev=1710665&view=rev
Log:
QPID-6808: [Java Broker] Shift responsibility to handle recovery exceptions to VirtualHost
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/MessageStoreRecoverer.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecovererTest.java
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1710665&r1=1710664&r2=1710665&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java Mon Oct 26 17:52:18 2015
@@ -51,13 +51,13 @@ import com.google.common.util.concurrent
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
-import org.apache.qpid.server.configuration.updater.Task;
-import org.apache.qpid.server.model.Connection;
+import com.google.common.util.concurrent.UncheckedExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.pool.SuppressingInheritedAccessControlContextThreadFactory;
+import org.apache.qpid.server.configuration.updater.Task;
import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.exchange.DefaultDestination;
@@ -101,6 +101,7 @@ import org.apache.qpid.server.txn.Server
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.util.MapValueConverter;
+import org.apache.qpid.server.util.ServerScopedRuntimeException;
public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> extends AbstractConfiguredObject<X>
implements VirtualHostImpl<X, AMQQueue<?>, ExchangeImpl<?>>, EventListener
@@ -1858,7 +1859,7 @@ public abstract class AbstractVirtualHos
_logger.debug("Housekeeping task got cancelled");
// Ignore cancellation of task
}
- catch (ExecutionException ee)
+ catch (ExecutionException | UncheckedExecutionException ee)
{
t = ee.getCause();
}
@@ -1866,6 +1867,10 @@ public abstract class AbstractVirtualHos
{
Thread.currentThread().interrupt(); // ignore/reset
}
+ catch (Throwable t1)
+ {
+ t = t1;
+ }
}
if (t != null)
{
@@ -1940,8 +1945,17 @@ public abstract class AbstractVirtualHos
{
_messageStoreRecoverer = new SynchronousMessageStoreRecoverer();
}
- _messageStoreRecoverer.recover(this);
+ // propagate any exception thrown during recovery into HouseKeepingTaskExecutor to handle them accordingly
+ final ListenableFuture<Void> recoveryResult = _messageStoreRecoverer.recover(this);
+ recoveryResult.addListener(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ Futures.getUnchecked(recoveryResult);
+ }
+ }, _houseKeepingTaskExecutor);
State finalState = State.ERRORED;
try
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java?rev=1710665&r1=1710664&r2=1710665&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java Mon Oct 26 17:52:18 2015
@@ -26,11 +26,15 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.base.Function;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,11 +67,11 @@ public class AsynchronousMessageStoreRec
private AsynchronousRecoverer _asynchronousRecoverer;
@Override
- public void recover(final VirtualHostImpl virtualHost)
+ public ListenableFuture<Void> recover(final VirtualHostImpl virtualHost)
{
_asynchronousRecoverer = new AsynchronousRecoverer(virtualHost);
- _asynchronousRecoverer.recover();
+ return _asynchronousRecoverer.recover();
}
@Override
@@ -92,7 +96,7 @@ public class AsynchronousMessageStoreRec
private final Set<AMQQueue<?>> _recoveringQueues = new CopyOnWriteArraySet<>();
private final AtomicBoolean _recoveryComplete = new AtomicBoolean();
private final Map<Long, MessageReference<? extends ServerMessage<?>>> _recoveredMessages = new HashMap<>();
- private final ExecutorService _queueRecoveryExecutor = Executors.newCachedThreadPool();
+ private final ListeningExecutorService _queueRecoveryExecutor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
private final MessageStore.MessageStoreReader _storeReader;
private AtomicBoolean _continueRecovery = new AtomicBoolean(true);
@@ -109,14 +113,25 @@ public class AsynchronousMessageStoreRec
}
- public void recover()
+ public ListenableFuture<Void> recover()
{
getStoreReader().visitDistributedTransactions(new DistributedTransactionVisitor());
+ List<ListenableFuture<Void>> queueRecoveryFutures = new ArrayList<>();
for(AMQQueue<?> queue : _recoveringQueues)
{
- _queueRecoveryExecutor.submit(new QueueRecoveringTask(queue));
+ ListenableFuture<Void> result = _queueRecoveryExecutor.submit(new QueueRecoveringTask(queue), null);
+ queueRecoveryFutures.add(result);
}
+ ListenableFuture<List<Void>> combinedFuture = Futures.allAsList(queueRecoveryFutures);
+ return Futures.transform(combinedFuture, new Function<List<?>, Void>()
+ {
+ @Override
+ public Void apply(List<?> voids)
+ {
+ return null;
+ }
+ });
}
public VirtualHostImpl<?, ?, ?> getVirtualHost()
@@ -422,31 +437,12 @@ public class AsynchronousMessageStoreRec
{
recoverQueue(_queue);
}
- catch (Throwable e)
- {
- handleUncaughtException(e);
- }
finally
{
Thread.currentThread().setName(originalThreadName);
}
}
- private void handleUncaughtException(Throwable e)
- {
- LOGGER.error("Unexpected exception", e);
- Thread.UncaughtExceptionHandler uncaughtExceptionHandler = Thread.getDefaultUncaughtExceptionHandler();
- if (uncaughtExceptionHandler != null)
- {
- uncaughtExceptionHandler.uncaughtException(Thread.currentThread(), e);
- }
- else
- {
- // it should never happen as we set default UncaughtExceptionHandler in main
- e.printStackTrace();
- Runtime.getRuntime().halt(1);
- }
- }
}
private class MessageInstanceVisitor implements MessageInstanceHandler
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/MessageStoreRecoverer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/MessageStoreRecoverer.java?rev=1710665&r1=1710664&r2=1710665&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/MessageStoreRecoverer.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/MessageStoreRecoverer.java Mon Oct 26 17:52:18 2015
@@ -20,9 +20,11 @@
*/
package org.apache.qpid.server.virtualhost;
+import com.google.common.util.concurrent.ListenableFuture;
+
public interface MessageStoreRecoverer
{
- void recover(VirtualHostImpl virtualHost);
+ ListenableFuture<Void> recover(VirtualHostImpl virtualHost);
/**
* Cancels any in-progress message store recovery. If message store recovery has already
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java?rev=1710665&r1=1710664&r2=1710665&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java Mon Oct 26 17:52:18 2015
@@ -26,6 +26,8 @@ import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -59,7 +61,7 @@ public class SynchronousMessageStoreReco
private static final Logger _logger = LoggerFactory.getLogger(SynchronousMessageStoreRecoverer.class);
@Override
- public void recover(VirtualHostImpl virtualHost)
+ public ListenableFuture<Void> recover(VirtualHostImpl virtualHost)
{
EventLogger eventLogger = virtualHost.getEventLogger();
MessageStore store = virtualHost.getMessageStore();
@@ -111,7 +113,7 @@ public class SynchronousMessageStoreReco
MessageStoreMessages.RECOVERED(recoveredMessages.size() - unusedMessages.size()));
eventLogger.message(logSubject, MessageStoreMessages.RECOVERY_COMPLETE());
-
+ return Futures.immediateFuture(null);
}
@Override
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecovererTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecovererTest.java?rev=1710665&r1=1710664&r2=1710665&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecovererTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecovererTest.java Mon Oct 26 17:52:18 2015
@@ -27,8 +27,12 @@ import static org.mockito.Mockito.when;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.store.MessageStore;
@@ -57,24 +61,34 @@ public class AsynchronousMessageStoreRec
when(_store.newMessageStoreReader()).thenReturn(_storeReader);
}
- public void testExceptionDuringRecoveryShutsDownBroker() throws Exception
+ public void testExceptionOnRecovery() throws Exception
{
- final CountDownLatch uncaughtExceptionHandlerCalledLatch = new CountDownLatch(1);
- Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler()
- {
- @Override
- public void uncaughtException(final Thread t, final Throwable e)
- {
- uncaughtExceptionHandlerCalledLatch.countDown();
- }
- });
- doThrow(ServerScopedRuntimeException.class).when(_storeReader).visitMessageInstances(any(TransactionLogResource.class),
+ ServerScopedRuntimeException exception = new ServerScopedRuntimeException("test");
+ doThrow(exception).when(_storeReader).visitMessageInstances(any(TransactionLogResource.class),
any(MessageInstanceHandler.class));
AMQQueue queue = mock(AMQQueue.class);
when(_virtualHost.getQueues()).thenReturn(Collections.singleton(queue));
AsynchronousMessageStoreRecoverer recoverer = new AsynchronousMessageStoreRecoverer();
- recoverer.recover(_virtualHost);
- assertTrue("UncaughtExceptionHandler was not called", uncaughtExceptionHandlerCalledLatch.await(1000, TimeUnit.MILLISECONDS));
+ ListenableFuture<Void> result = recoverer.recover(_virtualHost);
+ try
+ {
+ result.get();
+ fail("ServerScopedRuntimeException should be rethrown");
+ }
+ catch(ExecutionException e)
+ {
+ assertEquals("Unexpected cause", exception, e.getCause());
+ }
+ }
+
+ public void testRecoveryEmptyQueue() throws Exception
+ {
+ AMQQueue queue = mock(AMQQueue.class);
+ when(_virtualHost.getQueues()).thenReturn(Collections.singleton(queue));
+
+ AsynchronousMessageStoreRecoverer recoverer = new AsynchronousMessageStoreRecoverer();
+ ListenableFuture<Void> result = recoverer.recover(_virtualHost);
+ assertNull(result.get());
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org