You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2015/07/03 11:42:41 UTC

[1/2] activemq git commit: https://issues.apache.org/jira/browse/AMQ-5853 - rework fix, have store reset tracked recovered priority when higer priority messages are stored. Additional perf fix that removes unnecessary 2x multiplier on db fetch size; seem

Repository: activemq
Updated Branches:
  refs/heads/master 5abd2d231 -> eece28ac7


https://issues.apache.org/jira/browse/AMQ-5853 - rework fix, have store reset tracked recovered priority when higer priority messages are stored. Additional perf fix that removes unnecessary 2x multiplier on db fetch size; seems periodic message expiry was throwing some tests when the cache was enabled


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/eece28ac
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/eece28ac
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/eece28ac

Branch: refs/heads/master
Commit: eece28ac7501103490e1cb7bdc547c07c6968525
Parents: fef8cac
Author: gtully <ga...@gmail.com>
Authored: Fri Jul 3 10:30:03 2015 +0100
Committer: gtully <ga...@gmail.com>
Committed: Fri Jul 3 10:30:16 2015 +0100

----------------------------------------------------------------------
 .../activemq/store/jdbc/JDBCMessageStore.java   | 12 +++--
 .../apache/activemq/store/jdbc/Statements.java  |  2 +-
 .../store/jdbc/adapter/DefaultJDBCAdapter.java  |  2 +-
 .../activemq/store/MessagePriorityTest.java     | 51 +++++++++++++++++++-
 4 files changed, 60 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/eece28ac/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
index 923f3f1..2270565 100755
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
@@ -18,10 +18,7 @@ package org.apache.activemq.store.jdbc;
 
 import java.io.IOException;
 import java.sql.SQLException;
-import java.util.Iterator;
-import java.util.LinkedHashSet;
 import java.util.LinkedList;
-import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.activemq.ActiveMQMessageAudit;
@@ -168,6 +165,9 @@ public class JDBCMessageStore extends AbstractMessageStore {
         if (xaXid == null) {
             onAdd(message, sequenceId, message.getPriority());
         }
+        if (this.isPrioritizedMessages() && message.getPriority() > lastRecoveredPriority.get()) {
+            resetTrackedLastRecoveredPriority();
+        }
     }
 
     // jdbc commit order is random with concurrent connections - limit scan to lowest pending
@@ -374,10 +374,14 @@ public class JDBCMessageStore extends AbstractMessageStore {
             LOG.trace(this + " resetBatching, existing last recovered seqId: " + lastRecoveredSequenceId.get());
         }
         lastRecoveredSequenceId.set(-1);
-        lastRecoveredPriority.set(Byte.MAX_VALUE - 1);
+        resetTrackedLastRecoveredPriority();
 
     }
 
+    private final void resetTrackedLastRecoveredPriority() {
+        lastRecoveredPriority.set(Byte.MAX_VALUE - 1);
+    }
+
     @Override
     public void setBatch(MessageId messageId) {
         try {

http://git-wip-us.apache.org/repos/asf/activemq/blob/eece28ac/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/Statements.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/Statements.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/Statements.java
index 4875562..1afe0e3 100755
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/Statements.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/Statements.java
@@ -504,7 +504,7 @@ public class Statements {
             findNextMessagesByPriorityStatement = "SELECT ID, MSG FROM " + getFullMessageTableName()
                                         + " WHERE CONTAINER=?"
                                         + " AND XID IS NULL"
-                                        + " AND ((ID > ? AND ID < ? AND PRIORITY >= ?) OR PRIORITY < ?)"
+                                        + " AND ((ID > ? AND ID < ? AND PRIORITY = ?) OR PRIORITY < ?)"
                                         + " ORDER BY PRIORITY DESC, ID";
         }
         return findNextMessagesByPriorityStatement;

http://git-wip-us.apache.org/repos/asf/activemq/blob/eece28ac/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
index a94abdf..c77f951 100755
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
@@ -1097,7 +1097,7 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
             } else {
                 s = c.getConnection().prepareStatement(this.statements.getFindNextMessagesStatement());
             }
-            s.setMaxRows(Math.min(maxReturned * 2, maxRows));
+            s.setMaxRows(Math.min(maxReturned, maxRows));
             s.setString(1, destination.getQualifiedName());
             s.setLong(2, lastRecoveredSeq);
             s.setLong(3, maxSeq);

http://git-wip-us.apache.org/repos/asf/activemq/blob/eece28ac/activemq-unit-tests/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/MessagePriorityTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
index 63daa00..e0ed7e9 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
@@ -31,6 +31,8 @@ import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.ActiveMQPrefetchPolicy;
 import org.apache.activemq.CombinationTestSupport;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.DestinationStatistics;
+import org.apache.activemq.broker.region.RegionBroker;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
@@ -64,6 +66,7 @@ abstract public class MessagePriorityTest extends CombinationTestSupport {
     public int MSG_NUM = 600;
     public int HIGH_PRI = 7;
     public int LOW_PRI = 3;
+    public int MED_PRI = 4;
 
     abstract protected PersistenceAdapter createPersistenceAdapter(boolean delete) throws Exception;
 
@@ -568,7 +571,7 @@ abstract public class MessagePriorityTest extends CombinationTestSupport {
     }
 
     public void testQueueBacklog() throws Exception {
-        final int backlog = 18000;
+        final int backlog = 1800;
         ActiveMQQueue queue = (ActiveMQQueue)sess.createQueue("TEST");
 
         ProducerThread lowPri = new ProducerThread(queue, backlog, LOW_PRI);
@@ -588,6 +591,23 @@ abstract public class MessagePriorityTest extends CombinationTestSupport {
             assertNotNull("Message " + i + " was null", msg);
             assertEquals("Message " + i + " has wrong priority", i < 10 ? HIGH_PRI : LOW_PRI, msg.getJMSPriority());
         }
+
+        final DestinationStatistics destinationStatistics = ((RegionBroker)broker.getRegionBroker()).getDestinationStatistics();
+        assertTrue(Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                LOG.info("Enqueues: " + destinationStatistics.getEnqueues().getCount() + ", Dequeues: " + destinationStatistics.getDequeues().getCount());
+                return destinationStatistics.getEnqueues().getCount() == backlog + 10 && destinationStatistics.getDequeues().getCount() == 500;
+            }
+        }, 10000));
+    }
+
+    public void initCombosForTestLowThenHighBatc() {
+        // the cache limits the priority ordering to available memory
+        addCombinationValues("useCache", new Object[] {new Boolean(false)});
+        // expiry processing can fill the cursor with a snapshot of the producer
+        // priority, before producers are complete
+        addCombinationValues("expireMessagePeriod", new Object[] {new Integer(0)});
     }
 
     public void testLowThenHighBatch() throws Exception {
@@ -614,5 +634,34 @@ abstract public class MessagePriorityTest extends CombinationTestSupport {
             assertEquals("correct priority", HIGH_PRI, message.getJMSPriority());
         }
         queueConsumer.close();
+
+        producerThread.priority = LOW_PRI;
+        producerThread.run();
+        producerThread.priority = MED_PRI;
+        producerThread.run();
+
+        queueConsumer = sess.createConsumer(queue);
+        for (int i = 0; i < 10; i++) {
+            Message message = queueConsumer.receive(10000);
+            assertNotNull("expect #" + i, message);
+            assertEquals("correct priority", MED_PRI, message.getJMSPriority());
+        }
+        for (int i = 0; i < 10; i++) {
+            Message message = queueConsumer.receive(10000);
+            assertNotNull("expect #" + i, message);
+            assertEquals("correct priority", LOW_PRI, message.getJMSPriority());
+        }
+        queueConsumer.close();
+
+        producerThread.priority = HIGH_PRI;
+        producerThread.run();
+
+        queueConsumer = sess.createConsumer(queue);
+        for (int i = 0; i < 10; i++) {
+            Message message = queueConsumer.receive(10000);
+            assertNotNull("expect #" + i, message);
+            assertEquals("correct priority", HIGH_PRI, message.getJMSPriority());
+        }
+        queueConsumer.close();
     }
 }


[2/2] activemq git commit: Add sanity check of browse list

Posted by gt...@apache.org.
Add sanity check of browse list


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/fef8cac0
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/fef8cac0
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/fef8cac0

Branch: refs/heads/master
Commit: fef8cac05f757432c10dcf47ca7ea3028b7614d6
Parents: 5abd2d2
Author: gtully <ga...@gmail.com>
Authored: Thu Jul 2 21:58:13 2015 +0100
Committer: gtully <ga...@gmail.com>
Committed: Fri Jul 3 10:30:16 2015 +0100

----------------------------------------------------------------------
 .../src/test/java/org/apache/activemq/broker/jmx/PurgeTest.java    | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/fef8cac0/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/PurgeTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/PurgeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/PurgeTest.java
index ce8e3ae..1ae369b 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/PurgeTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/PurgeTest.java
@@ -83,6 +83,8 @@ public class PurgeTest extends EmbeddedBrokerTestSupport {
         long count = proxy.getQueueSize();
         assertEquals("Queue size", count, messageCount);
 
+        assertEquals("Browse size", messageCount, proxy.browseMessages().size());
+
         proxy.purge();
         count = proxy.getQueueSize();
         assertEquals("Queue size", count, 0);