You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2015/07/09 11:31:14 UTC
activemq git commit: https://issues.apache.org/jira/browse/AMQ-5853 - fix cacheEnabled case - additional test and fix for both jdbc and kahadb stores
Repository: activemq
Updated Branches:
refs/heads/master 13044decc -> 3985e7225
https://issues.apache.org/jira/browse/AMQ-5853 - fix cacheEnabled case - additional test and fix for both jdbc and kahadb stores
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/3985e722
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/3985e722
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/3985e722
Branch: refs/heads/master
Commit: 3985e7225fb773d418d6f32adad4d4f932a4de10
Parents: 13044de
Author: gtully <ga...@gmail.com>
Authored: Thu Jul 9 10:30:20 2015 +0100
Committer: gtully <ga...@gmail.com>
Committed: Thu Jul 9 10:30:54 2015 +0100
----------------------------------------------------------------------
.../activemq/store/jdbc/JDBCMessageStore.java | 11 ++++-
.../activemq/store/kahadb/MessageDatabase.java | 19 +++-----
.../activemq/store/MessagePriorityTest.java | 51 ++++++++++++++++++--
3 files changed, 63 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/3985e722/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
index ac4e8ce..175002a 100755
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
@@ -374,17 +374,24 @@ public class JDBCMessageStore extends AbstractMessageStore {
if (LOG.isTraceEnabled()) {
LOG.trace(this + " resetBatching. last recovered: " + Arrays.toString(perPriorityLastRecovered));
}
+ setLastRecovered(-1);
+ }
+
+ private void setLastRecovered(long val) {
for (int i=0;i<perPriorityLastRecovered.length;i++) {
- perPriorityLastRecovered[i] = -1;
+ perPriorityLastRecovered[i] = val;
}
}
@Override
public void setBatch(MessageId messageId) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(this + " setBatch: last recovered: " + Arrays.toString(perPriorityLastRecovered));
+ }
try {
long[] storedValues = persistenceAdapter.getStoreSequenceIdForMessageId(null, messageId, destination);
- trackLastRecovered(storedValues[0], (int)storedValues[1]);
+ setLastRecovered(storedValues[0]);
} catch (IOException ignoredAsAlreadyLogged) {
resetBatching();
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/3985e722/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 7aa36c3..533f0c2 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -3080,19 +3080,12 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
void setBatch(Transaction tx, Long sequence) throws IOException {
if (sequence != null) {
Long nextPosition = new Long(sequence.longValue() + 1);
- if (defaultPriorityIndex.containsKey(tx, sequence)) {
- lastDefaultKey = sequence;
- cursor.defaultCursorPosition = nextPosition.longValue();
- } else if (highPriorityIndex != null && highPriorityIndex.containsKey(tx, sequence)) {
- lastHighKey = sequence;
- cursor.highPriorityCursorPosition = nextPosition.longValue();
- } else if (lowPriorityIndex.containsKey(tx, sequence)) {
- lastLowKey = sequence;
- cursor.lowPriorityCursorPosition = nextPosition.longValue();
- } else {
- lastDefaultKey = sequence;
- cursor.defaultCursorPosition = nextPosition.longValue();
- }
+ lastDefaultKey = sequence;
+ cursor.defaultCursorPosition = nextPosition.longValue();
+ lastHighKey = sequence;
+ cursor.highPriorityCursorPosition = nextPosition.longValue();
+ lastLowKey = sequence;
+ cursor.lowPriorityCursorPosition = nextPosition.longValue();
}
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/3985e722/activemq-unit-tests/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/MessagePriorityTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
index e1d4b09..789e45f 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
@@ -17,6 +17,7 @@
package org.apache.activemq.store;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
@@ -669,15 +670,20 @@ abstract public class MessagePriorityTest extends CombinationTestSupport {
public void initCombosForTestEveryXHi() {
// the cache limits the priority ordering to available memory
- addCombinationValues("useCache", new Object[] {new Boolean(false)});
+ addCombinationValues("useCache", new Object[] {Boolean.FALSE, Boolean.TRUE});
// expiry processing can fill the cursor with a snapshot of the producer
// priority, before producers are complete
addCombinationValues("expireMessagePeriod", new Object[] {new Integer(0)});
}
public void testEveryXHi() throws Exception {
- final int numMessages = 50;
- ActiveMQQueue queue = (ActiveMQQueue)sess.createQueue("TEST_LOW_THEN_HIGH_10");
+
+ ActiveMQQueue queue = (ActiveMQQueue)sess.createQueue("TEST");
+
+ // ensure we hit the limit to disable the cache
+ broker.getDestinationPolicy().getEntryFor(queue).setMemoryLimit(50*1024);
+ final String payload = new String(new byte[1024]);
+ final int numMessages = 500;
final AtomicInteger received = new AtomicInteger(0);
MessageConsumer queueConsumer = sess.createConsumer(queue);
@@ -685,12 +691,21 @@ abstract public class MessagePriorityTest extends CombinationTestSupport {
@Override
public void onMessage(Message message) {
received.incrementAndGet();
+
+ if (received.get() < 20) {
+ try {
+ TimeUnit.MILLISECONDS.sleep(100);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
}
});
MessageProducer producer = sess.createProducer(queue);
for (int i = 0; i < numMessages; i++) {
Message message = sess.createMessage();
+ message.setStringProperty("payload", payload);
if (i % 5 == 0) {
message.setJMSPriority(9);
} else {
@@ -716,6 +731,36 @@ abstract public class MessagePriorityTest extends CombinationTestSupport {
}
}, 10000));
+ // do it again!
+ received.set(0);
+ destinationStatistics.reset();
+ for (int i = 0; i < numMessages; i++) {
+ Message message = sess.createMessage();
+ message.setStringProperty("payload", payload);
+ if (i % 5 == 0) {
+ message.setJMSPriority(9);
+ } else {
+ message.setJMSPriority(4);
+ }
+ producer.send(message, Message.DEFAULT_DELIVERY_MODE, message.getJMSPriority(), Message.DEFAULT_TIME_TO_LIVE);
+ }
+
+ assertTrue("Got all", Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return numMessages == received.get();
+ }
+ }));
+
+
+ assertTrue("Nothing else Like dlq involved", Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ LOG.info("Enqueues: " + destinationStatistics.getEnqueues().getCount() + ", Dequeues: " + destinationStatistics.getDequeues().getCount());
+ return destinationStatistics.getEnqueues().getCount() == numMessages && destinationStatistics.getDequeues().getCount() == numMessages;
+ }
+ }, 10000));
+
queueConsumer.close();
}
}