You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2015/07/03 11:42:41 UTC
[1/2] activemq git commit:
https://issues.apache.org/jira/browse/AMQ-5853 - rework fix, have store reset
tracked recovered priority when higer priority messages are stored.
Additional perf fix that removes unnecessary 2x multiplier on db fetch size;
seem
Repository: activemq
Updated Branches:
refs/heads/master 5abd2d231 -> eece28ac7
https://issues.apache.org/jira/browse/AMQ-5853 - rework fix, have store reset tracked recovered priority when higer priority messages are stored. Additional perf fix that removes unnecessary 2x multiplier on db fetch size; seems periodic message expiry was throwing some tests when the cache was enabled
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/eece28ac
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/eece28ac
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/eece28ac
Branch: refs/heads/master
Commit: eece28ac7501103490e1cb7bdc547c07c6968525
Parents: fef8cac
Author: gtully <ga...@gmail.com>
Authored: Fri Jul 3 10:30:03 2015 +0100
Committer: gtully <ga...@gmail.com>
Committed: Fri Jul 3 10:30:16 2015 +0100
----------------------------------------------------------------------
.../activemq/store/jdbc/JDBCMessageStore.java | 12 +++--
.../apache/activemq/store/jdbc/Statements.java | 2 +-
.../store/jdbc/adapter/DefaultJDBCAdapter.java | 2 +-
.../activemq/store/MessagePriorityTest.java | 51 +++++++++++++++++++-
4 files changed, 60 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/eece28ac/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
index 923f3f1..2270565 100755
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
@@ -18,10 +18,7 @@ package org.apache.activemq.store.jdbc;
import java.io.IOException;
import java.sql.SQLException;
-import java.util.Iterator;
-import java.util.LinkedHashSet;
import java.util.LinkedList;
-import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.ActiveMQMessageAudit;
@@ -168,6 +165,9 @@ public class JDBCMessageStore extends AbstractMessageStore {
if (xaXid == null) {
onAdd(message, sequenceId, message.getPriority());
}
+ if (this.isPrioritizedMessages() && message.getPriority() > lastRecoveredPriority.get()) {
+ resetTrackedLastRecoveredPriority();
+ }
}
// jdbc commit order is random with concurrent connections - limit scan to lowest pending
@@ -374,10 +374,14 @@ public class JDBCMessageStore extends AbstractMessageStore {
LOG.trace(this + " resetBatching, existing last recovered seqId: " + lastRecoveredSequenceId.get());
}
lastRecoveredSequenceId.set(-1);
- lastRecoveredPriority.set(Byte.MAX_VALUE - 1);
+ resetTrackedLastRecoveredPriority();
}
+ private final void resetTrackedLastRecoveredPriority() {
+ lastRecoveredPriority.set(Byte.MAX_VALUE - 1);
+ }
+
@Override
public void setBatch(MessageId messageId) {
try {
http://git-wip-us.apache.org/repos/asf/activemq/blob/eece28ac/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/Statements.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/Statements.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/Statements.java
index 4875562..1afe0e3 100755
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/Statements.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/Statements.java
@@ -504,7 +504,7 @@ public class Statements {
findNextMessagesByPriorityStatement = "SELECT ID, MSG FROM " + getFullMessageTableName()
+ " WHERE CONTAINER=?"
+ " AND XID IS NULL"
- + " AND ((ID > ? AND ID < ? AND PRIORITY >= ?) OR PRIORITY < ?)"
+ + " AND ((ID > ? AND ID < ? AND PRIORITY = ?) OR PRIORITY < ?)"
+ " ORDER BY PRIORITY DESC, ID";
}
return findNextMessagesByPriorityStatement;
http://git-wip-us.apache.org/repos/asf/activemq/blob/eece28ac/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
index a94abdf..c77f951 100755
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
@@ -1097,7 +1097,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
} else {
s = c.getConnection().prepareStatement(this.statements.getFindNextMessagesStatement());
}
- s.setMaxRows(Math.min(maxReturned * 2, maxRows));
+ s.setMaxRows(Math.min(maxReturned, maxRows));
s.setString(1, destination.getQualifiedName());
s.setLong(2, lastRecoveredSeq);
s.setLong(3, maxSeq);
http://git-wip-us.apache.org/repos/asf/activemq/blob/eece28ac/activemq-unit-tests/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/MessagePriorityTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
index 63daa00..e0ed7e9 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
@@ -31,6 +31,8 @@ import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.CombinationTestSupport;
import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.DestinationStatistics;
+import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
@@ -64,6 +66,7 @@ abstract public class MessagePriorityTest extends CombinationTestSupport {
public int MSG_NUM = 600;
public int HIGH_PRI = 7;
public int LOW_PRI = 3;
+ public int MED_PRI = 4;
abstract protected PersistenceAdapter createPersistenceAdapter(boolean delete) throws Exception;
@@ -568,7 +571,7 @@ abstract public class MessagePriorityTest extends CombinationTestSupport {
}
public void testQueueBacklog() throws Exception {
- final int backlog = 18000;
+ final int backlog = 1800;
ActiveMQQueue queue = (ActiveMQQueue)sess.createQueue("TEST");
ProducerThread lowPri = new ProducerThread(queue, backlog, LOW_PRI);
@@ -588,6 +591,23 @@ abstract public class MessagePriorityTest extends CombinationTestSupport {
assertNotNull("Message " + i + " was null", msg);
assertEquals("Message " + i + " has wrong priority", i < 10 ? HIGH_PRI : LOW_PRI, msg.getJMSPriority());
}
+
+ final DestinationStatistics destinationStatistics = ((RegionBroker)broker.getRegionBroker()).getDestinationStatistics();
+ assertTrue(Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ LOG.info("Enqueues: " + destinationStatistics.getEnqueues().getCount() + ", Dequeues: " + destinationStatistics.getDequeues().getCount());
+ return destinationStatistics.getEnqueues().getCount() == backlog + 10 && destinationStatistics.getDequeues().getCount() == 500;
+ }
+ }, 10000));
+ }
+
+ public void initCombosForTestLowThenHighBatc() {
+ // the cache limits the priority ordering to available memory
+ addCombinationValues("useCache", new Object[] {new Boolean(false)});
+ // expiry processing can fill the cursor with a snapshot of the producer
+ // priority, before producers are complete
+ addCombinationValues("expireMessagePeriod", new Object[] {new Integer(0)});
}
public void testLowThenHighBatch() throws Exception {
@@ -614,5 +634,34 @@ abstract public class MessagePriorityTest extends CombinationTestSupport {
assertEquals("correct priority", HIGH_PRI, message.getJMSPriority());
}
queueConsumer.close();
+
+ producerThread.priority = LOW_PRI;
+ producerThread.run();
+ producerThread.priority = MED_PRI;
+ producerThread.run();
+
+ queueConsumer = sess.createConsumer(queue);
+ for (int i = 0; i < 10; i++) {
+ Message message = queueConsumer.receive(10000);
+ assertNotNull("expect #" + i, message);
+ assertEquals("correct priority", MED_PRI, message.getJMSPriority());
+ }
+ for (int i = 0; i < 10; i++) {
+ Message message = queueConsumer.receive(10000);
+ assertNotNull("expect #" + i, message);
+ assertEquals("correct priority", LOW_PRI, message.getJMSPriority());
+ }
+ queueConsumer.close();
+
+ producerThread.priority = HIGH_PRI;
+ producerThread.run();
+
+ queueConsumer = sess.createConsumer(queue);
+ for (int i = 0; i < 10; i++) {
+ Message message = queueConsumer.receive(10000);
+ assertNotNull("expect #" + i, message);
+ assertEquals("correct priority", HIGH_PRI, message.getJMSPriority());
+ }
+ queueConsumer.close();
}
}
[2/2] activemq git commit: Add sanity check of browse list
Posted by gt...@apache.org.
Add sanity check of browse list
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/fef8cac0
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/fef8cac0
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/fef8cac0
Branch: refs/heads/master
Commit: fef8cac05f757432c10dcf47ca7ea3028b7614d6
Parents: 5abd2d2
Author: gtully <ga...@gmail.com>
Authored: Thu Jul 2 21:58:13 2015 +0100
Committer: gtully <ga...@gmail.com>
Committed: Fri Jul 3 10:30:16 2015 +0100
----------------------------------------------------------------------
.../src/test/java/org/apache/activemq/broker/jmx/PurgeTest.java | 2 ++
1 file changed, 2 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/fef8cac0/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/PurgeTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/PurgeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/PurgeTest.java
index ce8e3ae..1ae369b 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/PurgeTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/PurgeTest.java
@@ -83,6 +83,8 @@ public class PurgeTest extends EmbeddedBrokerTestSupport {
long count = proxy.getQueueSize();
assertEquals("Queue size", count, messageCount);
+ assertEquals("Browse size", messageCount, proxy.browseMessages().size());
+
proxy.purge();
count = proxy.getQueueSize();
assertEquals("Queue size", count, 0);