You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2015/07/06 16:36:27 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-5853 - track per priority sequence on load from the store. Allow db to select from entire priority 0-9 range. fix and additional test

Repository: activemq
Updated Branches:
  refs/heads/master b78ef954d -> a2697b844


https://issues.apache.org/jira/browse/AMQ-5853 - track per priority sequence on load from the store. Allow db to select from entire priority 0-9 range. fix and additional test


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/a2697b84
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/a2697b84
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/a2697b84

Branch: refs/heads/master
Commit: a2697b844e1ff61dee6f011ebd08eee59d23f1ca
Parents: b78ef95
Author: gtully <ga...@gmail.com>
Authored: Mon Jul 6 15:32:23 2015 +0100
Committer: gtully <ga...@gmail.com>
Committed: Mon Jul 6 15:32:23 2015 +0100

----------------------------------------------------------------------
 .../apache/activemq/store/jdbc/JDBCAdapter.java |  2 +-
 .../activemq/store/jdbc/JDBCMessageStore.java   | 42 +++++++--------
 .../apache/activemq/store/jdbc/Statements.java  | 14 ++++-
 .../store/jdbc/adapter/DefaultJDBCAdapter.java  | 16 +++---
 .../activemq/store/MessagePriorityTest.java     | 54 ++++++++++++++++++++
 5 files changed, 95 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/a2697b84/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
index df4fcf3..e611a01 100755
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
@@ -87,7 +87,7 @@ public interface JDBCAdapter {
 
     int doGetMessageCount(TransactionContext c, ActiveMQDestination destination) throws SQLException, IOException;
 
-    void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, long maxSeq, long nextSeq, long priority, int maxReturned, boolean isPrioritizeMessages, JDBCMessageRecoveryListener listener) throws Exception;
+    void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, long[] lastRecoveredEntries, long maxSeq, int maxReturned, boolean isPrioritizeMessages, JDBCMessageRecoveryListener listener) throws Exception;
 
     long doGetLastAckedDurableSubscriberMessageId(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriberName) throws SQLException, IOException;
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/a2697b84/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
index 2270565..4674d7a 100755
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
@@ -18,8 +18,8 @@ package org.apache.activemq.store.jdbc;
 
 import java.io.IOException;
 import java.sql.SQLException;
+import java.util.Arrays;
 import java.util.LinkedList;
-import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.activemq.ActiveMQMessageAudit;
 import org.apache.activemq.broker.ConnectionContext;
@@ -66,11 +66,10 @@ public class JDBCMessageStore extends AbstractMessageStore {
     protected final WireFormat wireFormat;
     protected final JDBCAdapter adapter;
     protected final JDBCPersistenceAdapter persistenceAdapter;
-    protected AtomicLong lastRecoveredSequenceId = new AtomicLong(-1);
-    protected AtomicLong lastRecoveredPriority = new AtomicLong(Byte.MAX_VALUE -1);
     protected ActiveMQMessageAudit audit;
     protected final LinkedList<Long> pendingAdditions = new LinkedList<Long>();
-    
+    final long[] perPriorityLastRecovered = new long[10];
+
     public JDBCMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQDestination destination, ActiveMQMessageAudit audit) throws IOException {
         super(destination);
         this.persistenceAdapter = persistenceAdapter;
@@ -81,6 +80,7 @@ public class JDBCMessageStore extends AbstractMessageStore {
         if (destination.isQueue() && persistenceAdapter.getBrokerService().shouldRecordVirtualDestination(destination)) {
             recordDestinationCreation(destination);
         }
+        resetBatching();
     }
 
     private void recordDestinationCreation(ActiveMQDestination destination) throws IOException {
@@ -165,9 +165,6 @@ public class JDBCMessageStore extends AbstractMessageStore {
         if (xaXid == null) {
             onAdd(message, sequenceId, message.getPriority());
         }
-        if (this.isPrioritizedMessages() && message.getPriority() > lastRecoveredPriority.get()) {
-            resetTrackedLastRecoveredPriority();
-        }
     }
 
     // jdbc commit order is random with concurrent connections - limit scan to lowest pending
@@ -334,9 +331,9 @@ public class JDBCMessageStore extends AbstractMessageStore {
         TransactionContext c = persistenceAdapter.getTransactionContext();
         try {
             if (LOG.isTraceEnabled()) {
-                LOG.trace(this + " recoverNext lastRecovered:" + lastRecoveredSequenceId.get() + ", minPending:" + minPendingSequeunceId());
+                LOG.trace(this + " recoverNext lastRecovered:" + Arrays.toString(perPriorityLastRecovered) + ", minPending:" + minPendingSequeunceId());
             }
-            adapter.doRecoverNextMessages(c, destination, minPendingSequeunceId(), lastRecoveredSequenceId.get(), lastRecoveredPriority.get(),
+            adapter.doRecoverNextMessages(c, destination, perPriorityLastRecovered, minPendingSequeunceId(),
                     maxReturned, isPrioritizedMessages(), new JDBCMessageRecoveryListener() {
 
                 public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
@@ -344,8 +341,7 @@ public class JDBCMessageStore extends AbstractMessageStore {
                         msg.getMessageId().setBrokerSequenceId(sequenceId);
                         msg.getMessageId().setFutureOrSequenceLong(sequenceId);
                         listener.recoverMessage(msg);
-                        lastRecoveredSequenceId.set(sequenceId);
-                        lastRecoveredPriority.set(msg.getPriority());
+                        trackLastRecovered(sequenceId, msg.getPriority());
                         return true;
                 }
 
@@ -366,35 +362,33 @@ public class JDBCMessageStore extends AbstractMessageStore {
 
     }
 
+    private void trackLastRecovered(long sequenceId, int priority) {
+        perPriorityLastRecovered[isPrioritizedMessages() ? priority : 0] = sequenceId;
+    }
+
     /**
      * @see org.apache.activemq.store.MessageStore#resetBatching()
      */
     public void resetBatching() {
         if (LOG.isTraceEnabled()) {
-            LOG.trace(this + " resetBatching, existing last recovered seqId: " + lastRecoveredSequenceId.get());
+            LOG.trace(this + " resetBatching. last recovered: " + Arrays.toString(perPriorityLastRecovered));
+        }
+        for (int i=0;i<perPriorityLastRecovered.length;i++) {
+            perPriorityLastRecovered[i] = -1;
         }
-        lastRecoveredSequenceId.set(-1);
-        resetTrackedLastRecoveredPriority();
-
     }
 
-    private final void resetTrackedLastRecoveredPriority() {
-        lastRecoveredPriority.set(Byte.MAX_VALUE - 1);
-    }
 
     @Override
     public void setBatch(MessageId messageId) {
         try {
             long[] storedValues = persistenceAdapter.getStoreSequenceIdForMessageId(null, messageId, destination);
-            lastRecoveredSequenceId.set(storedValues[0]);
-            lastRecoveredPriority.set(storedValues[1]);
+            trackLastRecovered(storedValues[0], (int)storedValues[1]);
         } catch (IOException ignoredAsAlreadyLogged) {
-            lastRecoveredSequenceId.set(-1);
-            lastRecoveredPriority.set(Byte.MAX_VALUE -1);
+            resetBatching();
         }
         if (LOG.isTraceEnabled()) {
-            LOG.trace(this + " setBatch: new sequenceId: " + lastRecoveredSequenceId.get()
-                    + ", priority: " + lastRecoveredPriority.get());
+            LOG.trace(this + " setBatch: new last recovered: " + Arrays.toString(perPriorityLastRecovered));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/a2697b84/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/Statements.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/Statements.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/Statements.java
index 1afe0e3..7bc6df5 100755
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/Statements.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/Statements.java
@@ -491,7 +491,7 @@ public class Statements {
     public String getFindNextMessagesStatement() {
         if (findNextMessagesStatement == null) {
             findNextMessagesStatement = "SELECT ID, MSG FROM " + getFullMessageTableName()
-                                        + " WHERE CONTAINER=? AND ID > ? AND ID < ? AND XID IS NULL ORDER BY ID";
+                                        + " WHERE CONTAINER=? AND ID < ? AND ID > ? AND XID IS NULL ORDER BY ID";
         }
         return findNextMessagesStatement;
     }
@@ -504,7 +504,17 @@ public class Statements {
             findNextMessagesByPriorityStatement = "SELECT ID, MSG FROM " + getFullMessageTableName()
                                         + " WHERE CONTAINER=?"
                                         + " AND XID IS NULL"
-                                        + " AND ((ID > ? AND ID < ? AND PRIORITY = ?) OR PRIORITY < ?)"
+                                        + " AND ID < ? "
+                                        + " AND ( (ID > ? AND PRIORITY = 9) "
+                                        + "    OR (ID > ? AND PRIORITY = 8) "
+                                        + "    OR (ID > ? AND PRIORITY = 7) "
+                                        + "    OR (ID > ? AND PRIORITY = 6) "
+                                        + "    OR (ID > ? AND PRIORITY = 5) "
+                                        + "    OR (ID > ? AND PRIORITY = 4) "
+                                        + "    OR (ID > ? AND PRIORITY = 3) "
+                                        + "    OR (ID > ? AND PRIORITY = 2) "
+                                        + "    OR (ID > ? AND PRIORITY = 1) "
+                                        + "    OR (ID > ? AND PRIORITY = 0) )"
                                         + " ORDER BY PRIORITY DESC, ID";
         }
         return findNextMessagesByPriorityStatement;

http://git-wip-us.apache.org/repos/asf/activemq/blob/a2697b84/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
----------------------------------------------------------------------
diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
index c77f951..9c6f3bf 100755
--- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
+++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
@@ -37,6 +37,7 @@ import org.apache.activemq.command.XATransactionId;
 import org.apache.activemq.store.jdbc.JDBCAdapter;
 import org.apache.activemq.store.jdbc.JDBCMessageIdScanListener;
 import org.apache.activemq.store.jdbc.JDBCMessageRecoveryListener;
+import org.apache.activemq.store.jdbc.JDBCMessageStore;
 import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
 import org.apache.activemq.store.jdbc.JdbcMemoryTransactionStore;
 import org.apache.activemq.store.jdbc.Statements;
@@ -1086,8 +1087,8 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
         return result;
     }
 
-    public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, long maxSeq, long lastRecoveredSeq,
-            long priority, int maxReturned, boolean isPrioritizedMessages, JDBCMessageRecoveryListener listener) throws Exception {
+    public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, long[] lastRecoveredEntries,
+            long maxSeq, int maxReturned, boolean isPrioritizedMessages, JDBCMessageRecoveryListener listener) throws Exception {
         PreparedStatement s = null;
         ResultSet rs = null;
         cleanupExclusiveLock.readLock().lock();
@@ -1099,11 +1100,14 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
             }
             s.setMaxRows(Math.min(maxReturned, maxRows));
             s.setString(1, destination.getQualifiedName());
-            s.setLong(2, lastRecoveredSeq);
-            s.setLong(3, maxSeq);
+            s.setLong(2, maxSeq);
+            int paramId = 3;
             if (isPrioritizedMessages) {
-                s.setLong(4, priority);
-                s.setLong(5, priority);
+                for (int i=9;i>=0;i--) {
+                    s.setLong(paramId++, lastRecoveredEntries[i]);
+                }
+            } else {
+                s.setLong(paramId, lastRecoveredEntries[0]);
             }
             rs = s.executeQuery();
             int count = 0;

http://git-wip-us.apache.org/repos/asf/activemq/blob/a2697b84/activemq-unit-tests/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/MessagePriorityTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
index adee523..e1d4b09 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
@@ -17,10 +17,12 @@
 
 package org.apache.activemq.store;
 
+import java.util.concurrent.atomic.AtomicInteger;
 import javax.jms.Connection;
 import javax.jms.DeliveryMode;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
@@ -664,4 +666,56 @@ abstract public class MessagePriorityTest extends CombinationTestSupport {
         }
         queueConsumer.close();
     }
+
+    public void initCombosForTestEveryXHi() {
+        // the cache limits the priority ordering to available memory
+        addCombinationValues("useCache", new Object[] {new Boolean(false)});
+        // expiry processing can fill the cursor with a snapshot of the producer
+        // priority, before producers are complete
+        addCombinationValues("expireMessagePeriod", new Object[] {new Integer(0)});
+    }
+
+    public void testEveryXHi() throws Exception {
+        final int numMessages = 50;
+        ActiveMQQueue queue = (ActiveMQQueue)sess.createQueue("TEST_LOW_THEN_HIGH_10");
+
+        final AtomicInteger received = new AtomicInteger(0);
+        MessageConsumer queueConsumer = sess.createConsumer(queue);
+        queueConsumer.setMessageListener(new MessageListener() {
+            @Override
+            public void onMessage(Message message) {
+                received.incrementAndGet();
+            }
+        });
+
+        MessageProducer producer = sess.createProducer(queue);
+        for (int i = 0; i < numMessages; i++) {
+            Message message = sess.createMessage();
+            if (i % 5 == 0) {
+                message.setJMSPriority(9);
+            } else {
+                message.setJMSPriority(4);
+            }
+            producer.send(message, Message.DEFAULT_DELIVERY_MODE, message.getJMSPriority(), Message.DEFAULT_TIME_TO_LIVE);
+        }
+
+        assertTrue("Got all", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return numMessages == received.get();
+            }
+        }));
+
+
+        final DestinationStatistics destinationStatistics = ((RegionBroker)broker.getRegionBroker()).getDestinationStatistics();
+        assertTrue("Nothing else Like dlq involved", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                LOG.info("Enqueues: " + destinationStatistics.getEnqueues().getCount() + ", Dequeues: " + destinationStatistics.getDequeues().getCount());
+                return destinationStatistics.getEnqueues().getCount() == numMessages && destinationStatistics.getDequeues().getCount() == numMessages;
+            }
+        }, 10000));
+
+        queueConsumer.close();
+    }
 }