You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/10/31 19:26:47 UTC
[1/2] activemq-artemis git commit: This closes #2369
Repository: activemq-artemis
Updated Branches:
refs/heads/master 3e58cf87a -> 013d16a56
This closes #2369
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/013d16a5
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/013d16a5
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/013d16a5
Branch: refs/heads/master
Commit: 013d16a567c4e06e23fbb2f45d3b99b100b4b4f5
Parents: 3e58cf8 d8627c6
Author: Clebert Suconic <cl...@apache.org>
Authored: Wed Oct 31 15:26:40 2018 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Oct 31 15:26:40 2018 -0400
----------------------------------------------------------------------
.../cursor/impl/PageCursorProviderImpl.java | 19 +++
.../tests/integration/paging/PagingTest.java | 164 +++++++++++++++++++
2 files changed, 183 insertions(+)
----------------------------------------------------------------------
[2/2] activemq-artemis git commit: Paging not stopped if there are no
messages on one subscription
Posted by cl...@apache.org.
Paging not stopped if there are no messages on one subscription
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/d8627c61
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/d8627c61
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/d8627c61
Branch: refs/heads/master
Commit: d8627c61a7add5f0dd5330dddfa91389cc08e62d
Parents: 3e58cf8
Author: yang wei <wy...@gmail.com>
Authored: Fri Oct 12 20:32:02 2018 +0800
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Oct 31 15:26:40 2018 -0400
----------------------------------------------------------------------
.../cursor/impl/PageCursorProviderImpl.java | 19 +++
.../tests/integration/paging/PagingTest.java | 164 +++++++++++++++++++
2 files changed, 183 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d8627c61/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
index b814d9b..93869d7 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
@@ -364,6 +364,7 @@ public class PageCursorProviderImpl implements PageCursorProvider {
ArrayList<PageSubscription> cursorList = cloneSubscriptions();
long minPage = checkMinPage(cursorList);
+ deliverIfNecessary(cursorList, minPage);
logger.debugf("Asserting cleanup for address %s, firstPage=%d", pagingStore.getAddress(), minPage);
@@ -599,6 +600,24 @@ public class PageCursorProviderImpl implements PageCursorProvider {
}
+ private void deliverIfNecessary(Collection<PageSubscription> cursorList, long minPage) {
+ boolean currentWriting = minPage == pagingStore.getCurrentWritingPage() ? true : false;
+ for (PageSubscription cursor : cursorList) {
+ long firstPage = cursor.getFirstPage();
+ if (firstPage == minPage) {
+ /**
+ * if first page is current writing page and it's not complete, or
+ * first page is before the current writing page, we need to trigger
+ * deliverAsync to delete messages in the pages.
+ */
+ if (cursor.getQueue().getMessageCount() == 0 && (!currentWriting || !cursor.isComplete(firstPage))) {
+ cursor.getQueue().deliverAsync();
+ break;
+ }
+ }
+ }
+ }
+
// Inner classes -------------------------------------------------
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d8627c61/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
index 65b5892..180993f 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
@@ -6289,6 +6289,170 @@ public class PagingTest extends ActiveMQTestBase {
server.stop();
}
+ @Test
+ public void testStopPagingWithoutConsumersIfTwoPages() throws Exception {
+ testStopPagingWithoutConsumersOnOneQueue(true);
+ }
+
+ @Test
+ public void testStopPagingWithoutConsumersIfOnePage() throws Exception {
+ testStopPagingWithoutConsumersOnOneQueue(false);
+ }
+
+ private void testStopPagingWithoutConsumersOnOneQueue(boolean forceAnotherPage) throws Exception {
+ boolean persistentMessages = true;
+
+ clearDataRecreateServerDirs();
+
+ Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
+
+ server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
+
+ server.start();
+
+ try {
+ ServerLocator locator = createInVMNonHALocator().setClientFailureCheckPeriod(120000).setConnectionTTL(5000000).setCallTimeout(120000).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true);
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSession session = sf.createSession(false, false, false);
+ session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS.concat("=1"), SimpleString.toSimpleString("destQ=1 or both=true"), true);
+ session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS.concat("=2"), SimpleString.toSimpleString("destQ=2 or both=true"), true);
+ PagingStore store = server.getPagingManager().getPageStore(ADDRESS);
+ Queue queue = server.locateQueue(PagingTest.ADDRESS.concat("=1"));
+ queue.getPageSubscription().getPagingStore().startPaging();
+
+ ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+ ClientMessage message = session.createMessage(persistentMessages);
+ message.putBooleanProperty("both", true);
+ ActiveMQBuffer bodyLocal = message.getBodyBuffer();
+ bodyLocal.writeBytes(new byte[1024]);
+ producer.send(message);
+ session.commit();
+ session.start();
+ ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS.concat("=2"));
+ message = consumer.receive(5000);
+ assertNotNull(message);
+ message.acknowledge();
+ assertNull(consumer.receiveImmediate());
+ consumer.close();
+ session.commit();
+
+ if (forceAnotherPage) {
+ queue.getPageSubscription().getPagingStore().forceAnotherPage();
+ }
+
+ message = session.createMessage(persistentMessages);
+ message.putIntProperty("destQ", 1);
+ bodyLocal = message.getBodyBuffer();
+ bodyLocal.writeBytes(new byte[1024]);
+ producer.send(message);
+ session.commit();
+
+ consumer = session.createConsumer(PagingTest.ADDRESS.concat("=1"));
+ for (int i = 0; i < 2; i++) {
+ message = consumer.receive(5000);
+ assertNotNull(message);
+ message.acknowledge();
+ session.commit();
+ }
+ assertNull(consumer.receiveImmediate());
+ consumer.close();
+ session.close();
+
+ store.getCursorProvider().cleanup();
+ waitForNotPaging(server.locateQueue(PagingTest.ADDRESS.concat("=1")));
+ sf.close();
+ locator.close();
+ } finally {
+ try {
+ server.stop();
+ } catch (Throwable ignored) {
+ }
+ }
+ }
+
+ @Test
+ public void testStopPagingWithoutMsgsOnOneQueue() throws Exception {
+ clearDataRecreateServerDirs();
+
+ Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
+
+ server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
+
+ server.start();
+
+ final int numberOfMessages = 500;
+
+ locator = createInVMNonHALocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true);
+
+ sf = createSessionFactory(locator);
+
+ ClientSession session = sf.createSession(false, false, false);
+
+ session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS.concat("=1"), SimpleString.toSimpleString("destQ=1"), true);
+ session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS.concat("=2"), SimpleString.toSimpleString("destQ=2"), true);
+
+ ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+ ClientConsumer consumer1 = session.createConsumer(PagingTest.ADDRESS.concat("=1"));
+ session.start();
+ ClientSession session2 = sf.createSession(false, false, false);
+ ClientConsumer consumer2 = session2.createConsumer(PagingTest.ADDRESS.concat("=2"));
+ session2.start();
+
+ ClientMessage message = null;
+
+ byte[] body = new byte[MESSAGE_SIZE];
+
+ ByteBuffer bb = ByteBuffer.wrap(body);
+
+ for (int j = 1; j <= MESSAGE_SIZE; j++) {
+ bb.put(getSamplebyte(j));
+ }
+
+ /**
+ * Here we first send messages and consume them to move every subscription to the next bookmarked page.
+ * Then we send messages and consume them again, expecting paging is stopped normally.
+ */
+ for (int x = 0; x < 2; x++) {
+ for (int i = 0; i < numberOfMessages; i++) {
+ message = session.createMessage(true);
+ message.putIntProperty("destQ", 1);
+ ActiveMQBuffer bodyLocal = message.getBodyBuffer();
+ bodyLocal.writeBytes(body);
+ producer.send(message);
+ if (i % 1000 == 0) {
+ session.commit();
+ }
+ }
+ session.commit();
+ assertTrue(Arrays.asList(server.getPagingManager().getStoreNames()).contains(PagingTest.ADDRESS));
+ assertTrue(server.getPagingManager().getPageStore(PagingTest.ADDRESS).isPaging());
+ for (int i = 0; i < numberOfMessages; i++) {
+ ClientMessage msg = consumer1.receive(1000);
+ assertNotNull(msg);
+ msg.acknowledge();
+ if (i % 500 == 0) {
+ session.commit();
+ }
+ }
+ session.commit();
+ assertNull(consumer1.receiveImmediate());
+ waitForNotPaging(server.locateQueue(PagingTest.ADDRESS.concat("=1")));
+ }
+
+ producer.close();
+ consumer1.close();
+ consumer2.close();
+ session.close();
+ session2.close();
+ sf.close();
+ locator.close();
+ locator = null;
+ sf = null;
+ server.stop();
+ }
+
+
@Override
protected Configuration createDefaultConfig(final int serverID, final boolean netty) throws Exception {
Configuration configuration = super.createDefaultConfig(serverID, netty);