You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/07/12 18:22:41 UTC
[1/2] activemq-artemis git commit: This closes #2166
Repository: activemq-artemis
Updated Branches:
refs/heads/master 8d9ec3e5a -> ec8914843
This closes #2166
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/ec891484
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/ec891484
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/ec891484
Branch: refs/heads/master
Commit: ec8914843d9482ee3387843f27fe250aa7f12def
Parents: 8d9ec3e 5ec2234
Author: Clebert Suconic <cl...@apache.org>
Authored: Thu Jul 12 14:11:16 2018 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Jul 12 14:11:16 2018 -0400
----------------------------------------------------------------------
.../cursor/impl/PageSubscriptionImpl.java | 21 +++++
.../tests/integration/paging/PagingTest.java | 93 ++++++++++++++++++++
2 files changed, 114 insertions(+)
----------------------------------------------------------------------
[2/2] activemq-artemis git commit: ARTEMIS-1958 Artemis may not be
able to delete pages when there are some empty page files
Posted by cl...@apache.org.
ARTEMIS-1958 Artemis may not be able to delete pages when there are some empty page files
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/5ec22340
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/5ec22340
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/5ec22340
Branch: refs/heads/master
Commit: 5ec2234010dfad8c7ff2e49f2505ea44dba9388a
Parents: 8d9ec3e
Author: 17103355 <17...@cnsuning.com>
Authored: Thu Jun 28 20:14:50 2018 +0800
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Jul 12 14:11:16 2018 -0400
----------------------------------------------------------------------
.../cursor/impl/PageSubscriptionImpl.java | 21 +++++
.../tests/integration/paging/PagingTest.java | 93 ++++++++++++++++++++
2 files changed, 114 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5ec22340/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
index e1c9537..5fec9a4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
@@ -383,19 +383,29 @@ final class PageSubscriptionImpl implements PageSubscription {
PageCache cache = cursorProvider.getPageCache(pos.getPageNr());
+ PageCache emptyCache = null;
if (cache != null && !cache.isLive() && retPos.getMessageNr() >= cache.getNumberOfMessages()) {
+ emptyCache = cache;
+ saveEmptyPageAsConsumedPage(emptyCache);
// The next message is beyond what's available at the current page, so we need to move to the next page
cache = null;
}
// it will scan for the next available page
while ((cache == null && retPos.getPageNr() <= pageStore.getCurrentWritingPage()) || (cache != null && retPos.getPageNr() <= pageStore.getCurrentWritingPage() && cache.getNumberOfMessages() == 0)) {
+ emptyCache = cache;
retPos = moveNextPage(retPos);
cache = cursorProvider.getPageCache(retPos.getPageNr());
+
+ if (cache != null) {
+ saveEmptyPageAsConsumedPage(emptyCache);
+ }
}
if (cache == null) {
+ saveEmptyPageAsConsumedPage(emptyCache);
+
// it will be null in the case of the current writing page
return null;
} else {
@@ -787,6 +797,17 @@ final class PageSubscriptionImpl implements PageSubscription {
}
+ /**
+ * Registers an empty page in {@code consumedPages} when it is not tracked there yet.
+ * Does nothing when {@code cache} is null or when the page reports at least one message.
+ * NOTE(review): per the commit message (ARTEMIS-1958) this is presumably what lets empty
+ * page files be deleted later; the cleanup logic itself is not visible in this hunk.
+ *
+ * @param cache the page cache to register; may be null
+ */
+ private void saveEmptyPageAsConsumedPage(final PageCache cache) {
+ // Only pages that contain no messages are recorded here.
+ if (cache != null && cache.getNumberOfMessages() == 0) {
+ synchronized (consumedPages) {
+ PageCursorInfo pageInfo = consumedPages.get(cache.getPageId());
+ // Leave an existing entry untouched; insert only when the page is not tracked yet.
+ if (pageInfo == null) {
+ consumedPages.put(cache.getPageId(), new PageCursorInfo(cache.getPageId(), cache.getNumberOfMessages(), cache));
+ }
+ }
+ }
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5ec22340/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
index bc09fa1..566142d 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
@@ -5405,6 +5405,99 @@ public class PagingTest extends ActiveMQTestBase {
internalTestMultiFilters(false);
}
+ /**
+ * Sends and consumes messages on a paged address after forcing extra page switches,
+ * then checks that a single page remains once the cursor provider has cleaned up.
+ * NOTE(review): added together with the ARTEMIS-1958 fix, so this is presumably the
+ * regression test for pages not being deleted when empty page files exist.
+ *
+ * @throws Exception if server setup, messaging or an assertion fails
+ */
+ @Test
+ public void testPageEmptyFile() throws Exception {
+ boolean persistentMessages = true;
+
+ clearDataRecreateServerDirs();
+
+ Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
+
+ server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
+
+ server.start();
+
+ final int messageSize = 1024;
+
+ final int numberOfMessages = 100;
+
+ try {
+ ServerLocator locator = createInVMNonHALocator().setClientFailureCheckPeriod(120000).setConnectionTTL(5000000).setCallTimeout(120000).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true);
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session = sf.createSession(false, false, false);
+
+ session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
+
+ // Force two page switches before anything is sent.
+ // NOTE(review): presumably this leaves empty page files behind - confirm against PagingStore.
+ PagingStore store = server.getPagingManager().getPageStore(ADDRESS);
+ store.forceAnotherPage();
+ store.forceAnotherPage();
+
+ ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+ ClientMessage message = null;
+
+ byte[] body = new byte[messageSize];
+
+ // Send numberOfMessages messages, each carrying a zero-filled body of messageSize bytes.
+ for (int i = 0; i < numberOfMessages; i++) {
+ message = session.createMessage(persistentMessages);
+
+ ActiveMQBuffer bodyLocal = message.getBodyBuffer();
+
+ bodyLocal.writeBytes(body);
+
+ producer.send(message);
+ }
+
+ session.commit();
+
+ Queue queue = server.locateQueue(PagingTest.ADDRESS);
+ Assert.assertEquals(numberOfMessages, queue.getMessageCount());
+
+ // Force one more page switch after the messages were sent and before consuming starts.
+ store.forceAnotherPage();
+
+ session.start();
+
+ ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS);
+
+ // Receive and acknowledge every message that was sent.
+ for (int i = 0; i < numberOfMessages; i++) {
+ message = consumer.receive(5000);
+ assertNotNull(message);
+ message.acknowledge();
+ }
+
+ session.commit();
+
+ // Nothing else should be available once all messages were consumed.
+ assertNull(consumer.receiveImmediate());
+
+ consumer.close();
+
+ store.getCursorProvider().cleanup();
+
+ Assert.assertEquals(0, queue.getMessageCount());
+
+ // Poll for up to 5 seconds while the store still reports that it is paging.
+ long timeout = System.currentTimeMillis() + 5000;
+ while (store.isPaging() && timeout > System.currentTimeMillis()) {
+ Thread.sleep(100);
+ }
+
+ store.getCursorProvider().cleanup();
+
+ sf.close();
+
+ locator.close();
+
+ // After cleanup exactly one page is expected to remain in the store.
+ Assert.assertEquals(1, store.getNumberOfPages());
+
+ } finally {
+ // Best-effort shutdown: any failure while stopping the server is deliberately ignored.
+ try {
+ server.stop();
+ } catch (Throwable ignored) {
+ }
+ }
+ }
+
public void internalTestMultiFilters(boolean browsing) throws Throwable {
clearDataRecreateServerDirs();