You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2010/11/15 11:00:56 UTC
svn commit: r1035202 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/broker/region/cursors/
main/java/org/apache/activemq/store/jdbc/
main/java/org/apache/activemq/store/jdbc/adapter/
test/java/org/apache/activemq/usecases/
Author: gtully
Date: Mon Nov 15 10:00:56 2010
New Revision: 1035202
URL: http://svn.apache.org/viewvc?rev=1035202&view=rev
Log:
Improve JDBC durable subscription performance for long-running subscriptions and resolve a regression in the selector test; related to https://issues.apache.org/activemq/browse/AMQ-2985 and https://issues.apache.org/activemq/browse/AMQ-2980. Expose maxRows on the JDBC persistence adapter so that it can be increased for really large selectors.
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionSelectorTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java?rev=1035202&r1=1035201&r2=1035202&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java Mon Nov 15 10:00:56 2010
@@ -258,11 +258,13 @@ public class StoreDurableSubscriberCurso
}
@Override
- public void setMaxBatchSize(int maxBatchSize) {
- for (PendingMessageCursor storePrefetch : storePrefetches) {
- storePrefetch.setMaxBatchSize(maxBatchSize);
+ public void setMaxBatchSize(int newMaxBatchSize) {
+ if (newMaxBatchSize > getMaxBatchSize()) {
+ for (PendingMessageCursor storePrefetch : storePrefetches) {
+ storePrefetch.setMaxBatchSize(newMaxBatchSize);
+ }
+ super.setMaxBatchSize(newMaxBatchSize);
}
- super.setMaxBatchSize(maxBatchSize);
}
@Override
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java?rev=1035202&r1=1035201&r2=1035202&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java Mon Nov 15 10:00:56 2010
@@ -93,4 +93,8 @@ public interface JDBCAdapter {
long doGetLastProducerSequenceId(TransactionContext c, ProducerId id) throws SQLException, IOException;
void doSetLastAckWithPriority(TransactionContext c, ActiveMQDestination destination, String clientId, String subscriptionName, long re, long re1) throws SQLException, IOException;
+
+ public int getMaxRows();
+
+ public void setMaxRows(int maxRows);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java?rev=1035202&r1=1035201&r2=1035202&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java Mon Nov 15 10:00:56 2010
@@ -99,6 +99,7 @@ public class JDBCPersistenceAdapter exte
protected ActiveMQMessageAudit audit;
protected LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator();
+ protected int maxRows = DefaultJDBCAdapter.MAX_ROWS;
public JDBCPersistenceAdapter() {
}
@@ -464,6 +465,7 @@ public class JDBCPersistenceAdapter exte
public void setAdapter(JDBCAdapter adapter) {
this.adapter = adapter;
this.adapter.setStatements(getStatements());
+ this.adapter.setMaxRows(getMaxRows());
}
public WireFormat getWireFormat() {
@@ -715,5 +717,16 @@ public class JDBCPersistenceAdapter exte
synchronized(sequenceGenerator) {
return sequenceGenerator.getNextSequenceId();
}
- }
+ }
+
+ public int getMaxRows() {
+ return maxRows;
+ }
+
+ /*
+ * the max rows return from queries, with sparse selectors this may need to be increased
+ */
+ public void setMaxRows(int maxRows) {
+ this.maxRows = maxRows;
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?rev=1035202&r1=1035201&r2=1035202&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java Mon Nov 15 10:00:56 2010
@@ -56,12 +56,13 @@ import org.apache.commons.logging.LogFac
*/
public class DefaultJDBCAdapter implements JDBCAdapter {
private static final Log LOG = LogFactory.getLog(DefaultJDBCAdapter.class);
+ public static final int MAX_ROWS = 10000;
protected Statements statements;
protected boolean batchStatments = true;
protected boolean prioritizedMessages;
protected ReadWriteLock cleanupExclusiveLock = new ReentrantReadWriteLock();
- // needs to be min twice the prefetch for a durable sub
- protected int maxRows = 2000;
+ // needs to be min twice the prefetch for a durable sub and large enough for selector range
+ protected int maxRows = MAX_ROWS;
protected void setBinaryData(PreparedStatement s, int index, byte data[]) throws SQLException {
s.setBytes(index, data);
@@ -507,7 +508,7 @@ public class DefaultJDBCAdapter implemen
cleanupExclusiveLock.readLock().lock();
try {
s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesStatement());
- s.setMaxRows(maxReturned * 2);
+ s.setMaxRows(Math.max(maxReturned * 2, maxRows));
s.setString(1, destination.getQualifiedName());
s.setString(2, clientId);
s.setString(3, subscriptionName);
@@ -917,7 +918,7 @@ public class DefaultJDBCAdapter implemen
} else {
s = c.getConnection().prepareStatement(this.statements.getFindNextMessagesStatement());
}
- s.setMaxRows(maxReturned * 2);
+ s.setMaxRows(Math.max(maxReturned * 2, maxRows));
s.setString(1, destination.getQualifiedName());
s.setLong(2, nextSeq);
if (isPrioritizedMessages) {
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionSelectorTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionSelectorTest.java?rev=1035202&r1=1035201&r2=1035202&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionSelectorTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionSelectorTest.java Mon Nov 15 10:00:56 2010
@@ -33,6 +33,8 @@ import org.apache.activemq.ActiveMQConne
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
+import org.apache.activemq.util.Wait;
public class DurableSubscriptionSelectorTest extends org.apache.activemq.TestSupport {
@@ -71,7 +73,8 @@ public class DurableSubscriptionSelector
openConsumer();
sendMessage(true);
- Thread.sleep(1000);
+
+ Wait.waitFor(new Wait.Condition() { public boolean isSatisified() { return received >= 1;} }, 10000);
assertEquals("Message is not recieved.", 1, received);
@@ -140,6 +143,10 @@ public class DurableSubscriptionSelector
broker.setDeleteAllMessagesOnStartup(true);
}
setDefaultPersistenceAdapter(broker);
+
+ if (broker.getPersistenceAdapter() instanceof JDBCPersistenceAdapter) {
+ ((JDBCPersistenceAdapter)broker.getPersistenceAdapter()).setMaxRows(5000);
+ }
broker.start();
}