You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2010/11/12 17:51:26 UTC

svn commit: r1034466 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/store/jdbc/ main/java/org/apache/activemq/store/jdbc/adapter/ test/java/org/apache/activemq/store/

Author: gtully
Date: Fri Nov 12 16:51:26 2010
New Revision: 1034466

URL: http://svn.apache.org/viewvc?rev=1034466&view=rev
Log:
Remove the priority flag from the default JDBC adapter implementation: the adapter is shared across destinations, so a single flag leads to conflicts between priority and non-priority destinations. Also limit the number of rows returned from a durable subscription fetch, so that we neither hog memory nor return too few matches: https://issues.apache.org/activemq/browse/AMQ-2980

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java?rev=1034466&r1=1034465&r2=1034466&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java Fri Nov 12 16:51:26 2010
@@ -31,8 +31,6 @@ public interface JDBCAdapter {
 
     void setStatements(Statements statementProvider);
     
-    void setPrioritizedMessages(boolean prioritizedMessages);
-
     void doCreateTables(TransactionContext c) throws SQLException, IOException;
 
     void doDropTables(TransactionContext c) throws SQLException, IOException;
@@ -59,7 +57,10 @@ public interface JDBCAdapter {
     void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriptionName, long seq, long priority, int maxReturned,
                                JDBCMessageRecoveryListener listener) throws Exception;
 
-    void doSetSubscriberEntry(TransactionContext c, SubscriptionInfo subscriptionInfo, boolean retroactive) throws SQLException, IOException;
+    void doRecoverNextMessagesWithPriority(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriptionName, long seq, long priority, int maxReturned,
+                               JDBCMessageRecoveryListener listener) throws Exception;
+
+    void doSetSubscriberEntry(TransactionContext c, SubscriptionInfo subscriptionInfo, boolean retroactive, boolean isPrioritizeMessages) throws SQLException, IOException;
 
     SubscriptionInfo doGetSubscriberEntry(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriptionName) throws SQLException, IOException;
 
@@ -69,7 +70,7 @@ public interface JDBCAdapter {
 
     void doDeleteSubscription(TransactionContext c, ActiveMQDestination destinationName, String clientId, String subscriptionName) throws SQLException, IOException;
 
-    void doDeleteOldMessages(TransactionContext c) throws SQLException, IOException;
+    void doDeleteOldMessages(TransactionContext c, boolean isPrioritizedMessages) throws SQLException, IOException;
 
     long doGetLastMessageStoreSequenceId(TransactionContext c) throws SQLException, IOException;
 
@@ -79,11 +80,11 @@ public interface JDBCAdapter {
 
     SubscriptionInfo[] doGetAllSubscriptions(TransactionContext c, ActiveMQDestination destination) throws SQLException, IOException;
 
-    int doGetDurableSubscriberMessageCount(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriptionName) throws SQLException, IOException;
+    int doGetDurableSubscriberMessageCount(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriptionName, boolean isPrioritizeMessages) throws SQLException, IOException;
 
     int doGetMessageCount(TransactionContext c, ActiveMQDestination destination) throws SQLException, IOException;
 
-    void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, long nextSeq, long priority, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception;
+    void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, long nextSeq, long priority, int maxReturned, boolean isPrioritizeMessages, JDBCMessageRecoveryListener listener) throws Exception;
 
     long doGetLastAckedDurableSubscriberMessageId(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriberName) throws SQLException, IOException;
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java?rev=1034466&r1=1034465&r2=1034466&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java Fri Nov 12 16:51:26 2010
@@ -226,7 +226,8 @@ public class JDBCMessageStore extends Ab
     public void recoverNextMessages(int maxReturned, final MessageRecoveryListener listener) throws Exception {
         TransactionContext c = persistenceAdapter.getTransactionContext();
         try {
-            adapter.doRecoverNextMessages(c, destination, lastRecoveredSequenceId.get(), lastRecoveredPriority.get(), maxReturned, new JDBCMessageRecoveryListener() {
+            adapter.doRecoverNextMessages(c, destination, lastRecoveredSequenceId.get(), lastRecoveredPriority.get(),
+                    maxReturned, isPrioritizedMessages(), new JDBCMessageRecoveryListener() {
 
                 public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
                     if (listener.hasSpace()) {
@@ -301,6 +302,5 @@ public class JDBCMessageStore extends Ab
     
     public void setPrioritizedMessages(boolean prioritizedMessages) {
         super.setPrioritizedMessages(prioritizedMessages);
-        adapter.setPrioritizedMessages(prioritizedMessages);
     }   
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java?rev=1034466&r1=1034465&r2=1034466&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java Fri Nov 12 16:51:26 2010
@@ -333,7 +333,8 @@ public class JDBCPersistenceAdapter exte
         try {
             LOG.debug("Cleaning up old messages.");
             c = getTransactionContext();
-            getAdapter().doDeleteOldMessages(c);
+            getAdapter().doDeleteOldMessages(c, false);
+            getAdapter().doDeleteOldMessages(c, true);
         } catch (IOException e) {
             LOG.warn("Old message cleanup failed due to: " + e, e);
         } catch (SQLException e) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java?rev=1034466&r1=1034465&r2=1034466&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java Fri Nov 12 16:51:26 2010
@@ -104,8 +104,7 @@ public class JDBCTopicMessageStore exten
         throws Exception {
         TransactionContext c = persistenceAdapter.getTransactionContext();
         try {
-            adapter.doRecoverNextMessages(c, destination, clientId, subscriptionName,
-                    0, 0, maxReturned, new JDBCMessageRecoveryListener() {
+             JDBCMessageRecoveryListener jdbcListener = new JDBCMessageRecoveryListener() {
 
                 public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
                     if (listener.hasSpace()) {
@@ -122,7 +121,14 @@ public class JDBCTopicMessageStore exten
                     return listener.recoverMessageReference(new MessageId(reference));
                 }
 
-            });
+            };
+            if (isPrioritizedMessages()) {
+                adapter.doRecoverNextMessagesWithPriority(c, destination, clientId, subscriptionName,
+                    0, 0, maxReturned, jdbcListener);
+            } else {
+                adapter.doRecoverNextMessages(c, destination, clientId, subscriptionName,
+                    0, 0, maxReturned, jdbcListener);
+            }
         } catch (SQLException e) {
             JDBCPersistenceAdapter.log("JDBC Failure: ", e);
         } finally {
@@ -138,7 +144,7 @@ public class JDBCTopicMessageStore exten
         TransactionContext c = persistenceAdapter.getTransactionContext();
         try {
             c = persistenceAdapter.getTransactionContext();
-            adapter.doSetSubscriberEntry(c, subscriptionInfo, retroactive);
+            adapter.doSetSubscriberEntry(c, subscriptionInfo, retroactive, isPrioritizedMessages());
         } catch (SQLException e) {
             JDBCPersistenceAdapter.log("JDBC Failure: ", e);
             throw IOExceptionSupport.create("Failed to lookup subscription for info: " + subscriptionInfo.getClientId() + ". Reason: " + e, e);
@@ -192,8 +198,7 @@ public class JDBCTopicMessageStore exten
         int result = 0;
         TransactionContext c = persistenceAdapter.getTransactionContext();
         try {
-            result = adapter.doGetDurableSubscriberMessageCount(c, destination, clientId, subscriberName);
-
+                result = adapter.doGetDurableSubscriberMessageCount(c, destination, clientId, subscriberName, isPrioritizedMessages());
         } catch (SQLException e) {
             JDBCPersistenceAdapter.log("JDBC Failure: ", e);
             throw IOExceptionSupport.create("Failed to get Message Count: " + clientId + ". Reason: " + e, e);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?rev=1034466&r1=1034465&r2=1034466&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java Fri Nov 12 16:51:26 2010
@@ -17,11 +17,8 @@
 package org.apache.activemq.store.jdbc.adapter;
 
 import java.io.IOException;
-import java.io.PrintStream;
-import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
@@ -31,8 +28,6 @@ import java.util.Set;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import javax.jms.Message;
-
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.ProducerId;
@@ -65,6 +60,8 @@ public class DefaultJDBCAdapter implemen
     protected boolean batchStatments = true;
     protected boolean prioritizedMessages;
     protected ReadWriteLock cleanupExclusiveLock = new ReentrantReadWriteLock();
+    // needs to be min twice the prefetch for a durable sub
+    protected int maxRows = 2000;
 
     protected void setBinaryData(PreparedStatement s, int index, byte data[]) throws SQLException {
         s.setBytes(index, data);
@@ -509,12 +506,44 @@ public class DefaultJDBCAdapter implemen
         ResultSet rs = null;
         cleanupExclusiveLock.readLock().lock();
         try {
-            if (isPrioritizedMessages()) {
-                s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesByPriorityStatement());
+            s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesStatement());
+            s.setMaxRows(maxReturned * 2);
+            s.setString(1, destination.getQualifiedName());
+            s.setString(2, clientId);
+            s.setString(3, subscriptionName);
+            rs = s.executeQuery();
+            int count = 0;
+            if (this.statements.isUseExternalMessageReferences()) {
+                while (rs.next() && count < maxReturned) {
+                    if (listener.recoverMessageReference(rs.getString(1))) {
+                        count++;
+                    }
+                }
             } else {
-                s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesStatement());
+                while (rs.next() && count < maxReturned) {
+                    if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) {
+                        count++;
+                    }
+                }
             }
-            // no set max rows as selectors may need to scan more than maxReturned messages to get what they need
+        } finally {
+            cleanupExclusiveLock.readLock().unlock();
+            close(rs);
+            close(s);
+        }
+    }
+
+    public void doRecoverNextMessagesWithPriority(TransactionContext c, ActiveMQDestination destination, String clientId,
+            String subscriptionName, long seq, long priority, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception {
+
+        PreparedStatement s = null;
+        ResultSet rs = null;
+        cleanupExclusiveLock.readLock().lock();
+        try {
+            s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesByPriorityStatement());
+            // maxRows needs to be twice prefetch as the db will replay all unacked, so inflight messages will
+            // be returned and suppressed by the cursor audit. It is faster this way.
+            s.setMaxRows(maxRows);
             s.setString(1, destination.getQualifiedName());
             s.setString(2, clientId);
             s.setString(3, subscriptionName);
@@ -541,13 +570,13 @@ public class DefaultJDBCAdapter implemen
     }
 
     public int doGetDurableSubscriberMessageCount(TransactionContext c, ActiveMQDestination destination,
-            String clientId, String subscriptionName) throws SQLException, IOException {
+            String clientId, String subscriptionName, boolean isPrioritizedMessages) throws SQLException, IOException {
         PreparedStatement s = null;
         ResultSet rs = null;
         int result = 0;
         cleanupExclusiveLock.readLock().lock();
         try {
-            if (this.isPrioritizedMessages()) {
+            if (isPrioritizedMessages) {
                 s = c.getConnection().prepareStatement(this.statements.getDurableSubscriberMessageCountStatementWithPriority());
             } else {
                 s = c.getConnection().prepareStatement(this.statements.getDurableSubscriberMessageCountStatement());    
@@ -574,7 +603,7 @@ public class DefaultJDBCAdapter implemen
      * @throws SQLException 
      * @throws IOException 
      */
-    public void doSetSubscriberEntry(TransactionContext c, SubscriptionInfo info, boolean retroactive)
+    public void doSetSubscriberEntry(TransactionContext c, SubscriptionInfo info, boolean retroactive, boolean isPrioritizedMessages)
             throws SQLException, IOException {
         // dumpTables(c, destination.getQualifiedName(), clientId,
         // subscriptionName);
@@ -597,7 +626,7 @@ public class DefaultJDBCAdapter implemen
             }
             s = c.getConnection().prepareStatement(this.statements.getCreateDurableSubStatement());
             int maxPriority = 1;
-            if (this.isPrioritizedMessages()) {
+            if (isPrioritizedMessages) {
                 maxPriority = 10;
             }
 
@@ -712,11 +741,11 @@ public class DefaultJDBCAdapter implemen
         }
     }
 
-    public void doDeleteOldMessages(TransactionContext c) throws SQLException, IOException {
+    public void doDeleteOldMessages(TransactionContext c, boolean isPrioritizedMessages) throws SQLException, IOException {
         PreparedStatement s = null;
         cleanupExclusiveLock.writeLock().lock();
         try {
-            if (this.isPrioritizedMessages()) {
+            if (isPrioritizedMessages) {
                 LOG.debug("Executing SQL: " + this.statements.getDeleteOldMessagesStatementWithPriority());
                 s = c.getConnection().prepareStatement(this.statements.getDeleteOldMessagesStatementWithPriority());
             } else {
@@ -815,16 +844,16 @@ public class DefaultJDBCAdapter implemen
 
     public void setStatements(Statements statements) {
         this.statements = statements;
-    }    
-
-    public boolean isPrioritizedMessages() {
-        return prioritizedMessages;
     }
 
-    public void setPrioritizedMessages(boolean prioritizedMessages) {
-        this.prioritizedMessages = prioritizedMessages;
+    public int getMaxRows() {
+        return maxRows;
     }
 
+    public void setMaxRows(int maxRows) {
+        this.maxRows = maxRows;
+    }    
+
     /**
      * @param c
      * @param destination
@@ -878,12 +907,12 @@ public class DefaultJDBCAdapter implemen
     }
 
     public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, long nextSeq,
-            long priority, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception {
+            long priority, int maxReturned, boolean isPrioritizedMessages, JDBCMessageRecoveryListener listener) throws Exception {
         PreparedStatement s = null;
         ResultSet rs = null;
         cleanupExclusiveLock.readLock().lock();
         try {
-            if (isPrioritizedMessages()) {
+            if (isPrioritizedMessages) {
                 s = c.getConnection().prepareStatement(this.statements.getFindNextMessagesByPriorityStatement());
             } else {
                 s = c.getConnection().prepareStatement(this.statements.getFindNextMessagesStatement());
@@ -891,7 +920,7 @@ public class DefaultJDBCAdapter implemen
             s.setMaxRows(maxReturned * 2);
             s.setString(1, destination.getQualifiedName());
             s.setLong(2, nextSeq);
-            if (isPrioritizedMessages()) {
+            if (isPrioritizedMessages) {
                 s.setLong(3, priority);
                 s.setLong(4, priority);
             }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java?rev=1034466&r1=1034465&r2=1034466&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java Fri Nov 12 16:51:26 2010
@@ -70,7 +70,8 @@ abstract public class MessagePriorityTes
         policy.setPrioritizedMessages(prioritizeMessages);
         policy.setUseCache(useCache);
         PolicyMap policyMap = new PolicyMap();
-        policyMap.setDefaultEntry(policy);
+        policyMap.put(new ActiveMQQueue("TEST"), policy);
+        policyMap.put(new ActiveMQTopic("TEST"), policy);
         broker.setDestinationPolicy(policyMap);
         broker.start();
         broker.waitUntilStarted();
@@ -198,7 +199,29 @@ abstract public class MessagePriorityTes
             assertNotNull("Message " + i + " was null", msg);
             assertEquals("Message " + i + " has wrong priority", i < MSG_NUM ? HIGH_PRI : LOW_PRI, msg.getJMSPriority());
         }
-        
+
+
+        // verify that same broker/store can deal with non priority dest also
+        topic = (ActiveMQTopic)sess.createTopic("HAS_NO_PRIORITY");
+        sub = sess.createDurableSubscriber(topic, "no_priority");
+        sub.close();
+
+        lowPri = new ProducerThread(topic, MSG_NUM, LOW_PRI);
+        highPri = new ProducerThread(topic, MSG_NUM, HIGH_PRI);
+
+        lowPri.start();
+        highPri.start();
+
+        lowPri.join();
+        highPri.join();
+
+        sub = sess.createDurableSubscriber(topic, "no_priority");
+        // verify we got them all
+        for (int i = 0; i < MSG_NUM * 2; i++) {
+            Message msg = sub.receive(5000);
+            assertNotNull("Message " + i + " was null", msg);
+        }
+
     }
 
     public void initCombosForTestDurableSubsReconnect() {