You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by an...@apache.org on 2014/12/08 12:29:25 UTC

[1/3] activemq-6 git commit: small fix on pom (version 6.0 instead of 2.5)

Repository: activemq-6
Updated Branches:
  refs/heads/master 46182e415 -> d42481db5


small fix on pom (version 6.0 instead of 2.5)


Project: http://git-wip-us.apache.org/repos/asf/activemq-6/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-6/commit/aec50cf2
Tree: http://git-wip-us.apache.org/repos/asf/activemq-6/tree/aec50cf2
Diff: http://git-wip-us.apache.org/repos/asf/activemq-6/diff/aec50cf2

Branch: refs/heads/master
Commit: aec50cf250ce22071853afa6d084e7a354d5da9b
Parents: 46182e4
Author: Clebert Suconic <cl...@apache.org>
Authored: Thu Dec 4 12:18:34 2014 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Dec 5 10:38:42 2014 -0500

----------------------------------------------------------------------
 pom.xml | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-6/blob/aec50cf2/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index c868686..caaeff9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -30,8 +30,8 @@
       <staging.siteURL>scp://people.apache.org/x1/www/activemq.apache.org</staging.siteURL>
       <netty.version>4.0.20.Final</netty.version>
       <activemq.version.versionName>Active Hornet</activemq.version.versionName>
-      <activemq.version.majorVersion>2</activemq.version.majorVersion>
-      <activemq.version.minorVersion>5</activemq.version.minorVersion>
+      <activemq.version.majorVersion>6</activemq.version.majorVersion>
+      <activemq.version.minorVersion>0</activemq.version.minorVersion>
       <activemq.version.microVersion>0</activemq.version.microVersion>
       <activemq.version.incrementingVersion>125,124,123,122</activemq.version.incrementingVersion>
       <activemq.version.versionSuffix>SNAPSHOT</activemq.version.versionSuffix>


[3/3] activemq-6 git commit: merge #35 - Fixing issues with paging (browsing & depaging)

Posted by an...@apache.org.
merge #35 - Fixing issues with paging (browsing & depaging)


Project: http://git-wip-us.apache.org/repos/asf/activemq-6/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-6/commit/d42481db
Tree: http://git-wip-us.apache.org/repos/asf/activemq-6/tree/d42481db
Diff: http://git-wip-us.apache.org/repos/asf/activemq-6/diff/d42481db

Branch: refs/heads/master
Commit: d42481db5c01d66965b610219342b4771e7cd2c1
Parents: 46182e4 933d90a
Author: Andy Taylor <an...@gmail.com>
Authored: Mon Dec 8 11:28:02 2014 +0000
Committer: Andy Taylor <an...@gmail.com>
Committed: Mon Dec 8 11:28:02 2014 +0000

----------------------------------------------------------------------
 .../activemq/core/server/impl/QueueImpl.java    |  26 ++++-
 .../core/server/impl/ServerConsumerImpl.java    |   2 +-
 pom.xml                                         |   4 +-
 .../tests/integration/client/PagingTest.java    | 116 +++++++++++++++++++
 4 files changed, 143 insertions(+), 5 deletions(-)
----------------------------------------------------------------------



[2/3] activemq-6 git commit: ACTIVEMQ6-54 Depaging is not kicking in on some scenarios, and Browsing is not looking towards paging

Posted by an...@apache.org.
ACTIVEMQ6-54 Depaging is not kicking in on some scenarios, and Browsing is not looking towards paging

https://issues.apache.org/jira/browse/ACTIVEMQ6-54

This fixes a few issues around paging:
- Browsing is not looking towards paging. I'm using queue.totalIterator, which is a read-only iterator that also goes through the paged messages.
- Depage is not kicking in correctly in some scenarios. I have improved the logic for scheduling depage to address that.


Project: http://git-wip-us.apache.org/repos/asf/activemq-6/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-6/commit/933d90a4
Tree: http://git-wip-us.apache.org/repos/asf/activemq-6/tree/933d90a4
Diff: http://git-wip-us.apache.org/repos/asf/activemq-6/diff/933d90a4

Branch: refs/heads/master
Commit: 933d90a4f383c9dd0e66f0a3712bcb2f7561765c
Parents: aec50cf
Author: Clebert Suconic <cl...@apache.org>
Authored: Fri Dec 5 12:25:07 2014 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Dec 5 14:48:14 2014 -0500

----------------------------------------------------------------------
 .../activemq/core/server/impl/QueueImpl.java    |  26 ++++-
 .../core/server/impl/ServerConsumerImpl.java    |   2 +-
 .../tests/integration/client/PagingTest.java    | 116 +++++++++++++++++++
 3 files changed, 141 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-6/blob/933d90a4/activemq-server/src/main/java/org/apache/activemq/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/QueueImpl.java b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/QueueImpl.java
index 6703c91..9523205 100644
--- a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/QueueImpl.java
+++ b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/QueueImpl.java
@@ -664,6 +664,8 @@ public class QueueImpl implements Queue
             // no-op
             scheduledRunners.decrementAndGet();
          }
+
+         checkDepage();
       }
 
    }
@@ -2188,12 +2190,32 @@ public class QueueImpl implements Queue
          }
       }
 
-      if (pageIterator != null && messageReferences.size() == 0 && pageSubscription.isPaging() && pageIterator.hasNext() && !depagePending)
+      checkDepage();
+   }
+
+   private void checkDepage()
+   {
+      if (pageIterator != null && pageSubscription.isPaging() && !depagePending && needsDepage() && pageIterator.hasNext())
       {
          scheduleDepage(false);
       }
    }
 
+
+   /**
+    * This is a common check we do before scheduling depaging, or while depaging.
+    * Before scheduling a depage runnable we verify whether the queue needs depaging.
+    * We also check needsDepage() in the loop condition while depaging.
+    * This is just to avoid copying & pasting the same condition.
+    * @return true if the queue memory size is below the paging store's max size
+    */
+   private boolean needsDepage()
+   {
+      return queueMemorySize.get() < pageSubscription.getPagingStore().getMaxSize();
+   }
+
+
+
    private SimpleString extractGroupID(MessageReference ref)
    {
       if (internalQueue)
@@ -2268,7 +2290,7 @@ public class QueueImpl implements Queue
       this.directDeliver = false;
 
       int depaged = 0;
-      while (timeout > System.currentTimeMillis() && queueMemorySize.get() < maxSize && pageIterator.hasNext())
+      while (timeout > System.currentTimeMillis() && needsDepage() && pageIterator.hasNext())
       {
          depaged++;
          PagedReference reference = pageIterator.next();

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/933d90a4/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ServerConsumerImpl.java b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ServerConsumerImpl.java
index da8d094..aa12993 100644
--- a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ServerConsumerImpl.java
+++ b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ServerConsumerImpl.java
@@ -213,7 +213,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener
 
       if (browseOnly)
       {
-         browserDeliverer = new BrowserDeliverer(messageQueue.iterator());
+         browserDeliverer = new BrowserDeliverer(messageQueue.totalIterator());
       }
       else
       {

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/933d90a4/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/PagingTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/PagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/PagingTest.java
index 56369c8..ff333ab 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/PagingTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/PagingTest.java
@@ -5184,6 +5184,8 @@ public class PagingTest extends ServiceTestBase
     * When running this test from an IDE add this to the test command line so that the AssertionLoggerHandler works properly:
     *
     *   -Djava.util.logging.manager=org.jboss.logmanager.LogManager  -Dlogging.configuration=file:<path_to_source>/tests/config/logging.properties
+    *
+    *   Note: IntelliJ IDEA should pick these up from the pom, so you shouldn't need to do this there.
     */
    public void testFailMessagesNonDurable() throws Exception
    {
@@ -5860,6 +5862,120 @@ public class PagingTest extends ServiceTestBase
 
 
    @Test
+   public void testMultiFiltersBrowsing() throws Throwable
+   {
+      internalTestMultiFilters(true);
+   }
+
+   @Test
+   public void testMultiFiltersRegularConsumer() throws Throwable
+   {
+      internalTestMultiFilters(false);
+   }
+
+   public void internalTestMultiFilters(boolean browsing) throws Throwable
+   {
+      clearDataRecreateServerDirs();
+
+      Configuration config = createDefaultConfig()
+         .setJournalSyncNonTransactional(false);
+
+      server = createServer(true,
+                            config,
+                            PagingTest.PAGE_SIZE,
+                            PagingTest.PAGE_MAX,
+                            new HashMap<String, AddressSettings>());
+
+      server.start();
+
+      try
+      {
+         ServerLocator locator = createInVMNonHALocator();
+         locator.setBlockOnDurableSend(true);
+         ClientSessionFactory sf = locator.createSessionFactory();
+         ClientSession session = sf.createSession(true, true, 0);
+
+         session.createQueue(ADDRESS.toString(), "Q1", null, true);
+
+         PagingStore store = server.getPagingManager().getPageStore(ADDRESS);
+
+         ClientProducer prod = session.createProducer(ADDRESS);
+
+         ClientMessage msg = null;
+         store.startPaging();
+
+         for (int i = 0; i < 100; i++)
+         {
+            msg = session.createMessage(true);
+            msg.putStringProperty("color", "red");
+            msg.putIntProperty("count", i);
+            prod.send(msg);
+
+            if (i > 0 && i % 10 == 0)
+            {
+               store.startPaging();
+               store.forceAnotherPage();
+            }
+         }
+
+         for (int i = 0; i < 100; i++)
+         {
+            msg = session.createMessage(true);
+            msg.putStringProperty("color", "green");
+            msg.putIntProperty("count", i);
+            prod.send(msg);
+
+            if (i > 0 && i % 10 == 0)
+            {
+               store.startPaging();
+               store.forceAnotherPage();
+            }
+         }
+
+         session.commit();
+
+         session.close();
+
+         session = sf.createSession(false, false, 0);
+         session.start();
+
+
+         ClientConsumer cons1;
+
+         if (browsing)
+         {
+            cons1 = session.createConsumer("Q1", "color='green'", true);
+         }
+         else
+         {
+            cons1 = session.createConsumer("Q1", "color='red'", false);
+         }
+
+         for (int i = 0; i < 100; i++)
+         {
+            msg = cons1.receive(5000);
+
+            System.out.println("Received " + msg);
+            assertNotNull(msg);
+            if (!browsing)
+            {
+               msg.acknowledge();
+            }
+         }
+
+         session.commit();
+
+         session.close();
+      }
+      finally
+      {
+         server.stop();
+      }
+
+   }
+
+
+   @Test
    public void testPendingACKOutOfOrder() throws Throwable
    {
       clearDataRecreateServerDirs();