You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2010/11/15 11:00:56 UTC

svn commit: r1035202 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/cursors/ main/java/org/apache/activemq/store/jdbc/ main/java/org/apache/activemq/store/jdbc/adapter/ test/java/org/apache/activemq/usecases/

Author: gtully
Date: Mon Nov 15 10:00:56 2010
New Revision: 1035202

URL: http://svn.apache.org/viewvc?rev=1035202&view=rev
Log:
Improve JDBC durable subscription performance for long-running subscriptions and resolve a regression in the selector test (related to https://issues.apache.org/activemq/browse/AMQ-2985 and https://issues.apache.org/activemq/browse/AMQ-2980). Expose maxRows on the JDBC persistence adapter so that it can be increased for really large selectors.

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionSelectorTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java?rev=1035202&r1=1035201&r2=1035202&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java Mon Nov 15 10:00:56 2010
@@ -258,11 +258,13 @@ public class StoreDurableSubscriberCurso
     }
 
     @Override
-    public void setMaxBatchSize(int maxBatchSize) {
-        for (PendingMessageCursor storePrefetch : storePrefetches) {
-            storePrefetch.setMaxBatchSize(maxBatchSize);
+    public void setMaxBatchSize(int newMaxBatchSize) {
+        if (newMaxBatchSize > getMaxBatchSize()) {
+            for (PendingMessageCursor storePrefetch : storePrefetches) {
+                storePrefetch.setMaxBatchSize(newMaxBatchSize);
+            }
+            super.setMaxBatchSize(newMaxBatchSize);
         }
-        super.setMaxBatchSize(maxBatchSize);
     }
 
     @Override

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java?rev=1035202&r1=1035201&r2=1035202&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java Mon Nov 15 10:00:56 2010
@@ -93,4 +93,8 @@ public interface JDBCAdapter {
     long doGetLastProducerSequenceId(TransactionContext c, ProducerId id) throws SQLException, IOException;
 
     void doSetLastAckWithPriority(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriptionName, long re, long re1) throws SQLException, IOException;
+
+    public int getMaxRows();
+
+    public void setMaxRows(int maxRows);
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java?rev=1035202&r1=1035201&r2=1035202&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java Mon Nov 15 10:00:56 2010
@@ -99,6 +99,7 @@ public class JDBCPersistenceAdapter exte
     protected ActiveMQMessageAudit audit;
     
     protected LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator();
+    protected int maxRows = DefaultJDBCAdapter.MAX_ROWS;
 
     public JDBCPersistenceAdapter() {
     }
@@ -464,6 +465,7 @@ public class JDBCPersistenceAdapter exte
     public void setAdapter(JDBCAdapter adapter) {
         this.adapter = adapter;
         this.adapter.setStatements(getStatements());
+        this.adapter.setMaxRows(getMaxRows());
     }
 
     public WireFormat getWireFormat() {
@@ -715,5 +717,16 @@ public class JDBCPersistenceAdapter exte
         synchronized(sequenceGenerator) {
             return sequenceGenerator.getNextSequenceId();
         }
-    }    
+    }
+
+    public int getMaxRows() {
+        return maxRows;
+    }
+
+    /*
+     * the max rows returned from queries; with sparse selectors this may need to be increased
+     */
+    public void setMaxRows(int maxRows) {
+        this.maxRows = maxRows;
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?rev=1035202&r1=1035201&r2=1035202&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java Mon Nov 15 10:00:56 2010
@@ -56,12 +56,13 @@ import org.apache.commons.logging.LogFac
  */
 public class DefaultJDBCAdapter implements JDBCAdapter {
     private static final Log LOG = LogFactory.getLog(DefaultJDBCAdapter.class);
+    public static final int MAX_ROWS = 10000;
     protected Statements statements;
     protected boolean batchStatments = true;
     protected boolean prioritizedMessages;
     protected ReadWriteLock cleanupExclusiveLock = new ReentrantReadWriteLock();
-    // needs to be min twice the prefetch for a durable sub
-    protected int maxRows = 2000;
+    // needs to be min twice the prefetch for a durable sub and large enough for selector range
+    protected int maxRows = MAX_ROWS;
 
     protected void setBinaryData(PreparedStatement s, int index, byte data[]) throws SQLException {
         s.setBytes(index, data);
@@ -507,7 +508,7 @@ public class DefaultJDBCAdapter implemen
         cleanupExclusiveLock.readLock().lock();
         try {
             s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesStatement());
-            s.setMaxRows(maxReturned * 2);
+            s.setMaxRows(Math.max(maxReturned * 2, maxRows));
             s.setString(1, destination.getQualifiedName());
             s.setString(2, clientId);
             s.setString(3, subscriptionName);
@@ -917,7 +918,7 @@ public class DefaultJDBCAdapter implemen
             } else {
                 s = c.getConnection().prepareStatement(this.statements.getFindNextMessagesStatement());
             }
-            s.setMaxRows(maxReturned * 2);
+            s.setMaxRows(Math.max(maxReturned * 2, maxRows));
             s.setString(1, destination.getQualifiedName());
             s.setLong(2, nextSeq);
             if (isPrioritizedMessages) {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionSelectorTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionSelectorTest.java?rev=1035202&r1=1035201&r2=1035202&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionSelectorTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionSelectorTest.java Mon Nov 15 10:00:56 2010
@@ -33,6 +33,8 @@ import org.apache.activemq.ActiveMQConne
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
+import org.apache.activemq.util.Wait;
 
 public class DurableSubscriptionSelectorTest extends org.apache.activemq.TestSupport {
 
@@ -71,7 +73,8 @@ public class DurableSubscriptionSelector
         openConsumer();
 
         sendMessage(true);
-        Thread.sleep(1000);
+
+        Wait.waitFor(new Wait.Condition() { public boolean isSatisified() { return received >= 1;} }, 10000);
 
         assertEquals("Message is not recieved.", 1, received);
 
@@ -140,6 +143,10 @@ public class DurableSubscriptionSelector
             broker.setDeleteAllMessagesOnStartup(true);
         }
         setDefaultPersistenceAdapter(broker);
+
+        if (broker.getPersistenceAdapter() instanceof JDBCPersistenceAdapter) {
+            ((JDBCPersistenceAdapter)broker.getPersistenceAdapter()).setMaxRows(5000);    
+        }
         broker.start();
     }