You are viewing a plain-text version of this content. The canonical link to it ("here") was a hyperlink on the archive page and is not preserved in this copy.
Posted to commits@activemq.apache.org by cl...@apache.org on 2019/05/29 19:11:35 UTC
[activemq-artemis] branch master updated: ARTEMIS-2347
JournalStorageManager::stopReplication can deadlock while stopping
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new 0d273d2 ARTEMIS-2347 JournalStorageManager::stopReplication can deadlock while stopping
new 9e3dbde This closes #2675
0d273d2 is described below
commit 0d273d2bb84d0828627cddc62617c757810a12c6
Author: Francesco Nigro <ni...@gmail.com>
AuthorDate: Mon May 20 15:09:45 2019 +0200
ARTEMIS-2347 JournalStorageManager::stopReplication can deadlock while stopping
AbstractJournalStorageManager::performCachedLargeMessageDeletes
must acquire the manager write lock (as documented), so that a
call to stopReplication racing with stop cannot deadlock the
two threads.
---
.../impl/journal/JournalStorageManager.java | 31 +--
.../impl/journal/JournalStorageManagerTest.java | 233 +++++++++++----------
2 files changed, 144 insertions(+), 120 deletions(-)
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
index a9cd892..555da52 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
@@ -323,19 +323,24 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
*/
@Override
protected void performCachedLargeMessageDeletes() {
- largeMessagesToDelete.forEach((messageId, largeServerMessage) -> {
- SequentialFile msg = createFileForLargeMessage(messageId, LargeMessageExtension.DURABLE);
- try {
- msg.delete();
- } catch (Exception e) {
- ActiveMQServerLogger.LOGGER.journalErrorDeletingMessage(e, messageId);
- }
- if (replicator != null) {
- replicator.largeMessageDelete(messageId, JournalStorageManager.this);
- }
- confirmLargeMessage(largeServerMessage);
- });
- largeMessagesToDelete.clear();
+ storageManagerLock.writeLock().lock(); // write lock as documented: stopReplication also takes it, so holding it here stops the two from deadlocking while stopping (ARTEMIS-2347)
+ try {
+ largeMessagesToDelete.forEach((messageId, largeServerMessage) -> {
+ SequentialFile msg = createFileForLargeMessage(messageId, LargeMessageExtension.DURABLE);
+ try {
+ msg.delete();
+ } catch (Exception e) {
+ ActiveMQServerLogger.LOGGER.journalErrorDeletingMessage(e, messageId);
+ }
+ if (replicator != null) {
+ replicator.largeMessageDelete(messageId, JournalStorageManager.this);
+ }
+ confirmLargeMessage(largeServerMessage);
+ });
+ largeMessagesToDelete.clear();
+ } finally {
+ storageManagerLock.writeLock().unlock();
+ }
}
protected SequentialFile createFileForLargeMessage(final long messageID, final boolean durable) {
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManagerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManagerTest.java
index 49d474c..861b5e0 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManagerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManagerTest.java
@@ -15,134 +15,153 @@
*/
package org.apache.activemq.artemis.core.persistence.impl.journal;
+import java.util.ArrayList;
import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Stream;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
+import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
+import org.apache.activemq.artemis.core.paging.PagingManager;
+import org.apache.activemq.artemis.core.postoffice.PostOffice;
+import org.apache.activemq.artemis.core.replication.ReplicationManager;
+import org.apache.activemq.artemis.core.server.JournalType;
+import org.apache.activemq.artemis.core.server.LargeServerMessage;
+import org.apache.activemq.artemis.core.server.impl.JournalLoader;
import org.apache.activemq.artemis.utils.ExecutorFactory;
-import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
+import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
+import org.junit.AfterClass;
import org.junit.Assert;
+import org.junit.BeforeClass;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
-public class JournalStorageManagerTest {
-
- ScheduledExecutorService dumbScheduler = new ScheduledExecutorService() {
- @Override
- public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
- return null;
- }
-
- @Override
- public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
- return null;
- }
-
- @Override
- public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
- return null;
- }
-
- @Override
- public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
- return null;
- }
-
- @Override
- public void shutdown() {
-
- }
-
- @Override
- public List<Runnable> shutdownNow() {
- return null;
- }
-
- @Override
- public boolean isShutdown() {
- return false;
- }
-
- @Override
- public boolean isTerminated() {
- return false;
- }
-
- @Override
- public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
- return false;
- }
-
- @Override
- public <T> Future<T> submit(Callable<T> task) {
- return null;
- }
-
- @Override
- public <T> Future<T> submit(Runnable task, T result) {
- return null;
- }
-
- @Override
- public Future<?> submit(Runnable task) {
- return null;
- }
+import static java.util.stream.Collectors.toList;
+import static org.junit.Assume.assumeTrue;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
- @Override
- public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
- return null;
- }
-
- @Override
- public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
- long timeout,
- TimeUnit unit) throws InterruptedException {
- return null;
- }
+@RunWith(Parameterized.class)
+public class JournalStorageManagerTest {
- @Override
- public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
- return null;
- }
+ @Parameterized.Parameter
+ public JournalType journalType;
- @Override
- public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
- long timeout,
- TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
- return null;
- }
+ @Parameterized.Parameters(name = "journal type={0}")
+ public static Collection<Object[]> getParams() {
+ final JournalType[] values = JournalType.values();
+ return Stream.of(values) // one parameter set per JournalType; ASYNCIO runs are skipped via assumeTrue when AIO is unsupported
+ .map(journalType -> new Object[]{journalType})
+ .collect(toList());
+ }
- @Override
- public void execute(Runnable command) {
+ private static ExecutorService executor;
+ private static ExecutorService ioExecutor;
+ private static ExecutorService testExecutor;
- }
- };
+ @BeforeClass
+ public static void initExecutors() {
+ executor = Executors.newSingleThreadExecutor();
+ //to allow concurrent compaction and I/O operations
+ ioExecutor = Executors.newFixedThreadPool(2);
+ testExecutor = Executors.newSingleThreadExecutor();
+ }
- ExecutorFactory dumbExecutor = new ExecutorFactory() {
- @Override
- public ArtemisExecutor getExecutor() {
- return new ArtemisExecutor() {
- @Override
- public void execute(Runnable command) {
- command.run();
- }
- };
- }
- };
+ @AfterClass
+ public static void destroyExecutors() {
+ ioExecutor.shutdownNow();
+ executor.shutdownNow();
+ testExecutor.shutdownNow();
+ }
/**
* Test of fixJournalFileSize method, of class JournalStorageManager.
*/
@Test
public void testFixJournalFileSize() {
- JournalStorageManager manager = new JournalStorageManager(new ConfigurationImpl(), null, dumbExecutor, dumbScheduler, dumbExecutor);
+ if (journalType == JournalType.ASYNCIO) {
+ assumeTrue("AIO is not supported on this platform", AIOSequentialFileFactory.isSupported());
+ }
+ final ConfigurationImpl configuration = new ConfigurationImpl().setJournalType(journalType);
+ final ExecutorFactory executorFactory = new OrderedExecutorFactory(executor);
+ final ExecutorFactory ioExecutorFactory = new OrderedExecutorFactory(ioExecutor);
+ final JournalStorageManager manager = spy(new JournalStorageManager(configuration, null, executorFactory, null, ioExecutorFactory));
Assert.assertEquals(4096, manager.fixJournalFileSize(1024, 4096));
Assert.assertEquals(4096, manager.fixJournalFileSize(4098, 4096));
Assert.assertEquals(8192, manager.fixJournalFileSize(8192, 4096));
}
+
+ @Test(timeout = 20_000)
+ public void testStopReplicationDoesNotDeadlockWhileStopping() throws Exception { // ARTEMIS-2347 regression: stop() racing stopReplication() must not deadlock
+ if (journalType == JournalType.ASYNCIO) {
+ assumeTrue("AIO is not supported on this platform", AIOSequentialFileFactory.isSupported());
+ }
+ final ConfigurationImpl configuration = new ConfigurationImpl().setJournalType(journalType);
+ final ExecutorFactory executorFactory = new OrderedExecutorFactory(executor);
+ final ExecutorFactory ioExecutorFactory = new OrderedExecutorFactory(ioExecutor);
+ final JournalStorageManager manager = spy(new JournalStorageManager(configuration, null, executorFactory, null, ioExecutorFactory));
+ manager.start();
+ manager.loadBindingJournal(new ArrayList<>(), new ArrayList<>(), new ArrayList<>());
+ final PostOffice postOffice = mock(PostOffice.class);
+ final JournalLoader journalLoader = mock(JournalLoader.class);
+ manager.loadMessageJournal(postOffice, null, null, null, null, null, null, journalLoader);
+ final ReplicationManager replicationManager = mock(ReplicationManager.class);
+ final PagingManager pagingManager = mock(PagingManager.class);
+ when(pagingManager.getStoreNames()).thenReturn(new SimpleString[0]);
+ manager.startReplication(replicationManager, pagingManager, UUID.randomUUID().toString(), false, 0);
+ final LargeServerMessage largeMessage = manager.createLargeMessage(manager.generateID() + 1, new CoreMessage());
+ largeMessage.setDurable(true);
+ when(replicationManager.isSynchronizing()).thenReturn(true); // NOTE(review): presumably makes deleteFile() defer the delete as a pending record (see getPendingRecordID below) — confirm
+ largeMessage.deleteFile();
+ final long pendingRecordID = largeMessage.getPendingRecordID();
+ final AtomicReference<CompletableFuture<Void>> stopReplication = new AtomicReference<>();
+ doAnswer(invocation -> {
+ final CompletableFuture<Void> finished = new CompletableFuture<>();
+ final CountDownLatch beforeStopReplication;
+ if (stopReplication.compareAndSet(null, finished)) {
+ beforeStopReplication = new CountDownLatch(1);
+ //before the deadlock fix:
+ //manager::stop is already owning the large message lock here
+ //but not yet the manager read lock
+ testExecutor.execute(() -> {
+ beforeStopReplication.countDown();
+ try {
+ //it needs to acquire the manager write lock
+ //and large message lock next
+ manager.stopReplication();
+ finished.complete(null);
+ } catch (Exception e) {
+ finished.completeExceptionally(e);
+ }
+ });
+ } else {
+ beforeStopReplication = null;
+ }
+ if (beforeStopReplication != null) {
+ beforeStopReplication.await();
+ //do not remove this sleep: before the deadlock fix
+ //it was needed to give manager::stopReplication the chance to acquire
+ //the manager write lock before manager::stop
+ TimeUnit.MILLISECONDS.sleep(500);
+ }
+ //confirmPendingLargeMessage will acquire manager read lock
+ return invocation.callRealMethod();
+ }).when(manager).confirmPendingLargeMessage(pendingRecordID);
+ manager.stop(); // NOTE(review): expected to reach the confirmPendingLargeMessage stub above via the cached large-message deletes — confirm call path
+ final CompletableFuture<Void> stoppedReplication = stopReplication.get();
+ Assert.assertNotNull(stoppedReplication);
+ stoppedReplication.get(); // unbounded wait on purpose: a deadlock surfaces as a failure of @Test(timeout = 20_000)
+ }
+
}