You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2019/05/29 19:11:35 UTC

[activemq-artemis] branch master updated: ARTEMIS-2347 JournalStorageManager::stopReplication can deadlock while stopping

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 0d273d2  ARTEMIS-2347 JournalStorageManager::stopReplication can deadlock while stopping
     new 9e3dbde  This closes #2675
0d273d2 is described below

commit 0d273d2bb84d0828627cddc62617c757810a12c6
Author: Francesco Nigro <ni...@gmail.com>
AuthorDate: Mon May 20 15:09:45 2019 +0200

    ARTEMIS-2347 JournalStorageManager::stopReplication can deadlock while stopping
    
    AbstractJournalStorageManager::performCachedLargeMessageDeletes
    must acquire the manager write lock (as documented), so that a
    call to stopReplication racing with an in-progress stop cannot
    deadlock.
---
 .../impl/journal/JournalStorageManager.java        |  31 +--
 .../impl/journal/JournalStorageManagerTest.java    | 233 +++++++++++----------
 2 files changed, 144 insertions(+), 120 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
index a9cd892..555da52 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
@@ -323,19 +323,24 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
     */
    @Override
    protected void performCachedLargeMessageDeletes() {
-      largeMessagesToDelete.forEach((messageId, largeServerMessage) -> {
-         SequentialFile msg = createFileForLargeMessage(messageId, LargeMessageExtension.DURABLE);
-         try {
-            msg.delete();
-         } catch (Exception e) {
-            ActiveMQServerLogger.LOGGER.journalErrorDeletingMessage(e, messageId);
-         }
-         if (replicator != null) {
-            replicator.largeMessageDelete(messageId, JournalStorageManager.this);
-         }
-         confirmLargeMessage(largeServerMessage);
-      });
-      largeMessagesToDelete.clear();
+      storageManagerLock.writeLock().lock();
+      try {
+         largeMessagesToDelete.forEach((messageId, largeServerMessage) -> {
+            SequentialFile msg = createFileForLargeMessage(messageId, LargeMessageExtension.DURABLE);
+            try {
+               msg.delete();
+            } catch (Exception e) {
+               ActiveMQServerLogger.LOGGER.journalErrorDeletingMessage(e, messageId);
+            }
+            if (replicator != null) {
+               replicator.largeMessageDelete(messageId, JournalStorageManager.this);
+            }
+            confirmLargeMessage(largeServerMessage);
+         });
+         largeMessagesToDelete.clear();
+      } finally {
+         storageManagerLock.writeLock().unlock();
+      }
    }
 
    protected SequentialFile createFileForLargeMessage(final long messageID, final boolean durable) {
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManagerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManagerTest.java
index 49d474c..861b5e0 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManagerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManagerTest.java
@@ -15,134 +15,153 @@
  */
 package org.apache.activemq.artemis.core.persistence.impl.journal;
 
+import java.util.ArrayList;
 import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Stream;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
+import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
+import org.apache.activemq.artemis.core.paging.PagingManager;
+import org.apache.activemq.artemis.core.postoffice.PostOffice;
+import org.apache.activemq.artemis.core.replication.ReplicationManager;
+import org.apache.activemq.artemis.core.server.JournalType;
+import org.apache.activemq.artemis.core.server.LargeServerMessage;
+import org.apache.activemq.artemis.core.server.impl.JournalLoader;
 import org.apache.activemq.artemis.utils.ExecutorFactory;
-import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
+import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
+import org.junit.AfterClass;
 import org.junit.Assert;
+import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
-public class JournalStorageManagerTest {
-
-   ScheduledExecutorService dumbScheduler = new ScheduledExecutorService() {
-      @Override
-      public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
-         return null;
-      }
-
-      @Override
-      public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
-         return null;
-      }
-
-      @Override
-      public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
-         return null;
-      }
-
-      @Override
-      public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
-         return null;
-      }
-
-      @Override
-      public void shutdown() {
-
-      }
-
-      @Override
-      public List<Runnable> shutdownNow() {
-         return null;
-      }
-
-      @Override
-      public boolean isShutdown() {
-         return false;
-      }
-
-      @Override
-      public boolean isTerminated() {
-         return false;
-      }
-
-      @Override
-      public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
-         return false;
-      }
-
-      @Override
-      public <T> Future<T> submit(Callable<T> task) {
-         return null;
-      }
-
-      @Override
-      public <T> Future<T> submit(Runnable task, T result) {
-         return null;
-      }
-
-      @Override
-      public Future<?> submit(Runnable task) {
-         return null;
-      }
+import static java.util.stream.Collectors.toList;
+import static org.junit.Assume.assumeTrue;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
 
-      @Override
-      public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
-         return null;
-      }
-
-      @Override
-      public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
-                                           long timeout,
-                                           TimeUnit unit) throws InterruptedException {
-         return null;
-      }
+@RunWith(Parameterized.class)
+public class JournalStorageManagerTest {
 
-      @Override
-      public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
-         return null;
-      }
+   @Parameterized.Parameter
+   public JournalType journalType;
 
-      @Override
-      public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
-                             long timeout,
-                             TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
-         return null;
-      }
+   @Parameterized.Parameters(name = "journal type={0}")
+   public static Collection<Object[]> getParams() {
+      final JournalType[] values = JournalType.values();
+      return Stream.of(JournalType.values())
+         .map(journalType -> new Object[]{journalType})
+         .collect(toList());
+   }
 
-      @Override
-      public void execute(Runnable command) {
+   private static ExecutorService executor;
+   private static ExecutorService ioExecutor;
+   private static ExecutorService testExecutor;
 
-      }
-   };
+   @BeforeClass
+   public static void initExecutors() {
+      executor = Executors.newSingleThreadExecutor();
+      //to allow concurrent compaction and I/O operations
+      ioExecutor = Executors.newFixedThreadPool(2);
+      testExecutor = Executors.newSingleThreadExecutor();
+   }
 
-   ExecutorFactory dumbExecutor = new ExecutorFactory() {
-      @Override
-      public ArtemisExecutor getExecutor() {
-         return new ArtemisExecutor() {
-            @Override
-            public void execute(Runnable command) {
-               command.run();
-            }
-         };
-      }
-   };
+   @AfterClass
+   public static void destroyExecutors() {
+      ioExecutor.shutdownNow();
+      executor.shutdownNow();
+      testExecutor.shutdownNow();
+   }
 
    /**
     * Test of fixJournalFileSize method, of class JournalStorageManager.
     */
    @Test
    public void testFixJournalFileSize() {
-      JournalStorageManager manager = new JournalStorageManager(new ConfigurationImpl(), null, dumbExecutor, dumbScheduler, dumbExecutor);
+      if (journalType == JournalType.ASYNCIO) {
+         assumeTrue("AIO is not supported on this platform", AIOSequentialFileFactory.isSupported());
+      }
+      final ConfigurationImpl configuration = new ConfigurationImpl().setJournalType(journalType);
+      final ExecutorFactory executorFactory = new OrderedExecutorFactory(executor);
+      final ExecutorFactory ioExecutorFactory = new OrderedExecutorFactory(ioExecutor);
+      final JournalStorageManager manager = spy(new JournalStorageManager(configuration, null, executorFactory, null, ioExecutorFactory));
       Assert.assertEquals(4096, manager.fixJournalFileSize(1024, 4096));
       Assert.assertEquals(4096, manager.fixJournalFileSize(4098, 4096));
       Assert.assertEquals(8192, manager.fixJournalFileSize(8192, 4096));
    }
+
+   @Test(timeout = 20_000)
+   public void testStopReplicationDoesNotDeadlockWhileStopping() throws Exception {
+      if (journalType == JournalType.ASYNCIO) {
+         assumeTrue("AIO is not supported on this platform", AIOSequentialFileFactory.isSupported());
+      }
+      final ConfigurationImpl configuration = new ConfigurationImpl().setJournalType(journalType);
+      final ExecutorFactory executorFactory = new OrderedExecutorFactory(executor);
+      final ExecutorFactory ioExecutorFactory = new OrderedExecutorFactory(ioExecutor);
+      final JournalStorageManager manager = spy(new JournalStorageManager(configuration, null, executorFactory, null, ioExecutorFactory));
+      manager.start();
+      manager.loadBindingJournal(new ArrayList<>(), new ArrayList<>(), new ArrayList<>());
+      final PostOffice postOffice = mock(PostOffice.class);
+      final JournalLoader journalLoader = mock(JournalLoader.class);
+      manager.loadMessageJournal(postOffice, null, null, null, null, null, null, journalLoader);
+      final ReplicationManager replicationManager = mock(ReplicationManager.class);
+      final PagingManager pagingManager = mock(PagingManager.class);
+      when(pagingManager.getStoreNames()).thenReturn(new SimpleString[0]);
+      manager.startReplication(replicationManager, pagingManager, UUID.randomUUID().toString(), false, 0);
+      final LargeServerMessage largeMessage = manager.createLargeMessage(manager.generateID() + 1, new CoreMessage());
+      largeMessage.setDurable(true);
+      when(replicationManager.isSynchronizing()).thenReturn(true);
+      largeMessage.deleteFile();
+      final long pendingRecordID = largeMessage.getPendingRecordID();
+      final AtomicReference<CompletableFuture<Void>> stopReplication = new AtomicReference<>();
+      doAnswer(invocation -> {
+         final CompletableFuture<Void> finished = new CompletableFuture<>();
+         final CountDownLatch beforeStopReplication;
+         if (stopReplication.compareAndSet(null, finished)) {
+            beforeStopReplication = new CountDownLatch(1);
+            //before the deadlock fix:
+            //manager::stop is already owning the large message lock here
+            //but not yet the manager read lock
+            testExecutor.execute(() -> {
+               beforeStopReplication.countDown();
+               try {
+                  //it needs to acquire the manager write lock
+                  //and large message lock next
+                  manager.stopReplication();
+                  finished.complete(null);
+               } catch (Exception e) {
+                  finished.completeExceptionally(e);
+               }
+            });
+         } else {
+            beforeStopReplication = null;
+         }
+         if (beforeStopReplication != null) {
+            beforeStopReplication.await();
+            //do not remove this sleep: before the deadlock fix
+            //it was needed to give manager::stopReplication the chance to acquire
+            //the manager write lock before manager::stop
+            TimeUnit.MILLISECONDS.sleep(500);
+         }
+         //confirmPendingLargeMessage will acquire manager read lock
+         return invocation.callRealMethod();
+      }).when(manager).confirmPendingLargeMessage(pendingRecordID);
+      manager.stop();
+      final CompletableFuture<Void> stoppedReplication = stopReplication.get();
+      Assert.assertNotNull(stoppedReplication);
+      stoppedReplication.get();
+   }
+
 }