You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/10/31 19:28:39 UTC

[1/2] activemq-artemis git commit: ARTEMIS-2123 Paging not stopped if there are no messages on one subscription

Repository: activemq-artemis
Updated Branches:
  refs/heads/master 3e58cf87a -> 54db13326


ARTEMIS-2123 Paging not stopped if there are no messages on one subscription


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/31399486
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/31399486
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/31399486

Branch: refs/heads/master
Commit: 31399486acf62747aef39cd146507c0fd5c0e2cd
Parents: 3e58cf8
Author: yang wei <wy...@gmail.com>
Authored: Fri Oct 12 20:32:02 2018 +0800
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Oct 31 15:28:20 2018 -0400

----------------------------------------------------------------------
 .../cursor/impl/PageCursorProviderImpl.java     |  19 +++
 .../tests/integration/paging/PagingTest.java    | 164 +++++++++++++++++++
 2 files changed, 183 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/31399486/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
index b814d9b..93869d7 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
@@ -364,6 +364,7 @@ public class PageCursorProviderImpl implements PageCursorProvider {
             ArrayList<PageSubscription> cursorList = cloneSubscriptions();
 
             long minPage = checkMinPage(cursorList);
+            deliverIfNecessary(cursorList, minPage);
 
             logger.debugf("Asserting cleanup for address %s, firstPage=%d", pagingStore.getAddress(), minPage);
 
@@ -599,6 +600,24 @@ public class PageCursorProviderImpl implements PageCursorProvider {
 
    }
 
+   /**
+    * Calls deliverAsync() on the queue of the first subscription that sits on {@code minPage} with an
+    * empty queue, so that the messages in the pages can be deleted (ARTEMIS-2123).
+    * @param cursorList subscriptions to inspect; iteration stops at the first one that qualifies
+    * @param minPage    page number compared against each subscription's first page
+    */
+   private void deliverIfNecessary(Collection<PageSubscription> cursorList, long minPage) {
+      boolean currentWriting = minPage == pagingStore.getCurrentWritingPage();
+      for (PageSubscription cursor : cursorList) {
+         long firstPage = cursor.getFirstPage();
+         // Qualifies when minPage is not the current writing page, or it is but this cursor has not completed it.
+         if (firstPage == minPage && cursor.getQueue().getMessageCount() == 0 && (!currentWriting || !cursor.isComplete(firstPage))) {
+            cursor.getQueue().deliverAsync();
+            break;
+         }
+      }
+   }
+
    // Inner classes -------------------------------------------------
 
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/31399486/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
index 65b5892..180993f 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
@@ -6289,6 +6289,170 @@ public class PagingTest extends ActiveMQTestBase {
       server.stop();
    }
 
+   @Test
+   public void testStopPagingWithoutConsumersIfTwoPages() throws Exception {
+      testStopPagingWithoutConsumersOnOneQueue(true); // true: forceAnotherPage() is called between the two sends
+   }
+
+   @Test
+   public void testStopPagingWithoutConsumersIfOnePage() throws Exception {
+      testStopPagingWithoutConsumersOnOneQueue(false); // false: no forceAnotherPage() call between the two sends
+   }
+
+   private void testStopPagingWithoutConsumersOnOneQueue(boolean forceAnotherPage) throws Exception { // shared scenario for the one-page and two-page tests
+      boolean persistentMessages = true; // passed to session.createMessage(...) for both messages below
+
+      clearDataRecreateServerDirs();
+
+      Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
+
+      server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
+
+      server.start();
+
+      try {
+         ServerLocator locator = createInVMNonHALocator().setClientFailureCheckPeriod(120000).setConnectionTTL(5000000).setCallTimeout(120000).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true);
+
+         ClientSessionFactory sf = locator.createSessionFactory();
+         ClientSession session = sf.createSession(false, false, false);
+         session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS.concat("=1"), SimpleString.toSimpleString("destQ=1 or both=true"), true); // queue "=1": filter accepts destQ=1 or both=true
+         session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS.concat("=2"), SimpleString.toSimpleString("destQ=2 or both=true"), true); // queue "=2": filter accepts destQ=2 or both=true
+         PagingStore store = server.getPagingManager().getPageStore(ADDRESS);
+         Queue queue = server.locateQueue(PagingTest.ADDRESS.concat("=1"));
+         queue.getPageSubscription().getPagingStore().startPaging(); // start paging explicitly before anything is sent
+
+         ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+         ClientMessage message = session.createMessage(persistentMessages);
+         message.putBooleanProperty("both", true); // first message: matches the filters of both queues
+         ActiveMQBuffer bodyLocal = message.getBodyBuffer();
+         bodyLocal.writeBytes(new byte[1024]);
+         producer.send(message);
+         session.commit();
+         session.start();
+         ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS.concat("=2")); // consume and acknowledge the single message on queue "=2"
+         message = consumer.receive(5000);
+         assertNotNull(message);
+         message.acknowledge();
+         assertNull(consumer.receiveImmediate()); // nothing else is expected on queue "=2"
+         consumer.close();
+         session.commit();
+
+         if (forceAnotherPage) {
+            queue.getPageSubscription().getPagingStore().forceAnotherPage(); // presumably makes the next send land in a new page (two-page variant) - confirm against PagingStore
+         }
+
+         message = session.createMessage(persistentMessages);
+         message.putIntProperty("destQ", 1); // second message: matches only the filter of queue "=1"
+         bodyLocal = message.getBodyBuffer();
+         bodyLocal.writeBytes(new byte[1024]);
+         producer.send(message);
+         session.commit();
+
+         consumer = session.createConsumer(PagingTest.ADDRESS.concat("=1"));
+         for (int i = 0; i < 2; i++) { // queue "=1" should hold both messages sent above
+            message = consumer.receive(5000);
+            assertNotNull(message);
+            message.acknowledge();
+            session.commit();
+         }
+         assertNull(consumer.receiveImmediate());
+         consumer.close();
+         session.close();
+
+         store.getCursorProvider().cleanup(); // invoke the cursor provider's cleanup directly
+         waitForNotPaging(server.locateQueue(PagingTest.ADDRESS.concat("=1"))); // the point of the test: paging is expected to stop
+         sf.close();
+         locator.close();
+      } finally {
+         try {
+            server.stop();
+         } catch (Throwable ignored) { // failures while stopping the server are deliberately ignored
+         }
+      }
+   }
+
+   @Test
+   public void testStopPagingWithoutMsgsOnOneQueue() throws Exception { // queue "=2" never receives a message in this test
+      clearDataRecreateServerDirs();
+
+      Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);
+
+      server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX);
+
+      server.start();
+
+      final int numberOfMessages = 500; // messages sent and then consumed in each of the two rounds below
+
+      locator = createInVMNonHALocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true);
+
+      sf = createSessionFactory(locator);
+
+      ClientSession session = sf.createSession(false, false, false);
+
+      session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS.concat("=1"), SimpleString.toSimpleString("destQ=1"), true); // queue "=1": filter destQ=1
+      session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS.concat("=2"), SimpleString.toSimpleString("destQ=2"), true); // queue "=2": filter destQ=2
+
+      ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+      ClientConsumer consumer1 = session.createConsumer(PagingTest.ADDRESS.concat("=1"));
+      session.start();
+      ClientSession session2 = sf.createSession(false, false, false);
+      ClientConsumer consumer2 = session2.createConsumer(PagingTest.ADDRESS.concat("=2")); // never receives: every message below is sent with destQ=1
+      session2.start();
+
+      ClientMessage message = null;
+
+      byte[] body = new byte[MESSAGE_SIZE];
+
+      ByteBuffer bb = ByteBuffer.wrap(body);
+
+      for (int j = 1; j <= MESSAGE_SIZE; j++) { // fill the whole body through the wrapping buffer
+         bb.put(getSamplebyte(j));
+      }
+
+      /**
+       * Round one: send messages and consume them so that every subscription moves to the next bookmarked page.
+       * Round two: send messages and consume them again; paging is expected to stop normally.
+       */
+      for (int x = 0; x < 2; x++) {
+         for (int i = 0; i < numberOfMessages; i++) {
+            message = session.createMessage(true);
+            message.putIntProperty("destQ", 1); // matches only the filter of queue "=1"
+            ActiveMQBuffer bodyLocal = message.getBodyBuffer();
+            bodyLocal.writeBytes(body);
+            producer.send(message);
+            if (i % 1000 == 0) { // with numberOfMessages = 500 this is true only for i == 0
+               session.commit();
+            }
+         }
+         session.commit();
+         assertTrue(Arrays.asList(server.getPagingManager().getStoreNames()).contains(PagingTest.ADDRESS));
+         assertTrue(server.getPagingManager().getPageStore(PagingTest.ADDRESS).isPaging()); // the address must be paging after the sends
+         for (int i = 0; i < numberOfMessages; i++) {
+            ClientMessage msg = consumer1.receive(1000);
+            assertNotNull(msg);
+            msg.acknowledge();
+            if (i % 500 == 0) { // with numberOfMessages = 500 this is true only for i == 0
+               session.commit();
+            }
+         }
+         session.commit();
+         assertNull(consumer1.receiveImmediate()); // queue "=1" is fully drained
+         waitForNotPaging(server.locateQueue(PagingTest.ADDRESS.concat("=1"))); // paging is expected to stop after each round
+      }
+
+      producer.close();
+      consumer1.close();
+      consumer2.close();
+      session.close();
+      session2.close();
+      sf.close();
+      locator.close();
+      locator = null;
+      sf = null;
+      server.stop();
+   }
+
+
    @Override
    protected Configuration createDefaultConfig(final int serverID, final boolean netty) throws Exception {
       Configuration configuration = super.createDefaultConfig(serverID, netty);


[2/2] activemq-artemis git commit: This closes #2369

Posted by cl...@apache.org.
This closes #2369


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/54db1332
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/54db1332
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/54db1332

Branch: refs/heads/master
Commit: 54db13326dee672ce27c84266fac1d79970a7394
Parents: 3e58cf8 3139948
Author: Clebert Suconic <cl...@apache.org>
Authored: Wed Oct 31 15:28:32 2018 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Oct 31 15:28:32 2018 -0400

----------------------------------------------------------------------
 .../cursor/impl/PageCursorProviderImpl.java     |  19 +++
 .../tests/integration/paging/PagingTest.java    | 164 +++++++++++++++++++
 2 files changed, 183 insertions(+)
----------------------------------------------------------------------