You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by an...@apache.org on 2014/12/08 12:29:25 UTC
[1/3] activemq-6 git commit: small fix on pom (version 6.0 instead of
2.5)
Repository: activemq-6
Updated Branches:
refs/heads/master 46182e415 -> d42481db5
small fix on pom (version 6.0 instead of 2.5)
Project: http://git-wip-us.apache.org/repos/asf/activemq-6/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-6/commit/aec50cf2
Tree: http://git-wip-us.apache.org/repos/asf/activemq-6/tree/aec50cf2
Diff: http://git-wip-us.apache.org/repos/asf/activemq-6/diff/aec50cf2
Branch: refs/heads/master
Commit: aec50cf250ce22071853afa6d084e7a354d5da9b
Parents: 46182e4
Author: Clebert Suconic <cl...@apache.org>
Authored: Thu Dec 4 12:18:34 2014 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Dec 5 10:38:42 2014 -0500
----------------------------------------------------------------------
pom.xml | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/aec50cf2/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index c868686..caaeff9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -30,8 +30,8 @@
<staging.siteURL>scp://people.apache.org/x1/www/activemq.apache.org</staging.siteURL>
<netty.version>4.0.20.Final</netty.version>
<activemq.version.versionName>Active Hornet</activemq.version.versionName>
- <activemq.version.majorVersion>2</activemq.version.majorVersion>
- <activemq.version.minorVersion>5</activemq.version.minorVersion>
+ <activemq.version.majorVersion>6</activemq.version.majorVersion>
+ <activemq.version.minorVersion>0</activemq.version.minorVersion>
<activemq.version.microVersion>0</activemq.version.microVersion>
<activemq.version.incrementingVersion>125,124,123,122</activemq.version.incrementingVersion>
<activemq.version.versionSuffix>SNAPSHOT</activemq.version.versionSuffix>
[3/3] activemq-6 git commit: merge #35 - Fixing issues with paging
(browsing & depaging)
Posted by an...@apache.org.
merge #35 - Fixing issues with paging (browsing & depaging)
Project: http://git-wip-us.apache.org/repos/asf/activemq-6/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-6/commit/d42481db
Tree: http://git-wip-us.apache.org/repos/asf/activemq-6/tree/d42481db
Diff: http://git-wip-us.apache.org/repos/asf/activemq-6/diff/d42481db
Branch: refs/heads/master
Commit: d42481db5c01d66965b610219342b4771e7cd2c1
Parents: 46182e4 933d90a
Author: Andy Taylor <an...@gmail.com>
Authored: Mon Dec 8 11:28:02 2014 +0000
Committer: Andy Taylor <an...@gmail.com>
Committed: Mon Dec 8 11:28:02 2014 +0000
----------------------------------------------------------------------
.../activemq/core/server/impl/QueueImpl.java | 26 ++++-
.../core/server/impl/ServerConsumerImpl.java | 2 +-
pom.xml | 4 +-
.../tests/integration/client/PagingTest.java | 116 +++++++++++++++++++
4 files changed, 143 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
[2/3] activemq-6 git commit: ACTIVEMQ6-54 Depaging is not kicking in
on some scenarios, and Browsing is not looking towards paging
Posted by an...@apache.org.
ACTIVEMQ6-54 Depaging is not kicking in on some scenarios, and Browsing is not looking towards paging
https://issues.apache.org/jira/browse/ACTIVEMQ6-54
This is fixing a few issues around paging:
- Browsing it not looking towards Paging. I'm using the queue.totalIterator which is a read-only iterator that goes towards the pages messages.
- Depage is not kicking correctly in some scenarios. I have improved the logic on scheduling depage for that.
Project: http://git-wip-us.apache.org/repos/asf/activemq-6/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-6/commit/933d90a4
Tree: http://git-wip-us.apache.org/repos/asf/activemq-6/tree/933d90a4
Diff: http://git-wip-us.apache.org/repos/asf/activemq-6/diff/933d90a4
Branch: refs/heads/master
Commit: 933d90a4f383c9dd0e66f0a3712bcb2f7561765c
Parents: aec50cf
Author: Clebert Suconic <cl...@apache.org>
Authored: Fri Dec 5 12:25:07 2014 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Dec 5 14:48:14 2014 -0500
----------------------------------------------------------------------
.../activemq/core/server/impl/QueueImpl.java | 26 ++++-
.../core/server/impl/ServerConsumerImpl.java | 2 +-
.../tests/integration/client/PagingTest.java | 116 +++++++++++++++++++
3 files changed, 141 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/933d90a4/activemq-server/src/main/java/org/apache/activemq/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/QueueImpl.java b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/QueueImpl.java
index 6703c91..9523205 100644
--- a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/QueueImpl.java
+++ b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/QueueImpl.java
@@ -664,6 +664,8 @@ public class QueueImpl implements Queue
// no-op
scheduledRunners.decrementAndGet();
}
+
+ checkDepage();
}
}
@@ -2188,12 +2190,32 @@ public class QueueImpl implements Queue
}
}
- if (pageIterator != null && messageReferences.size() == 0 && pageSubscription.isPaging() && pageIterator.hasNext() && !depagePending)
+ checkDepage();
+ }
+
+ private void checkDepage()
+ {
+ if (pageIterator != null && pageSubscription.isPaging() && !depagePending && needsDepage() && pageIterator.hasNext())
{
scheduleDepage(false);
}
}
+
+ /**
+ * This is a common check we do before scheduling depaging.. or while depaging.
+ * Before scheduling a depage runnable we verify if it fits / needs depaging.
+ * We also check for while needsDepage While depaging.
+ * This is just to avoid a copy & paste dependency
+ * @return
+ */
+ private boolean needsDepage()
+ {
+ return queueMemorySize.get() < pageSubscription.getPagingStore().getMaxSize();
+ }
+
+
+
private SimpleString extractGroupID(MessageReference ref)
{
if (internalQueue)
@@ -2268,7 +2290,7 @@ public class QueueImpl implements Queue
this.directDeliver = false;
int depaged = 0;
- while (timeout > System.currentTimeMillis() && queueMemorySize.get() < maxSize && pageIterator.hasNext())
+ while (timeout > System.currentTimeMillis() && needsDepage() && pageIterator.hasNext())
{
depaged++;
PagedReference reference = pageIterator.next();
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/933d90a4/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ServerConsumerImpl.java b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ServerConsumerImpl.java
index da8d094..aa12993 100644
--- a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ServerConsumerImpl.java
+++ b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ServerConsumerImpl.java
@@ -213,7 +213,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener
if (browseOnly)
{
- browserDeliverer = new BrowserDeliverer(messageQueue.iterator());
+ browserDeliverer = new BrowserDeliverer(messageQueue.totalIterator());
}
else
{
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/933d90a4/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/PagingTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/PagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/PagingTest.java
index 56369c8..ff333ab 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/PagingTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/PagingTest.java
@@ -5184,6 +5184,8 @@ public class PagingTest extends ServiceTestBase
* When running this test from an IDE add this to the test command line so that the AssertionLoggerHandler works properly:
*
* -Djava.util.logging.manager=org.jboss.logmanager.LogManager -Dlogging.configuration=file:<path_to_source>/tests/config/logging.properties
+ *
+ * Note: Idea should get these from the pom and you shouldn't need to do this.
*/
public void testFailMessagesNonDurable() throws Exception
{
@@ -5860,6 +5862,120 @@ public class PagingTest extends ServiceTestBase
@Test
+ public void testMultiFiltersBrowsing() throws Throwable
+ {
+ internalTestMultiFilters(true);
+ }
+
+ @Test
+ public void testMultiFiltersRegularConsumer() throws Throwable
+ {
+ internalTestMultiFilters(false);
+ }
+
+ public void internalTestMultiFilters(boolean browsing) throws Throwable
+ {
+ clearDataRecreateServerDirs();
+
+ Configuration config = createDefaultConfig()
+ .setJournalSyncNonTransactional(false);
+
+ server = createServer(true,
+ config,
+ PagingTest.PAGE_SIZE,
+ PagingTest.PAGE_MAX,
+ new HashMap<String, AddressSettings>());
+
+ server.start();
+
+ try
+ {
+ ServerLocator locator = createInVMNonHALocator();
+ locator.setBlockOnDurableSend(true);
+ ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSession session = sf.createSession(true, true, 0);
+
+ session.createQueue(ADDRESS.toString(), "Q1", null, true);
+
+ PagingStore store = server.getPagingManager().getPageStore(ADDRESS);
+
+ ClientProducer prod = session.createProducer(ADDRESS);
+
+ ClientMessage msg = null;
+ store.startPaging();
+
+ for (int i = 0; i < 100; i++)
+ {
+ msg = session.createMessage(true);
+ msg.putStringProperty("color", "red");
+ msg.putIntProperty("count", i);
+ prod.send(msg);
+
+ if (i > 0 && i % 10 == 0)
+ {
+ store.startPaging();
+ store.forceAnotherPage();
+ }
+ }
+
+ for (int i = 0; i < 100; i++)
+ {
+ msg = session.createMessage(true);
+ msg.putStringProperty("color", "green");
+ msg.putIntProperty("count", i);
+ prod.send(msg);
+
+ if (i > 0 && i % 10 == 0)
+ {
+ store.startPaging();
+ store.forceAnotherPage();
+ }
+ }
+
+ session.commit();
+
+ session.close();
+
+ session = sf.createSession(false, false, 0);
+ session.start();
+
+
+ ClientConsumer cons1;
+
+ if (browsing)
+ {
+ cons1 = session.createConsumer("Q1", "color='green'", true);
+ }
+ else
+ {
+ cons1 = session.createConsumer("Q1", "color='red'", false);
+ }
+
+ for (int i = 0; i < 100; i++)
+ {
+ msg = cons1.receive(5000);
+
+ System.out.println("Received " + msg);
+ assertNotNull(msg);
+ if (!browsing)
+ {
+ msg.acknowledge();
+ }
+ }
+
+ session.commit();
+
+ session.close();
+ }
+ finally
+ {
+ server.stop();
+ }
+
+ }
+
+
+ @Test
public void testPendingACKOutOfOrder() throws Throwable
{
clearDataRecreateServerDirs();