You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2011/02/22 19:51:54 UTC
svn commit: r1073453 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/broker/region/
main/java/org/apache/activemq/broker/region/cursors/
main/java/org/apache/activemq/store/jdbc/
test/java/org/apache/activemq/usecases/
Author: gtully
Date: Tue Feb 22 18:51:54 2011
New Revision: 1073453
URL: http://svn.apache.org/viewvc?rev=1073453&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3188 - Full table scan for durable subs in jdbc store when priority enabled; very slow with large message backlog
Added more state to the topic message store so that it can query the db for a single priority at a time, a lookup that is indexed. This avoids a full table scan.
The send rate slowdown with active durable subs vs inactive durable subs is now in the region of 6x, down from 40x. Validation test included.
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ConcurrentProducerDurableConsumerTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=1073453&r1=1073452&r2=1073453&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Tue Feb 22 18:51:54 2011
@@ -131,7 +131,7 @@ public class DurableTopicSubscription ex
topic.activate(context, this);
}
}
- synchronized (pending) {
+ synchronized (pendingLock) {
pending.setSystemUsage(memoryManager);
pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
pending.setMaxAuditDepth(getMaxAuditDepth());
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=1073453&r1=1073452&r2=1073453&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Tue Feb 22 18:51:54 2011
@@ -64,7 +64,7 @@ public abstract class PrefetchSubscripti
private int maxProducersToAudit=32;
private int maxAuditDepth=2048;
protected final SystemUsage usageManager;
- private final Object pendingLock = new Object();
+ protected final Object pendingLock = new Object();
private final Object dispatchLock = new Object();
private final CountDownLatch okForAckAsDispatchDone = new CountDownLatch(1);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java?rev=1073453&r1=1073452&r2=1073453&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java Tue Feb 22 18:51:54 2011
@@ -45,7 +45,7 @@ public abstract class AbstractPendingMes
protected boolean enableAudit=true;
protected ActiveMQMessageAudit audit;
protected boolean useCache=true;
- protected boolean cacheEnabled=true;
+ private boolean cacheEnabled=true;
private boolean started=false;
protected MessageReference last = null;
protected final boolean prioritizedMessages;
@@ -329,7 +329,11 @@ public abstract class AbstractPendingMes
}
- public boolean isCacheEnabled() {
+ public synchronized boolean isCacheEnabled() {
return cacheEnabled;
}
+
+ public synchronized void setCacheEnabled(boolean val) {
+ cacheEnabled = val;
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java?rev=1073453&r1=1073452&r2=1073453&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java Tue Feb 22 18:51:54 2011
@@ -45,7 +45,7 @@ public abstract class AbstractStoreCurso
this.regionDestination=destination;
if (this.prioritizedMessages) {
this.batchList= new PrioritizedPendingList();
- }else {
+ } else {
this.batchList = new OrderedPendingList();
}
}
@@ -58,7 +58,7 @@ public abstract class AbstractStoreCurso
resetBatch();
this.size = getStoreSize();
this.storeHasMessages=this.size > 0;
- cacheEnabled = !this.storeHasMessages&&useCache;
+ setCacheEnabled(!this.storeHasMessages&&useCache);
}
}
@@ -95,8 +95,7 @@ public abstract class AbstractStoreCurso
* it will be a duplicate - but should be ignored
*/
if (LOG.isTraceEnabled()) {
- LOG.trace(regionDestination.getActiveMQDestination().getPhysicalName()
- + " cursor got duplicate: " + message.getMessageId() + ", " + message.getPriority());
+ LOG.trace(this + " - cursor got duplicate: " + message.getMessageId() + ", " + message.getPriority());
}
}
return recovered;
@@ -108,7 +107,7 @@ public abstract class AbstractStoreCurso
try {
fillBatch();
} catch (Exception e) {
- LOG.error("Failed to fill batch", e);
+ LOG.error(this + " - Failed to fill batch", e);
throw new RuntimeException(e);
}
}
@@ -145,7 +144,7 @@ public abstract class AbstractStoreCurso
try {
fillBatch();
} catch (Exception e) {
- LOG.error("Failed to fill batch", e);
+ LOG.error(this + " - Failed to fill batch", e);
throw new RuntimeException(e);
}
}
@@ -169,24 +168,22 @@ public abstract class AbstractStoreCurso
public final synchronized void addMessageLast(MessageReference node) throws Exception {
if (hasSpace()) {
- if (!cacheEnabled && size==0 && isStarted() && useCache) {
+ if (!isCacheEnabled() && size==0 && isStarted() && useCache) {
if (LOG.isTraceEnabled()) {
- LOG.trace(regionDestination.getActiveMQDestination().getPhysicalName()
- + " enabling cache for empty store " + node.getMessageId());
+ LOG.trace(this + " - enabling cache for empty store " + node.getMessageId());
}
- cacheEnabled=true;
+ setCacheEnabled(true);
}
- if (cacheEnabled) {
+ if (isCacheEnabled()) {
recoverMessage(node.getMessage(),true);
lastCachedId = node.getMessageId();
}
- } else if (cacheEnabled) {
- cacheEnabled=false;
+ } else if (isCacheEnabled()) {
+ setCacheEnabled(false);
// sync with store on disabling the cache
if (lastCachedId != null) {
if (LOG.isTraceEnabled()) {
- LOG.trace(regionDestination.getActiveMQDestination().getPhysicalName()
- + " disabling cache on size:" + size
+ LOG.trace(this + " - disabling cache"
+ ", lastCachedId: " + lastCachedId
+ " current node Id: " + node.getMessageId());
}
@@ -203,7 +200,7 @@ public abstract class AbstractStoreCurso
public final synchronized void addMessageFirst(MessageReference node) throws Exception {
- cacheEnabled=false;
+ setCacheEnabled(false);
size++;
}
@@ -221,7 +218,7 @@ public abstract class AbstractStoreCurso
public final synchronized void remove(MessageReference node) {
size--;
- cacheEnabled=false;
+ setCacheEnabled(false);
batchList.remove(node);
}
@@ -240,7 +237,7 @@ public abstract class AbstractStoreCurso
batchList.clear();
clearIterator(false);
batchResetNeeded = true;
- this.cacheEnabled=false;
+ setCacheEnabled(false);
}
@Override
@@ -251,8 +248,7 @@ public abstract class AbstractStoreCurso
protected final synchronized void fillBatch() {
if (LOG.isTraceEnabled()) {
- LOG.trace("fillBatch - batchResetNeeded=" + batchResetNeeded
- + ", hasMessages=" + this.storeHasMessages + ", size=" + this.size + ", cacheEnabled=" + this.cacheEnabled);
+ LOG.trace(this + " - fillBatch");
}
if (batchResetNeeded) {
resetBatch();
@@ -263,7 +259,7 @@ public abstract class AbstractStoreCurso
try {
doFillBatch();
} catch (Exception e) {
- LOG.error("Failed to fill batch", e);
+ LOG.error(this + " - Failed to fill batch", e);
throw new RuntimeException(e);
}
if (!this.batchList.isEmpty() || !hadSpace) {
@@ -290,7 +286,11 @@ public abstract class AbstractStoreCurso
}
return size;
}
-
+
+ public String toString() {
+ return regionDestination.getActiveMQDestination().getPhysicalName() + ",batchResetNeeded=" + batchResetNeeded
+ + ",storeHasMessages=" + this.storeHasMessages + ",size=" + this.size + ",cacheEnabled=" + isCacheEnabled();
+ }
protected abstract void doFillBatch() throws Exception;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java?rev=1073453&r1=1073452&r2=1073453&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java Tue Feb 22 18:51:54 2011
@@ -199,7 +199,7 @@ public class FilePendingMessageCursor ex
if (hasSpace() || this.store == null) {
memoryList.add(node);
node.incrementReferenceCount();
- cacheEnabled = true;
+ setCacheEnabled(true);
return true;
}
}
@@ -247,7 +247,7 @@ public class FilePendingMessageCursor ex
if (hasSpace()) {
memoryList.addFirst(node);
node.incrementReferenceCount();
- cacheEnabled = true;
+ setCacheEnabled(true);
return;
}
}
@@ -428,7 +428,7 @@ public class FilePendingMessageCursor ex
}
memoryList.clear();
- cacheEnabled = false;
+ setCacheEnabled(false);
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java?rev=1073453&r1=1073452&r2=1073453&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java Tue Feb 22 18:51:54 2011
@@ -187,15 +187,15 @@ public class StoreDurableSubscriberCurso
Destination dest = msg.getRegionDestination();
TopicStorePrefetch tsp = topics.get(dest);
if (tsp != null) {
- // cache can be come high priority cache for immediate dispatch
+ // cache can become high priority cache for immediate dispatch
final int priority = msg.getPriority();
- if (isStarted() && this.prioritizedMessages && immediatePriorityDispatch && !tsp.cacheEnabled) {
+ if (isStarted() && this.prioritizedMessages && immediatePriorityDispatch && !tsp.isCacheEnabled()) {
if (priority > tsp.getCurrentLowestPriority()) {
if (LOG.isTraceEnabled()) {
LOG.trace("enabling cache for cursor on high priority message " + priority
+ ", current lowest: " + tsp.getCurrentLowestPriority());
}
- tsp.cacheEnabled = true;
+ tsp.setCacheEnabled(true);
cacheCurrentLowestPriority = tsp.getCurrentLowestPriority();
}
} else if (cacheCurrentLowestPriority != UNKNOWN && priority <= cacheCurrentLowestPriority) {
@@ -206,7 +206,7 @@ public class StoreDurableSubscriberCurso
+ priority + ", tsp current lowest: " + tsp.getCurrentLowestPriority()
+ " cache lowest: " + cacheCurrentLowestPriority);
}
- tsp.cacheEnabled = false;
+ tsp.setCacheEnabled(false);
cacheCurrentLowestPriority = UNKNOWN;
}
tsp.addMessageLast(node);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java?rev=1073453&r1=1073452&r2=1073453&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java Tue Feb 22 18:51:54 2011
@@ -297,7 +297,7 @@ public class StoreQueueCursor extends Ab
@Override
public boolean isCacheEnabled() {
- cacheEnabled = isUseCache();
+ boolean cacheEnabled = isUseCache();
if (cacheEnabled) {
if (persistent != null) {
cacheEnabled &= persistent.isCacheEnabled();
@@ -305,6 +305,7 @@ public class StoreQueueCursor extends Ab
if (nonPersistent != null) {
cacheEnabled &= nonPersistent.isCacheEnabled();
}
+ setCacheEnabled(cacheEnabled);
}
return cacheEnabled;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java?rev=1073453&r1=1073452&r2=1073453&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java Tue Feb 22 18:51:54 2011
@@ -132,6 +132,6 @@ class TopicStorePrefetch extends Abstrac
@Override
public String toString() {
- return "TopicStorePrefetch" + System.identityHashCode(this) + "(" + clientId + "," + subscriberName + ")";
+ return "TopicStorePrefetch(" + clientId + "," + subscriberName + ")" + super.toString();
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java?rev=1073453&r1=1073452&r2=1073453&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java Tue Feb 22 18:51:54 2011
@@ -40,6 +40,25 @@ import org.slf4j.LoggerFactory;
*/
public class JDBCMessageStore extends AbstractMessageStore {
+ class Duration {
+ static final int LIMIT = 100;
+ final long start = System.currentTimeMillis();
+ final String name;
+
+ Duration(String name) {
+ this.name = name;
+ }
+ void end() {
+ end(null);
+ }
+ void end(Object o) {
+ long duration = System.currentTimeMillis() - start;
+
+ if (duration > LIMIT) {
+ System.err.println(name + " took a long time: " + duration + "ms " + o);
+ }
+ }
+ }
private static final Logger LOG = LoggerFactory.getLogger(JDBCMessageStore.class);
protected final WireFormat wireFormat;
protected final JDBCAdapter adapter;
@@ -58,7 +77,6 @@ public class JDBCMessageStore extends Ab
}
public void addMessage(ConnectionContext context, Message message) throws IOException {
-
MessageId messageId = message.getMessageId();
if (audit != null && audit.isDuplicate(message)) {
if (LOG.isDebugEnabled()) {
@@ -90,6 +108,10 @@ public class JDBCMessageStore extends Ab
} finally {
c.close();
}
+ onAdd(sequenceId, message.getPriority());
+ }
+
+ protected void onAdd(long sequenceId, byte priority) {
}
public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java?rev=1073453&r1=1073452&r2=1073453&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java Tue Feb 22 18:51:54 2011
@@ -18,10 +18,10 @@ package org.apache.activemq.store.jdbc;
import java.io.IOException;
import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.activemq.broker.ConnectionContext;
@@ -102,22 +102,120 @@ public class JDBCTopicMessageStore exten
}
}
- private class LastRecovered {
- long sequence = 0;
- byte priority = 9;
-
- public void update(long sequence, Message msg) {
- this.sequence = sequence;
- this.priority = msg.getPriority();
+ private class LastRecovered implements Iterable<LastRecoveredEntry> {
+ LastRecoveredEntry[] perPriority = new LastRecoveredEntry[10];
+ LastRecovered() {
+ for (int i=0; i<perPriority.length; i++) {
+ perPriority[i] = new LastRecoveredEntry(i);
+ }
+ }
+
+ public void updateStored(long sequence, int priority) {
+ perPriority[priority].stored = sequence;
+ }
+
+ public LastRecoveredEntry defaultPriority() {
+ return perPriority[javax.jms.Message.DEFAULT_PRIORITY];
+ }
+
+ public String toString() {
+ return Arrays.deepToString(perPriority);
+ }
+
+ public Iterator<LastRecoveredEntry> iterator() {
+ return new PriorityIterator();
+ }
+
+ class PriorityIterator implements Iterator<LastRecoveredEntry> {
+ int current = 9;
+ public boolean hasNext() {
+ for (int i=current; i>=0; i--) {
+ if (perPriority[i].hasMessages()) {
+ current = i;
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public LastRecoveredEntry next() {
+ return perPriority[current];
+ }
+
+ public void remove() {
+ throw new RuntimeException("not implemented");
+ }
+ }
+ }
+
+ private class LastRecoveredEntry {
+ final int priority;
+ long recovered = 0;
+ long stored = Integer.MAX_VALUE;
+
+ public LastRecoveredEntry(int priority) {
+ this.priority = priority;
}
public String toString() {
- return "" + sequence + ":" + priority;
+ return priority + "-" + stored + ":" + recovered;
+ }
+
+ public void exhausted() {
+ stored = recovered;
+ }
+
+ public boolean hasMessages() {
+ return stored > recovered;
+ }
+ }
+
+ class LastRecoveredAwareListener implements JDBCMessageRecoveryListener {
+ final MessageRecoveryListener delegate;
+ final int maxMessages;
+ LastRecoveredEntry lastRecovered;
+ int recoveredCount;
+ int recoveredMarker;
+
+ public LastRecoveredAwareListener(MessageRecoveryListener delegate, int maxMessages) {
+ this.delegate = delegate;
+ this.maxMessages = maxMessages;
+ }
+
+ public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
+ if (delegate.hasSpace()) {
+ Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data));
+ msg.getMessageId().setBrokerSequenceId(sequenceId);
+ if (delegate.recoverMessage(msg)) {
+ lastRecovered.recovered = sequenceId;
+ recoveredCount++;
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public boolean recoverMessageReference(String reference) throws Exception {
+ return delegate.recoverMessageReference(new MessageId(reference));
+ }
+
+ public void setLastRecovered(LastRecoveredEntry lastRecovered) {
+ this.lastRecovered = lastRecovered;
+ recoveredMarker = recoveredCount;
+ }
+
+ public boolean complete() {
+ return !delegate.hasSpace() || recoveredCount == maxMessages;
+ }
+
+ public boolean stalled() {
+ return recoveredMarker == recoveredCount;
}
}
public synchronized void recoverNextMessages(final String clientId, final String subscriptionName, final int maxReturned, final MessageRecoveryListener listener)
throws Exception {
+ //Duration duration = new Duration("recoverNextMessages");
TransactionContext c = persistenceAdapter.getTransactionContext();
String key = getSubscriptionKey(clientId, subscriptionName);
@@ -125,38 +223,38 @@ public class JDBCTopicMessageStore exten
subscriberLastRecoveredMap.put(key, new LastRecovered());
}
final LastRecovered lastRecovered = subscriberLastRecoveredMap.get(key);
+ LastRecoveredAwareListener recoveredAwareListener = new LastRecoveredAwareListener(listener, maxReturned);
try {
- JDBCMessageRecoveryListener jdbcListener = new JDBCMessageRecoveryListener() {
- public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
- if (listener.hasSpace()) {
- Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data));
- msg.getMessageId().setBrokerSequenceId(sequenceId);
- if (listener.recoverMessage(msg)) {
- lastRecovered.update(sequenceId, msg);
- return true;
- }
- }
- return false;
- }
-
- public boolean recoverMessageReference(String reference) throws Exception {
- return listener.recoverMessageReference(new MessageId(reference));
- }
-
- };
if (LOG.isTraceEnabled()) {
LOG.trace(key + " existing last recovered: " + lastRecovered);
}
if (isPrioritizedMessages()) {
- adapter.doRecoverNextMessagesWithPriority(c, destination, clientId, subscriptionName,
- lastRecovered.sequence, lastRecovered.priority, maxReturned, jdbcListener);
+ Iterator<LastRecoveredEntry> it = lastRecovered.iterator();
+ for ( ; it.hasNext() && !recoveredAwareListener.complete(); ) {
+ LastRecoveredEntry entry = it.next();
+ recoveredAwareListener.setLastRecovered(entry);
+ //Duration microDuration = new Duration("recoverNextMessages:loop");
+ adapter.doRecoverNextMessagesWithPriority(c, destination, clientId, subscriptionName,
+ entry.recovered, entry.priority, maxReturned, recoveredAwareListener);
+ //microDuration.end(entry);
+ if (recoveredAwareListener.stalled()) {
+ if (recoveredAwareListener.complete()) {
+ break;
+ } else {
+ entry.exhausted();
+ }
+ }
+ }
} else {
+ LastRecoveredEntry last = lastRecovered.defaultPriority();
+ recoveredAwareListener.setLastRecovered(last);
adapter.doRecoverNextMessages(c, destination, clientId, subscriptionName,
- lastRecovered.sequence, 0, maxReturned, jdbcListener);
+ last.recovered, 0, maxReturned, recoveredAwareListener);
}
if (LOG.isTraceEnabled()) {
LOG.trace(key + " last recovered: " + lastRecovered);
}
+ //duration.end();
} catch (SQLException e) {
JDBCPersistenceAdapter.log("JDBC Failure: ", e);
} finally {
@@ -168,6 +266,14 @@ public class JDBCTopicMessageStore exten
subscriberLastRecoveredMap.remove(getSubscriptionKey(clientId, subscriptionName));
}
+ protected void onAdd(long sequenceId, byte priority) {
+ // update last recovered state
+ for (LastRecovered last : subscriberLastRecoveredMap.values()) {
+ last.updateStored(sequenceId, priority);
+ }
+ }
+
+
public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
TransactionContext c = persistenceAdapter.getTransactionContext();
try {
@@ -223,6 +329,7 @@ public class JDBCTopicMessageStore exten
}
public int getMessageCount(String clientId, String subscriberName) throws IOException {
+ //Duration duration = new Duration("getMessageCount");
int result = 0;
TransactionContext c = persistenceAdapter.getTransactionContext();
try {
@@ -236,6 +343,7 @@ public class JDBCTopicMessageStore exten
if (LOG.isTraceEnabled()) {
LOG.trace(clientId + ":" + subscriberName + ", messageCount: " + result);
}
+ //duration.end();
return result;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java?rev=1073453&r1=1073452&r2=1073453&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java Tue Feb 22 18:51:54 2011
@@ -281,13 +281,13 @@ public class Statements {
public String getFindDurableSubMessagesByPriorityStatement() {
if (findDurableSubMessagesByPriorityStatement == null) {
- findDurableSubMessagesByPriorityStatement = "SELECT M.ID, M.MSG FROM " + getFullMessageTableName() + " M, "
- + getFullAckTableName() + " D "
+ findDurableSubMessagesByPriorityStatement = "SELECT M.ID, M.MSG FROM " + getFullMessageTableName() + " M,"
+ + " " + getFullAckTableName() + " D"
+ " WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
+ " AND M.CONTAINER=D.CONTAINER"
+ " AND M.PRIORITY=D.PRIORITY AND M.ID > D.LAST_ACKED_ID"
- + " AND ( (M.ID > ?) OR (M.PRIORITY < ?) )"
- + " ORDER BY M.PRIORITY DESC, M.ID";
+ + " AND M.ID > ? AND M.PRIORITY = ?"
+ + " ORDER BY M.ID";
}
return findDurableSubMessagesByPriorityStatement;
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ConcurrentProducerDurableConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ConcurrentProducerDurableConsumerTest.java?rev=1073453&r1=1073452&r2=1073453&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ConcurrentProducerDurableConsumerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ConcurrentProducerDurableConsumerTest.java Tue Feb 22 18:51:54 2011
@@ -23,15 +23,19 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
+import javax.jms.Message;
import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
@@ -43,32 +47,28 @@ import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.broker.region.policy.StorePendingDurableSubscriberMessageStoragePolicy;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTopic;
+//import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.util.MessageIdList;
import org.apache.activemq.util.Wait;
+//import org.apache.commons.dbcp.BasicDataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ConcurrentProducerDurableConsumerTest extends TestSupport {
private static final Logger LOG = LoggerFactory.getLogger(ConcurrentProducerDurableConsumerTest.class);
- private int consumerCount = 1;
+ private int consumerCount = 5;
BrokerService broker;
protected List<Connection> connections = Collections.synchronizedList(new ArrayList<Connection>());
- protected Map<MessageConsumer, MessageIdList> consumers = new HashMap<MessageConsumer, MessageIdList>();
+ protected Map<MessageConsumer, TimedMessageListener> consumers = new HashMap<MessageConsumer, TimedMessageListener>();
protected MessageIdList allMessagesList = new MessageIdList();
private int messageSize = 1024;
- public void testPlaceHolder() throws Exception {
- }
-
- public void x_initCombosForTestSendRateWithActivatingConsumers() throws Exception {
+ public void initCombosForTestSendRateWithActivatingConsumers() throws Exception {
addCombinationValues("defaultPersistenceAdapter",
new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC, PersistenceAdapterChoice.MEM});
}
- public void x_testSendRateWithActivatingConsumers() throws Exception {
+ public void testSendRateWithActivatingConsumers() throws Exception {
final Destination destination = createDestination();
final ConnectionFactory factory = createConnectionFactory();
startInactiveConsumers(factory, destination);
@@ -78,12 +78,12 @@ public class ConcurrentProducerDurableCo
MessageProducer producer = createMessageProducer(session, destination);
// preload the durable consumers
- double[] inactiveConsumerStats = produceMessages(destination, 200, 100, session, producer, null);
+ double[] inactiveConsumerStats = produceMessages(destination, 500, 10, session, producer, null);
LOG.info("With inactive consumers: ave: " + inactiveConsumerStats[1]
+ ", max: " + inactiveConsumerStats[0] + ", multiplier: " + (inactiveConsumerStats[0]/inactiveConsumerStats[1]));
- // periodically start a durable sub that is has a backlog
- final int consumersToActivate = 1;
+ // periodically start a durable sub that has a backlog
+ final int consumersToActivate = 5;
final Object addConsumerSignal = new Object();
Executors.newCachedThreadPool(new ThreadFactory() {
@Override
@@ -96,16 +96,15 @@ public class ConcurrentProducerDurableCo
try {
MessageConsumer consumer = null;
for (int i = 0; i < consumersToActivate; i++) {
- LOG.info("Waiting for add signal");
+ LOG.info("Waiting for add signal from producer...");
synchronized (addConsumerSignal) {
addConsumerSignal.wait(30 * 60 * 1000);
}
+ TimedMessageListener listener = new TimedMessageListener();
consumer = createDurableSubscriber(factory.createConnection(), destination, "consumer" + (i + 1));
LOG.info("Created consumer " + consumer);
- MessageIdList list = new MessageIdList();
- list.setParent(allMessagesList);
- consumer.setMessageListener(list);
- consumers.put(consumer, list);
+ consumer.setMessageListener(listener);
+ consumers.put(consumer, listener);
}
} catch (Exception e) {
LOG.error("failed to start consumer", e);
@@ -114,18 +113,44 @@ public class ConcurrentProducerDurableCo
});
- double[] stats = produceMessages(destination, 20, 100, session, producer, addConsumerSignal);
+ double[] statsWithActive = produceMessages(destination, 300, 10, session, producer, addConsumerSignal);
- LOG.info(" with concurrent activate, ave: " + stats[1] + ", max: " + stats[0] + ", multiplier: " + (stats[0]/stats[1]));
- assertTrue("max (" + stats[0] + ") within reasonable multiplier of ave (" + stats[1] + ")",
- stats[0] < 5 * stats[1]);
+ LOG.info(" with concurrent activate, ave: " + statsWithActive[1] + ", max: " + statsWithActive[0] + ", multiplier: " + (statsWithActive[0]/ statsWithActive[1]));
+
+ while(consumers.size() < consumersToActivate) {
+ TimeUnit.SECONDS.sleep(2);
+ }
+ long timeToFirstAccumulator = 0;
+ for (TimedMessageListener listener : consumers.values()) {
+ long time = listener.getFirstReceipt();
+ timeToFirstAccumulator += time;
+ LOG.info("Time to first " + time);
+ }
+ LOG.info("Ave time to first message =" + timeToFirstAccumulator/consumers.size());
+
+ for (TimedMessageListener listener : consumers.values()) {
+ LOG.info("Ave batch receipt time: " + listener.waitForReceivedLimit(5000) + " max receipt: " + listener.maxReceiptTime);
+ }
+
+ //assertTrue("max (" + statsWithActive[0] + ") within reasonable multiplier of ave (" + statsWithActive[1] + ")",
+ // statsWithActive[0] < 5 * statsWithActive[1]);
+
+ // compare no active to active
+ LOG.info("Ave send time with active: " + statsWithActive[1]
+ + " as multiplier of ave with none active: " + inactiveConsumerStats[1]
+ + ", multiplier=" + (statsWithActive[1]/inactiveConsumerStats[1]));
+
+ assertTrue("Ave send time with active: " + statsWithActive[1]
+ + " within reasonable multpler of ave with none active: " + inactiveConsumerStats[1]
+ + ", multiplier " + (statsWithActive[1]/inactiveConsumerStats[1]),
+ statsWithActive[1] < 15 * inactiveConsumerStats[1]);
}
public void x_initCombosForTestSendWithInactiveAndActiveConsumers() throws Exception {
addCombinationValues("defaultPersistenceAdapter",
- new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC, PersistenceAdapterChoice.MEM});
+ new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
}
public void x_testSendWithInactiveAndActiveConsumers() throws Exception {
@@ -150,7 +175,7 @@ public class ConcurrentProducerDurableCo
LOG.info("With consumer: " + withConsumerStats[1] + " , with noConsumer: " + noConsumerStats[1]
+ ", multiplier: " + (withConsumerStats[1]/noConsumerStats[1]));
- final int reasonableMultiplier = 4; // not so reasonable, but on slow disks it can be
+ final int reasonableMultiplier = 15; // not so reasonable but improving
assertTrue("max X times as slow with consumer: " + withConsumerStats[1] + ", with no Consumer: "
+ noConsumerStats[1] + ", multiplier: " + (withConsumerStats[1]/noConsumerStats[1]),
withConsumerStats[1] < noConsumerStats[1] * reasonableMultiplier);
@@ -188,9 +213,8 @@ public class ConcurrentProducerDurableCo
protected void startConsumers(ConnectionFactory factory, Destination dest) throws Exception {
MessageConsumer consumer;
for (int i = 0; i < consumerCount; i++) {
+ TimedMessageListener list = new TimedMessageListener();
consumer = createDurableSubscriber(factory.createConnection(), dest, "consumer" + (i + 1));
- MessageIdList list = new MessageIdList();
- list.setParent(allMessagesList);
consumer.setMessageListener(list);
consumers.put(consumer, list);
}
@@ -212,39 +236,44 @@ public class ConcurrentProducerDurableCo
* @throws Exception
*/
private double[] produceMessages(Destination destination,
- int toSend,
- int numIterations,
+ final int toSend,
+ final int numIterations,
Session session,
MessageProducer producer,
Object addConsumerSignal) throws Exception {
long start;
long count = 0;
- double max = 0, sum = 0;
+ double batchMax = 0, max = 0, sum = 0;
for (int i=0; i<numIterations; i++) {
start = System.currentTimeMillis();
for (int j=0; j < toSend; j++) {
+ long singleSendstart = System.currentTimeMillis();
TextMessage msg = createTextMessage(session, "" + j);
producer.send(msg);
- if (++count % 300 == 0) {
+ max = Math.max(max, (System.currentTimeMillis() - singleSendstart));
+ if (++count % 500 == 0) {
if (addConsumerSignal != null) {
synchronized (addConsumerSignal) {
addConsumerSignal.notifyAll();
- LOG.info("Signaled add consumer");
+ LOG.info("Signalled add consumer");
}
}
}
+ ;
if (count % 5000 == 0) {
- LOG.info("Sent " + count);
+ LOG.info("Sent " + count + ", singleSendMax:" + max);
}
}
long duration = System.currentTimeMillis() - start;
- max = Math.max(max, duration);
+ batchMax = Math.max(batchMax, duration);
sum += duration;
+ LOG.info("Iteration " + i + ", sent " + toSend + ", time: "
+ + duration + ", batchMax:" + batchMax + ", singleSendMax:" + max);
}
- LOG.info("Sent: " + toSend * numIterations + ", max send time: " + max);
- return new double[]{max, sum/numIterations};
+ LOG.info("Sent: " + toSend * numIterations + ", batchMax: " + batchMax + " singleSendMax: " + max);
+ return new double[]{batchMax, sum/numIterations};
}
protected TextMessage createTextMessage(Session session, String initText) throws Exception {
@@ -297,12 +326,44 @@ public class ConcurrentProducerDurableCo
PolicyEntry policy = new PolicyEntry();
policy.setPrioritizedMessages(true);
+ policy.setMaxPageSize(500);
+
PolicyMap policyMap = new PolicyMap();
policyMap.setDefaultEntry(policy);
brokerService.setDestinationPolicy(policyMap);
- //setPersistenceAdapter(brokerService, PersistenceAdapterChoice.JDBC);
- setDefaultPersistenceAdapter(brokerService);
+ if (false) {
+ // external mysql works a lot faster
+ //
+// JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter();
+// BasicDataSource ds = new BasicDataSource();
+// com.mysql.jdbc.Driver d = new com.mysql.jdbc.Driver();
+// ds.setDriverClassName("com.mysql.jdbc.Driver");
+// ds.setUrl("jdbc:mysql://localhost/activemq?relaxAutoCommit=true");
+// ds.setMaxActive(200);
+// ds.setUsername("root");
+// ds.setPassword("");
+// ds.setPoolPreparedStatements(true);
+// jdbc.setDataSource(ds);
+// brokerService.setPersistenceAdapter(jdbc);
+
+/* add mysql bits to the pom in the testing dependencies
+<dependency>
+ <groupId>mysql</groupId>
+ <artifactId>mysql-connector-java</artifactId>
+ <version>5.1.10</version>
+ <scope>test</scope>
+</dependency>
+<dependency>
+ <groupId>commons-dbcp</groupId>
+ <artifactId>commons-dbcp</artifactId>
+ <version>1.2.2</version>
+ <scope>test</scope>
+</dependency>
+ */
+ } else {
+ setDefaultPersistenceAdapter(brokerService);
+ }
return brokerService;
}
@@ -311,6 +372,8 @@ public class ConcurrentProducerDurableCo
ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
prefetchPolicy.setAll(1);
factory.setPrefetchPolicy(prefetchPolicy);
+
+ factory.setDispatchAsync(true);
return factory;
}
@@ -318,4 +381,55 @@ public class ConcurrentProducerDurableCo
return suite(ConcurrentProducerDurableConsumerTest.class);
}
+    /**
+     * Message listener that records timing statistics for the messages it receives.
+     * <p>
+     * Each delivery is timed as the interval since the previous delivery (or, for the
+     * first message, since this listener was constructed). Every message is also
+     * forwarded to the enclosing test's {@code allMessagesList}.
+     * <p>
+     * The timing fields are updated from the {@code onMessage} callback and read from the
+     * test thread, so the ones that are read externally are {@code volatile}.
+     * NOTE(review): this assumes each listener instance has a single delivering thread
+     * (one listener per consumer/session) - confirm if listeners are ever shared.
+     */
+    class TimedMessageListener implements MessageListener {
+        /** Number of messages per logged "batch". */
+        final int batchSize = 1000;
+        /** Released once the first message has been received. */
+        CountDownLatch firstReceiptLatch = new CountDownLatch(1);
+        /** Timestamp (ms) of the previous delivery; initially the construction time. */
+        long mark = System.currentTimeMillis();
+        /** Interval (ms) between construction and the first delivery. */
+        long firstReceipt = 0L;
+        /** Sum (ms) of all inter-delivery intervals; read by {@link #waitForReceivedLimit(long)}. */
+        volatile long receiptAccumulator = 0;
+        /** Sum (ms) of inter-delivery intervals since the last batch log line. */
+        long batchReceiptAccumulator = 0;
+        /** Largest single inter-delivery interval (ms) seen so far; read directly by the test. */
+        volatile long maxReceiptTime = 0;
+        /** Total number of messages received. */
+        AtomicLong count = new AtomicLong(0);
+
+        /**
+         * Records the interval since the previous delivery and forwards the message to
+         * {@code allMessagesList}.
+         *
+         * @param message the delivered message
+         */
+        @Override
+        public void onMessage(Message message) {
+            final long current = System.currentTimeMillis();
+            final long duration = current - mark;
+            allMessagesList.onMessage(message);
+            final long received = count.incrementAndGet();
+            if (received == 1) {
+                firstReceipt = duration;
+                firstReceiptLatch.countDown();
+                LOG.info("First receipt in " + firstReceipt + "ms");
+            } else if (received % batchSize == 0) {
+                LOG.info("Consumed " + batchSize + " in " + batchReceiptAccumulator + "ms");
+                batchReceiptAccumulator = 0;
+            }
+            maxReceiptTime = Math.max(maxReceiptTime, duration);
+            // accumulate exactly once per message; the previous version added the
+            // duration twice, doubling every reported average
+            receiptAccumulator += duration;
+            batchReceiptAccumulator += duration;
+            mark = current;
+        }
+
+        /**
+         * @return the number of messages received so far
+         */
+        long getMessageCount() {
+            return count.get();
+        }
+
+        /**
+         * Waits up to 30 seconds for the first message.
+         *
+         * @return the time (ms) from construction to the first delivery, or 0 if no
+         *         message arrived before the wait timed out
+         * @throws Exception if the waiting thread is interrupted
+         */
+        long getFirstReceipt() throws Exception {
+            firstReceiptLatch.await(30, TimeUnit.SECONDS);
+            return firstReceipt;
+        }
+
+        /**
+         * Polls every two seconds until at least {@code limit} messages have been received.
+         *
+         * @param limit the number of messages to wait for
+         * @return the accumulated receipt time (ms) divided by the number of whole batches
+         *         in {@code limit} (treated as one batch when {@code limit} is smaller
+         *         than {@code batchSize})
+         * @throws RuntimeException if {@code limit} messages have not arrived within 30 minutes
+         * @throws Exception if the waiting thread is interrupted
+         */
+        public long waitForReceivedLimit(long limit) throws Exception {
+            final long expiry = System.currentTimeMillis() + 30*60*1000;
+            while (count.get() < limit) {
+                if (System.currentTimeMillis() > expiry) {
+                    throw new RuntimeException("Expired waiting for X messages, " + limit);
+                }
+                TimeUnit.SECONDS.sleep(2);
+            }
+            // limit / batchSize is integer division and is 0 for limit < batchSize;
+            // clamp to 1 to avoid an ArithmeticException
+            final long batches = Math.max(1, limit / batchSize);
+            return receiptAccumulator / batches;
+        }
+    }
+
}