You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/09/19 21:05:32 UTC
activemq-artemis git commit: ARTEMIS-2092 fix first page messages
lost on server crash bug
Repository: activemq-artemis
Updated Branches:
refs/heads/2.6.x e358fbde6 -> 9a949230d
ARTEMIS-2092 fix first page messages lost on server crash bug
(cherry picked from commit d868563e7b05db9cf5556a3e44f4a3bd29b787cd)
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/9a949230
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/9a949230
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/9a949230
Branch: refs/heads/2.6.x
Commit: 9a949230d24f6aa0ca9e7e7da6ae29ede3dcc240
Parents: e358fbd
Author: yang wei <wy...@gmail.com>
Authored: Wed Sep 19 17:42:02 2018 +0800
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Sep 19 17:05:28 2018 -0400
----------------------------------------------------------------------
.../journal/AbstractJournalStorageManager.java | 5 +-
.../tests/integration/paging/PagingTest.java | 144 +++++++++++++++++++
2 files changed, 148 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9a949230/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index 9d270f7..166b35b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -1155,7 +1155,10 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
pendingCountEncoding.decode(buff);
pendingCountEncoding.setID(record.id);
-
+ PageSubscription sub = locateSubscription(pendingCountEncoding.getQueueID(), pageSubscriptions, queueInfos, pagingManager);
+ if (sub != null) {
+ sub.notEmpty();
+ }
// This can be null on testcases not interested on this outcome
if (pendingNonTXPageCounter != null) {
pendingNonTXPageCounter.add(pendingCountEncoding);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9a949230/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
index f97e60a..13d8f08 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
@@ -37,6 +37,7 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -62,16 +63,20 @@ import org.apache.activemq.artemis.core.config.StoreConfiguration;
import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.io.IOCallback;
+import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.paging.PagingStoreFactory;
import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider;
import org.apache.activemq.artemis.core.paging.cursor.impl.PageCursorProviderImpl;
import org.apache.activemq.artemis.core.paging.cursor.impl.PagePositionImpl;
import org.apache.activemq.artemis.core.paging.impl.PageTransactionInfoImpl;
+import org.apache.activemq.artemis.core.paging.impl.PagingStoreFactoryDatabase;
import org.apache.activemq.artemis.core.paging.impl.PagingStoreFactoryNIO;
+import org.apache.activemq.artemis.core.paging.impl.PagingStoreImpl;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.AckDescribe;
@@ -6076,6 +6081,145 @@ public class PagingTest extends ActiveMQTestBase {
}
+ @Test
+ public void testOnlyOnePageOnServerCrash() throws Throwable {
+ clearDataRecreateServerDirs();
+
+ Configuration config = createDefaultInVMConfig();
+
+ class NonStoppablePagingStoreImpl extends PagingStoreImpl { // local subclass whose stop() does nothing
+
+ NonStoppablePagingStoreImpl(SimpleString address,
+ ScheduledExecutorService scheduledExecutor,
+ long syncTimeout,
+ PagingManager pagingManager,
+ StorageManager storageManager,
+ SequentialFileFactory fileFactory,
+ PagingStoreFactory storeFactory,
+ SimpleString storeName,
+ AddressSettings addressSettings,
+ ArtemisExecutor executor,
+ boolean syncNonTransactional) {
+ super(address, scheduledExecutor, syncTimeout, pagingManager, storageManager, fileFactory, storeFactory, storeName, addressSettings, executor, syncNonTransactional);
+ }
+
+ /**
+ * A normal stop cleans up the non-transactional page subscription counter, which would keep the bug from being triggered.
+ * This override is intentionally an empty no-op so that stopping the server looks like a crash to the paging store.
+ * @throws Exception declared by the overridden signature; this empty body throws nothing
+ */
+ @Override
+ public synchronized void stop() throws Exception {
+ }
+ }
+
+ if (storeType == StoreConfiguration.StoreType.DATABASE) { // both branches install a factory that creates NonStoppablePagingStoreImpl
+ server = new ActiveMQServerImpl(config, ManagementFactory.getPlatformMBeanServer(), new ActiveMQSecurityManagerImpl()) {
+ @Override
+ protected PagingStoreFactoryDatabase getPagingStoreFactory() throws Exception {
+ return new PagingStoreFactoryDatabase((DatabaseStorageConfiguration) this.getConfiguration().getStoreConfiguration(), this.getStorageManager(), this.getConfiguration().getJournalBufferTimeout_NIO(), this.getScheduledPool(), this.getExecutorFactory(), this.getConfiguration().isJournalSyncNonTransactional(), null) {
+ @Override
+ public synchronized PagingStore newStore(SimpleString address, AddressSettings settings) {
+ return new NonStoppablePagingStoreImpl(address, this.getScheduledExecutor(), config.getJournalBufferTimeout_NIO(), getPagingManager(), getStorageManager(), null, this, address, settings, getExecutorFactory().getExecutor(), this.syncNonTransactional);
+ }
+ };
+ }
+ };
+ } else {
+ server = new ActiveMQServerImpl(config, ManagementFactory.getPlatformMBeanServer(), new ActiveMQSecurityManagerImpl()) {
+ @Override
+ protected PagingStoreFactoryNIO getPagingStoreFactory() {
+ return new PagingStoreFactoryNIO(this.getStorageManager(), this.getConfiguration().getPagingLocation(), this.getConfiguration().getJournalBufferTimeout_NIO(), this.getScheduledPool(), this.getExecutorFactory(), this.getConfiguration().isJournalSyncNonTransactional(), null) {
+ @Override
+ public synchronized PagingStore newStore(SimpleString address, AddressSettings settings) {
+ return new NonStoppablePagingStoreImpl(address, this.getScheduledExecutor(), config.getJournalBufferTimeout_NIO(), getPagingManager(), getStorageManager(), null, this, address, settings, getExecutorFactory().getExecutor(), this.isSyncNonTransactional());
+ }
+ };
+ }
+ };
+ }
+
+ addServer(server);
+
+ AddressSettings defaultSetting = new AddressSettings().setPageSizeBytes(PagingTest.PAGE_SIZE).setMaxSizeBytes(PagingTest.PAGE_SIZE + MESSAGE_SIZE).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE); // max size is one page plus one message; full policy is PAGE
+
+ server.getAddressSettingsRepository().addMatch("#", defaultSetting);
+
+ server.start();
+
+ // Send enough messages to make the queue start paging while creating only one page (both asserted after the send loop)
+ final int numberOfMessages = 12;
+
+ locator = createInVMNonHALocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true);
+
+ sf = createSessionFactory(locator);
+
+ ClientSession session = sf.createSession(false, true, false);
+
+ session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
+
+ ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+ ClientMessage message = null;
+
+ byte[] body = new byte[MESSAGE_SIZE];
+
+ ByteBuffer bb = ByteBuffer.wrap(body);
+
+ for (int j = 1; j <= MESSAGE_SIZE; j++) { // fill the shared message body with sample bytes
+ bb.put(getSamplebyte(j));
+ }
+
+ for (int i = 0; i < numberOfMessages; i++) {
+ message = session.createMessage(true);
+
+ ActiveMQBuffer bodyLocal = message.getBodyBuffer();
+
+ bodyLocal.writeBytes(body);
+
+ message.putIntProperty("count", i); // sequence number checked again after the restart
+
+ producer.send(message);
+
+ }
+ producer.close();
+ session.close();
+
+ Queue queue = server.locateQueue(PagingTest.ADDRESS);
+ assertEquals(numberOfMessages, getMessageCount(queue));
+ assertEquals(1, server.getPagingManager().getPageStore(ADDRESS).getNumberOfPages()); // precondition: exactly one page exists
+
+ sf.close();
+
+ server.stop(); // stop() on NonStoppablePagingStoreImpl is a no-op, so the paging store skips its normal stop logic
+
+ server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_SIZE + MESSAGE_SIZE); // second server obtained from createServer with the same config
+ server.start();
+
+ sf = createSessionFactory(locator);
+
+ session = sf.createSession(false, false, false);
+
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+ session.start();
+
+ for (int i = 0; i < numberOfMessages; i++) { // every message must be received, in send order
+ ClientMessage msg = consumer.receive(1000);
+ assertNotNull(i + "th msg is null", msg);
+ assertEquals(i, msg.getIntProperty("count").intValue());
+ msg.acknowledge();
+ System.out.println(msg);
+ }
+
+ assertNull(consumer.receiveImmediate()); // nothing beyond the messages that were sent
+ session.commit();
+
+ session.close();
+ sf.close();
+ locator.close();
+ server.stop();
+ }
+
@Override
protected Configuration createDefaultConfig(final int serverID, final boolean netty) throws Exception {
Configuration configuration = super.createDefaultConfig(serverID, netty);