You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2015/07/09 11:31:14 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-5853 - fix cacheEnabled case - additional test and fix for both jdbc and kahadb stores

Repository: activemq
Updated Branches:
  refs/heads/master 13044decc -> 3985e7225


https://issues.apache.org/jira/browse/AMQ-5853 - fix cacheEnabled case - additional test and fix for both jdbc and kahadb stores


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/3985e722
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/3985e722
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/3985e722

Branch: refs/heads/master
Commit: 3985e7225fb773d418d6f32adad4d4f932a4de10
Parents: 13044de
Author: gtully <ga...@gmail.com>
Authored: Thu Jul 9 10:30:20 2015 +0100
Committer: gtully <ga...@gmail.com>
Committed: Thu Jul 9 10:30:54 2015 +0100

----------------------------------------------------------------------
 .../activemq/store/jdbc/JDBCMessageStore.java   | 11 ++++-
 .../activemq/store/kahadb/MessageDatabase.java  | 19 +++-----
 .../activemq/store/MessagePriorityTest.java     | 51 ++++++++++++++++++--
 3 files changed, 63 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/3985e722/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
index ac4e8ce..175002a 100755
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
@@ -374,17 +374,24 @@ public class JDBCMessageStore extends AbstractMessageStore {
         if (LOG.isTraceEnabled()) {
             LOG.trace(this + " resetBatching. last recovered: " + Arrays.toString(perPriorityLastRecovered));
         }
+        setLastRecovered(-1);
+    }
+
+    private void setLastRecovered(long val) {
         for (int i=0;i<perPriorityLastRecovered.length;i++) {
-            perPriorityLastRecovered[i] = -1;
+            perPriorityLastRecovered[i] = val;
         }
     }
 
 
     @Override
     public void setBatch(MessageId messageId) {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace(this + " setBatch: last recovered: " + Arrays.toString(perPriorityLastRecovered));
+        }
         try {
             long[] storedValues = persistenceAdapter.getStoreSequenceIdForMessageId(null, messageId, destination);
-            trackLastRecovered(storedValues[0], (int)storedValues[1]);
+            setLastRecovered(storedValues[0]);
         } catch (IOException ignoredAsAlreadyLogged) {
             resetBatching();
         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/3985e722/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 7aa36c3..533f0c2 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -3080,19 +3080,12 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         void setBatch(Transaction tx, Long sequence) throws IOException {
             if (sequence != null) {
                 Long nextPosition = new Long(sequence.longValue() + 1);
-                if (defaultPriorityIndex.containsKey(tx, sequence)) {
-                    lastDefaultKey = sequence;
-                    cursor.defaultCursorPosition = nextPosition.longValue();
-                } else if (highPriorityIndex != null && highPriorityIndex.containsKey(tx, sequence)) {
-                    lastHighKey = sequence;
-                    cursor.highPriorityCursorPosition = nextPosition.longValue();
-                } else if (lowPriorityIndex.containsKey(tx, sequence)) {
-                    lastLowKey = sequence;
-                    cursor.lowPriorityCursorPosition = nextPosition.longValue();
-                } else {
-                    lastDefaultKey = sequence;
-                    cursor.defaultCursorPosition = nextPosition.longValue();
-                }
+                lastDefaultKey = sequence;
+                cursor.defaultCursorPosition = nextPosition.longValue();
+                lastHighKey = sequence;
+                cursor.highPriorityCursorPosition = nextPosition.longValue();
+                lastLowKey = sequence;
+                cursor.lowPriorityCursorPosition = nextPosition.longValue();
             }
         }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/3985e722/activemq-unit-tests/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/MessagePriorityTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
index e1d4b09..789e45f 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.activemq.store;
 
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import javax.jms.Connection;
 import javax.jms.DeliveryMode;
@@ -669,15 +670,20 @@ abstract public class MessagePriorityTest extends CombinationTestSupport {
 
     public void initCombosForTestEveryXHi() {
         // the cache limits the priority ordering to available memory
-        addCombinationValues("useCache", new Object[] {new Boolean(false)});
+        addCombinationValues("useCache", new Object[] {Boolean.FALSE, Boolean.TRUE});
         // expiry processing can fill the cursor with a snapshot of the producer
         // priority, before producers are complete
         addCombinationValues("expireMessagePeriod", new Object[] {new Integer(0)});
     }
 
     public void testEveryXHi() throws Exception {
-        final int numMessages = 50;
-        ActiveMQQueue queue = (ActiveMQQueue)sess.createQueue("TEST_LOW_THEN_HIGH_10");
+
+        ActiveMQQueue queue = (ActiveMQQueue)sess.createQueue("TEST");
+
+        // ensure we hit the limit to disable the cache
+        broker.getDestinationPolicy().getEntryFor(queue).setMemoryLimit(50*1024);
+        final String payload = new String(new byte[1024]);
+        final int numMessages = 500;
 
         final AtomicInteger received = new AtomicInteger(0);
         MessageConsumer queueConsumer = sess.createConsumer(queue);
@@ -685,12 +691,21 @@ abstract public class MessagePriorityTest extends CombinationTestSupport {
             @Override
             public void onMessage(Message message) {
                 received.incrementAndGet();
+
+                if (received.get() < 20) {
+                    try {
+                        TimeUnit.MILLISECONDS.sleep(100);
+                    } catch (InterruptedException e) {
+                        e.printStackTrace();
+                    }
+                }
             }
         });
 
         MessageProducer producer = sess.createProducer(queue);
         for (int i = 0; i < numMessages; i++) {
             Message message = sess.createMessage();
+            message.setStringProperty("payload", payload);
             if (i % 5 == 0) {
                 message.setJMSPriority(9);
             } else {
@@ -716,6 +731,36 @@ abstract public class MessagePriorityTest extends CombinationTestSupport {
             }
         }, 10000));
 
+        // do it again!
+        received.set(0);
+        destinationStatistics.reset();
+        for (int i = 0; i < numMessages; i++) {
+            Message message = sess.createMessage();
+            message.setStringProperty("payload", payload);
+            if (i % 5 == 0) {
+                message.setJMSPriority(9);
+            } else {
+                message.setJMSPriority(4);
+            }
+            producer.send(message, Message.DEFAULT_DELIVERY_MODE, message.getJMSPriority(), Message.DEFAULT_TIME_TO_LIVE);
+        }
+
+        assertTrue("Got all", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return numMessages == received.get();
+            }
+        }));
+
+
+        assertTrue("Nothing else Like dlq involved", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                LOG.info("Enqueues: " + destinationStatistics.getEnqueues().getCount() + ", Dequeues: " + destinationStatistics.getDequeues().getCount());
+                return destinationStatistics.getEnqueues().getCount() == numMessages && destinationStatistics.getDequeues().getCount() == numMessages;
+            }
+        }, 10000));
+
         queueConsumer.close();
     }
 }