You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/09/19 21:05:32 UTC

activemq-artemis git commit: ARTEMIS-2092 fix first page messages lost on server crash bug

Repository: activemq-artemis
Updated Branches:
  refs/heads/2.6.x e358fbde6 -> 9a949230d


ARTEMIS-2092 fix first page messages lost on server crash bug

(cherry picked from commit d868563e7b05db9cf5556a3e44f4a3bd29b787cd)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/9a949230
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/9a949230
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/9a949230

Branch: refs/heads/2.6.x
Commit: 9a949230d24f6aa0ca9e7e7da6ae29ede3dcc240
Parents: e358fbd
Author: yang wei <wy...@gmail.com>
Authored: Wed Sep 19 17:42:02 2018 +0800
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Sep 19 17:05:28 2018 -0400

----------------------------------------------------------------------
 .../journal/AbstractJournalStorageManager.java  |   5 +-
 .../tests/integration/paging/PagingTest.java    | 144 +++++++++++++++++++
 2 files changed, 148 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9a949230/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index 9d270f7..166b35b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -1155,7 +1155,10 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
 
                   pendingCountEncoding.decode(buff);
                   pendingCountEncoding.setID(record.id);
-
+                  PageSubscription sub = locateSubscription(pendingCountEncoding.getQueueID(), pageSubscriptions, queueInfos, pagingManager);
+                  if (sub != null) {
+                     sub.notEmpty();
+                  }
                   // This can be null on testcases not interested on this outcome
                   if (pendingNonTXPageCounter != null) {
                      pendingNonTXPageCounter.add(pendingCountEncoding);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9a949230/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
index f97e60a..13d8f08 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
@@ -37,6 +37,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -62,16 +63,20 @@ import org.apache.activemq.artemis.core.config.StoreConfiguration;
 import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
 import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.core.io.IOCallback;
+import org.apache.activemq.artemis.core.io.SequentialFileFactory;
 import org.apache.activemq.artemis.core.journal.Journal;
 import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
 import org.apache.activemq.artemis.core.journal.RecordInfo;
 import org.apache.activemq.artemis.core.paging.PagingManager;
 import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.paging.PagingStoreFactory;
 import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider;
 import org.apache.activemq.artemis.core.paging.cursor.impl.PageCursorProviderImpl;
 import org.apache.activemq.artemis.core.paging.cursor.impl.PagePositionImpl;
 import org.apache.activemq.artemis.core.paging.impl.PageTransactionInfoImpl;
+import org.apache.activemq.artemis.core.paging.impl.PagingStoreFactoryDatabase;
 import org.apache.activemq.artemis.core.paging.impl.PagingStoreFactoryNIO;
+import org.apache.activemq.artemis.core.paging.impl.PagingStoreImpl;
 import org.apache.activemq.artemis.core.persistence.OperationContext;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.persistence.impl.journal.AckDescribe;
@@ -6076,6 +6081,145 @@ public class PagingTest extends ActiveMQTestBase {
 
    }
 
+   @Test
+   public void testOnlyOnePageOnServerCrash() throws Throwable {
+      clearDataRecreateServerDirs();
+
+      Configuration config = createDefaultInVMConfig();
+
+      class NonStoppablePagingStoreImpl extends PagingStoreImpl {
+
+         NonStoppablePagingStoreImpl(SimpleString address,
+                                            ScheduledExecutorService scheduledExecutor,
+                                            long syncTimeout,
+                                            PagingManager pagingManager,
+                                            StorageManager storageManager,
+                                            SequentialFileFactory fileFactory,
+                                            PagingStoreFactory storeFactory,
+                                            SimpleString storeName,
+                                            AddressSettings addressSettings,
+                                            ArtemisExecutor executor,
+                                            boolean syncNonTransactional) {
+            super(address, scheduledExecutor, syncTimeout, pagingManager, storageManager, fileFactory, storeFactory, storeName, addressSettings, executor, syncNonTransactional);
+         }
+
+         /**
+          * Normal stopping will cleanup non tx page subscription counter which will not trigger the bug.
+          * Here we override stop to simulate server crash.
+          * @throws Exception
+          */
+         @Override
+         public synchronized void stop() throws Exception {
+         }
+      }
+
+      if (storeType == StoreConfiguration.StoreType.DATABASE) {
+         server = new ActiveMQServerImpl(config, ManagementFactory.getPlatformMBeanServer(), new ActiveMQSecurityManagerImpl()) {
+            @Override
+            protected PagingStoreFactoryDatabase getPagingStoreFactory() throws Exception {
+               return new PagingStoreFactoryDatabase((DatabaseStorageConfiguration) this.getConfiguration().getStoreConfiguration(), this.getStorageManager(), this.getConfiguration().getJournalBufferTimeout_NIO(), this.getScheduledPool(), this.getExecutorFactory(), this.getConfiguration().isJournalSyncNonTransactional(), null) {
+                  @Override
+                  public synchronized PagingStore newStore(SimpleString address, AddressSettings settings) {
+                     return new NonStoppablePagingStoreImpl(address, this.getScheduledExecutor(), config.getJournalBufferTimeout_NIO(), getPagingManager(), getStorageManager(), null, this, address, settings, getExecutorFactory().getExecutor(), this.syncNonTransactional);
+                  }
+               };
+            }
+         };
+      } else {
+         server = new ActiveMQServerImpl(config, ManagementFactory.getPlatformMBeanServer(), new ActiveMQSecurityManagerImpl()) {
+            @Override
+            protected PagingStoreFactoryNIO getPagingStoreFactory() {
+               return new PagingStoreFactoryNIO(this.getStorageManager(), this.getConfiguration().getPagingLocation(), this.getConfiguration().getJournalBufferTimeout_NIO(), this.getScheduledPool(), this.getExecutorFactory(), this.getConfiguration().isJournalSyncNonTransactional(), null) {
+                  @Override
+                  public synchronized PagingStore newStore(SimpleString address, AddressSettings settings) {
+                     return new NonStoppablePagingStoreImpl(address, this.getScheduledExecutor(), config.getJournalBufferTimeout_NIO(), getPagingManager(), getStorageManager(), null, this, address, settings, getExecutorFactory().getExecutor(), this.isSyncNonTransactional());
+                  }
+               };
+            }
+         };
+      }
+
+      addServer(server);
+
+      AddressSettings defaultSetting = new AddressSettings().setPageSizeBytes(PagingTest.PAGE_SIZE).setMaxSizeBytes(PagingTest.PAGE_SIZE + MESSAGE_SIZE).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+
+      server.getAddressSettingsRepository().addMatch("#", defaultSetting);
+
+      server.start();
+
+      // Here we send some messages to ensure the queue start paging and create only one page
+      final int numberOfMessages = 12;
+
+      locator = createInVMNonHALocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true);
+
+      sf = createSessionFactory(locator);
+
+      ClientSession session = sf.createSession(false, true, false);
+
+      session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
+
+      ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+      ClientMessage message = null;
+
+      byte[] body = new byte[MESSAGE_SIZE];
+
+      ByteBuffer bb = ByteBuffer.wrap(body);
+
+      for (int j = 1; j <= MESSAGE_SIZE; j++) {
+         bb.put(getSamplebyte(j));
+      }
+
+      for (int i = 0; i < numberOfMessages; i++) {
+         message = session.createMessage(true);
+
+         ActiveMQBuffer bodyLocal = message.getBodyBuffer();
+
+         bodyLocal.writeBytes(body);
+
+         message.putIntProperty("count", i);
+
+         producer.send(message);
+
+      }
+      producer.close();
+      session.close();
+
+      Queue queue = server.locateQueue(PagingTest.ADDRESS);
+      assertEquals(numberOfMessages, getMessageCount(queue));
+      assertEquals(1, server.getPagingManager().getPageStore(ADDRESS).getNumberOfPages());
+
+      sf.close();
+
+      server.stop();
+
+      server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_SIZE + MESSAGE_SIZE);
+      server.start();
+
+      sf = createSessionFactory(locator);
+
+      session = sf.createSession(false, false, false);
+
+      ClientConsumer consumer = session.createConsumer(ADDRESS);
+      session.start();
+
+      for (int i = 0; i < numberOfMessages; i++) {
+         ClientMessage msg = consumer.receive(1000);
+         assertNotNull(i + "th msg is null", msg);
+         assertEquals(i, msg.getIntProperty("count").intValue());
+         msg.acknowledge();
+         System.out.println(msg);
+      }
+
+      assertNull(consumer.receiveImmediate());
+      session.commit();
+
+      session.close();
+      sf.close();
+      locator.close();
+      server.stop();
+   }
+
    @Override
    protected Configuration createDefaultConfig(final int serverID, final boolean netty) throws Exception {
       Configuration configuration = super.createDefaultConfig(serverID, netty);