You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2014/12/11 19:29:20 UTC

[1/2] activemq-6 git commit: ACTIVEMQ6-54 Fixing tests broken after Paging fix

Repository: activemq-6
Updated Branches:
  refs/heads/master 0eb6ebda2 -> 1491f4a12


ACTIVEMQ6-54 Fixing tests broken after Paging fix

https://issues.apache.org/jira/browse/ACTIVEMQ6-54

Changing the order of depaging introduced an extra check that needs to be checked now.
This will probably take care of the issue by checking if the page is complete before depage.


Project: http://git-wip-us.apache.org/repos/asf/activemq-6/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-6/commit/09490cdb
Tree: http://git-wip-us.apache.org/repos/asf/activemq-6/tree/09490cdb
Diff: http://git-wip-us.apache.org/repos/asf/activemq-6/diff/09490cdb

Branch: refs/heads/master
Commit: 09490cdba3278f53980a906f35893e7b4c57d7fd
Parents: 0eb6ebd
Author: Clebert Suconic <cl...@apache.org>
Authored: Wed Dec 10 22:03:10 2014 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Dec 10 22:06:35 2014 -0500

----------------------------------------------------------------------
 .../cursor/impl/PageCursorProviderImpl.java     |  54 +++++----
 .../tests/integration/client/PagingTest.java    | 114 +++++++++++++++++++
 2 files changed, 146 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-6/blob/09490cdb/activemq-server/src/main/java/org/apache/activemq/core/paging/cursor/impl/PageCursorProviderImpl.java
----------------------------------------------------------------------
diff --git a/activemq-server/src/main/java/org/apache/activemq/core/paging/cursor/impl/PageCursorProviderImpl.java b/activemq-server/src/main/java/org/apache/activemq/core/paging/cursor/impl/PageCursorProviderImpl.java
index 32f30e8..3df0b78 100644
--- a/activemq-server/src/main/java/org/apache/activemq/core/paging/cursor/impl/PageCursorProviderImpl.java
+++ b/activemq-server/src/main/java/org/apache/activemq/core/paging/cursor/impl/PageCursorProviderImpl.java
@@ -425,28 +425,7 @@ public class PageCursorProviderImpl implements PageCursorProvider
             // on that case we need to move to verify it in a different way
             if (minPage == pagingStore.getCurrentWritingPage() && pagingStore.getCurrentPage().getNumberOfMessages() > 0)
             {
-               boolean complete = true;
-
-               for (PageSubscription cursor : cursorList)
-               {
-                  if (!cursor.isComplete(minPage))
-                  {
-                     if (ActiveMQServerLogger.LOGGER.isDebugEnabled())
-                     {
-                        ActiveMQServerLogger.LOGGER.debug("Cursor " + cursor + " was considered incomplete at page " + minPage);
-                     }
-
-                     complete = false;
-                     break;
-                  }
-                  else
-                  {
-                     if (ActiveMQServerLogger.LOGGER.isDebugEnabled())
-                     {
-                        ActiveMQServerLogger.LOGGER.debug("Cursor " + cursor + "was considered **complete** at page " + minPage);
-                     }
-                  }
-               }
+               boolean complete = checkPageCompletion(cursorList, minPage);
 
                if (!pagingStore.isStarted())
                {
@@ -475,6 +454,10 @@ public class PageCursorProviderImpl implements PageCursorProvider
 
             for (long i = pagingStore.getFirstPage(); i < minPage; i++)
             {
+               if (!checkPageCompletion(cursorList, i))
+               {
+                  break;
+               }
                Page page = pagingStore.depage();
                if (page == null)
                {
@@ -577,6 +560,33 @@ public class PageCursorProviderImpl implements PageCursorProvider
 
    }
 
+
+   private boolean checkPageCompletion(ArrayList<PageSubscription> cursorList, long minPage)
+   {
+      boolean complete = true;
+
+      for (PageSubscription cursor : cursorList)
+      {
+         if (!cursor.isComplete(minPage))
+         {
+            if (ActiveMQServerLogger.LOGGER.isDebugEnabled())
+            {
+               ActiveMQServerLogger.LOGGER.debug("Cursor " + cursor + " was considered incomplete at page " + minPage);
+            }
+
+            complete = false;
+            break;
+         }
+         else
+         {
+            if (ActiveMQServerLogger.LOGGER.isDebugEnabled())
+            {
+               ActiveMQServerLogger.LOGGER.debug("Cursor " + cursor + "was considered **complete** at page " + minPage);
+            }
+         }
+      }
+      return complete;
+   }
    /**
     * @return
     */

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/09490cdb/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/PagingTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/PagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/PagingTest.java
index ff333ab..0e4751d 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/PagingTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/PagingTest.java
@@ -113,6 +113,120 @@ public class PagingTest extends ServiceTestBase
    }
 
    @Test
+   public void testPageOnLargeMessageMultipleQueues() throws Exception
+   {
+      Configuration config = createDefaultConfig();
+
+      final int PAGE_MAX = 20 * 1024;
+
+      final int PAGE_SIZE = 10 * 1024;
+
+      HashMap<String, AddressSettings> map = new HashMap<String, AddressSettings>();
+
+      AddressSettings value = new AddressSettings();
+      map.put(ADDRESS.toString(), value);
+      ActiveMQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map);
+      server.start();
+
+      final int numberOfBytes = 1024;
+
+      locator.setBlockOnNonDurableSend(true);
+      locator.setBlockOnDurableSend(true);
+      locator.setBlockOnAcknowledge(true);
+
+      ClientSessionFactory sf = addSessionFactory(createSessionFactory(locator));
+
+      ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
+
+      session.createQueue(ADDRESS, ADDRESS.concat("-0"), null, true);
+      session.createQueue(ADDRESS, ADDRESS.concat("-1"), null, true);
+
+      ClientProducer producer = session.createProducer(ADDRESS);
+
+      ClientMessage message = null;
+
+      for (int i = 0; i < 201; i++)
+      {
+         message = session.createMessage(true);
+
+         message.getBodyBuffer().writerIndex(0);
+
+         message.getBodyBuffer().writeBytes(new byte[numberOfBytes]);
+
+         for (int j = 1; j <= numberOfBytes; j++)
+         {
+            message.getBodyBuffer().writeInt(j);
+         }
+
+         producer.send(message);
+      }
+
+
+      session.close();
+
+      server.stop();
+
+      server = createServer(true, config, PAGE_SIZE, PAGE_MAX, map);
+      server.start();
+
+      sf = createSessionFactory(locator);
+
+      for (int ad = 0; ad < 2; ad++)
+      {
+         session = sf.createSession(false, false, false);
+
+         ClientConsumer consumer = session.createConsumer(ADDRESS.concat("-" + ad));
+
+         session.start();
+
+         for (int i = 0; i < 201; i++)
+         {
+            ClientMessage message2 = consumer.receive(LargeMessageTest.RECEIVE_WAIT_TIME);
+
+            Assert.assertNotNull(message2);
+
+            message2.acknowledge();
+
+            Assert.assertNotNull(message2);
+         }
+
+         try
+         {
+            if (ad > -1)
+            {
+               session.commit();
+            }
+            else
+            {
+               session.rollback();
+               for (int i = 0; i < 100; i++)
+               {
+                  ClientMessage message2 = consumer.receive(LargeMessageTest.RECEIVE_WAIT_TIME);
+
+                  Assert.assertNotNull(message2);
+
+                  message2.acknowledge();
+
+                  Assert.assertNotNull(message2);
+               }
+               session.commit();
+
+            }
+         }
+         catch (Throwable e)
+         {
+            System.err.println("here!!!!!!!");
+            e.printStackTrace();
+            System.exit(-1);
+         }
+
+         consumer.close();
+
+         session.close();
+      }
+   }
+
+   @Test
    public void testPageCleanup() throws Exception
    {
       clearDataRecreateServerDirs();


[2/2] activemq-6 git commit: Merge #42 - paging fix from Clebert

Posted by jb...@apache.org.
Merge #42 - paging fix from Clebert


Project: http://git-wip-us.apache.org/repos/asf/activemq-6/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-6/commit/1491f4a1
Tree: http://git-wip-us.apache.org/repos/asf/activemq-6/tree/1491f4a1
Diff: http://git-wip-us.apache.org/repos/asf/activemq-6/diff/1491f4a1

Branch: refs/heads/master
Commit: 1491f4a1235669470432e24a571e7871d3cb601a
Parents: 0eb6ebd 09490cd
Author: jbertram <jb...@redhat.com>
Authored: Thu Dec 11 12:27:39 2014 -0600
Committer: jbertram <jb...@redhat.com>
Committed: Thu Dec 11 12:27:39 2014 -0600

----------------------------------------------------------------------
 .../cursor/impl/PageCursorProviderImpl.java     |  54 +++++----
 .../tests/integration/client/PagingTest.java    | 114 +++++++++++++++++++
 2 files changed, 146 insertions(+), 22 deletions(-)
----------------------------------------------------------------------