You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2006/11/24 20:57:52 UTC
svn commit: r478967 - in /incubator/activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/broker/jmx/
main/java/org/apache/activemq/broker/region/
main/java/org/apache/activemq/broker/region/cursors/
main/java/org/apache/activemq/store/ main/...
Author: rajdavies
Date: Fri Nov 24 11:57:51 2006
New Revision: 478967
URL: http://svn.apache.org/viewvc?view=rev&rev=478967
Log:
implementation of store based cursors for Queues and Durable Subscribers,
to fix:
http://issues.apache.org/activemq/browse/AMQ-845
http://issues.apache.org/activemq/browse/AMQ-1062
http://issues.apache.org/activemq/browse/AMQ-1061
http://issues.apache.org/activemq/browse/AMQ-493
http://issues.apache.org/activemq/browse/AMQ-914
Added:
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorQueueStoreTest.java (with props)
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorSupport.java (with props)
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/KahaQueueStoreTest.java (with props)
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageRecoveryListener.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalMessageStore.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalTopicMessageStore.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageStore.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorDurableTest.java
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java?view=diff&rev=478967&r1=478966&r2=478967
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java Fri Nov 24 11:57:51 2006
@@ -427,6 +427,10 @@
public void recoverMessageReference(String messageReference) throws Exception{}
public void finished(){}
+
+ public boolean hasSpace(){
+ return true;
+ }
});
}catch(Throwable e){
log.error("Failed to browse messages for Subscription "+view,e);
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?view=diff&rev=478967&r1=478966&r2=478967
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Fri Nov 24 11:57:51 2006
@@ -117,6 +117,7 @@
public void initialize() throws Exception{
if(store!=null){
// Restore the persistent messages.
+ messages.setUsageManager(getUsageManager());
messages.start();
if(messages.isRecoveryRequired()){
store.recover(new MessageRecoveryListener(){
@@ -145,6 +146,10 @@
public void finished(){
}
+
+ public boolean hasSpace(){
+ return true;
+ }
});
}
}
@@ -242,6 +247,9 @@
synchronized (consumers) {
consumers.remove(sub);
+ if (consumers.isEmpty()) {
+ messages.gc();
+ }
}
sub.remove(context, this);
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?view=diff&rev=478967&r1=478966&r2=478967
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Fri Nov 24 11:57:51 2006
@@ -203,6 +203,10 @@
}
public void finished(){}
+
+ public boolean hasSpace(){
+ return true;
+ }
});
}
@@ -334,6 +338,10 @@
public void recoverMessageReference(String messageReference) throws Exception{}
public void finished(){}
+
+ public boolean hasSpace(){
+ return true;
+ }
});
Message[] msgs=subscriptionRecoveryPolicy.browse(getActiveMQDestination());
if(msgs!=null){
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java?view=diff&rev=478967&r1=478966&r2=478967
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java Fri Nov 24 11:57:51 2006
@@ -219,6 +219,7 @@
PendingMessageCursor cursor=broker.getPendingDurableSubscriberPolicy().getSubscriberPendingMessageCursor(
context.getClientId(),info.getSubscriptionName(),broker.getTempDataStore(),
info.getPrefetchSize());
+ cursor.setUsageManager(memoryManager);
sub=new DurableTopicSubscription(broker,context,info,keepDurableSubsActive,cursor);
durableSubscriptions.put(key,sub);
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java?view=diff&rev=478967&r1=478966&r2=478967
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java Fri Nov 24 11:57:51 2006
@@ -14,10 +14,10 @@
package org.apache.activemq.broker.region.cursors;
-import java.io.IOException;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.memory.UsageManager;
/**
* Abstract method holder for pending message (messages awaiting dispatch to a consumer) cursor
@@ -27,11 +27,13 @@
public class AbstractPendingMessageCursor implements PendingMessageCursor{
protected int maxBatchSize=100;
+ protected UsageManager usageManager;
public void start() throws Exception{
}
public void stop() throws Exception{
+ gc();
}
public void add(ConnectionContext context,Destination destination) throws Exception{
@@ -86,17 +88,22 @@
protected void fillBatch() throws Exception{
}
- /**
- * Give the cursor a hint that we are about to remove messages from memory only
- */
public void resetForGC(){
reset();
}
- /**
- * @param node
- * @see org.apache.activemq.broker.region.cursors.PendingMessageCursor#remove(org.apache.activemq.broker.region.MessageReference)
- */
public void remove(MessageReference node){
+ }
+
+ public void gc(){
+ }
+
+
+ public void setUsageManager(UsageManager usageManager){
+ this.usageManager = usageManager;
+ }
+
+ public boolean hasSpace() {
+ return usageManager != null ? !usageManager.isFull() : true;
}
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java?view=diff&rev=478967&r1=478966&r2=478967
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java Fri Nov 24 11:57:51 2006
@@ -19,6 +19,7 @@
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.memory.UsageManager;
/**
* Interface to pending message (messages awaiting dispatch to a consumer) cursor
@@ -125,4 +126,18 @@
* @param node
*/
public void remove(MessageReference node);
+
+
+ /**
+ * free up any internal buffers
+ *
+ */
+ public void gc();
+
+ /**
+ * Set the UsageManager
+ * @param usageManager
+ * @see org.apache.activemq.memory.UsageManager
+ */
+ public void setUsageManager(UsageManager usageManager);
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java?view=diff&rev=478967&r1=478966&r2=478967
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java Fri Nov 24 11:57:51 2006
@@ -20,16 +20,12 @@
import java.io.IOException;
import java.util.LinkedList;
-import javax.jms.JMSException;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Queue;
-import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.Message;
-import org.apache.activemq.command.MessageId;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
-import org.apache.activemq.store.TopicMessageStore;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -65,6 +61,7 @@
public void stop() throws Exception{
store.resetBatching();
+ gc();
}
/**
@@ -124,6 +121,10 @@
throws Exception{
// shouldn't get called
throw new RuntimeException("Not supported");
+ }
+
+ public void gc() {
+ batchList.clear();
}
// implementation
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java?view=diff&rev=478967&r1=478966&r2=478967
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java Fri Nov 24 11:57:51 2006
@@ -25,6 +25,7 @@
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.Message;
import org.apache.activemq.kaha.Store;
+import org.apache.activemq.memory.UsageManager;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -86,6 +87,7 @@
public synchronized void add(ConnectionContext context,Destination destination) throws Exception{
TopicStorePrefetch tsp=new TopicStorePrefetch((Topic)destination,clientId,subscriberName);
tsp.setMaxBatchSize(getMaxBatchSize());
+ tsp.setUsageManager(usageManager);
topics.put(destination,tsp);
storePrefetches.add(tsp);
if(started){
@@ -200,6 +202,21 @@
tsp.setMaxBatchSize(maxBatchSize);
}
super.setMaxBatchSize(maxBatchSize);
+ }
+
+ public synchronized void gc() {
+ for(Iterator i=storePrefetches.iterator();i.hasNext();){
+ PendingMessageCursor tsp=(PendingMessageCursor)i.next();
+ tsp.gc();
+ }
+ }
+
+ public synchronized void setUsageManager(UsageManager usageManager){
+ super.setUsageManager(usageManager);
+ for(Iterator i=storePrefetches.iterator();i.hasNext();){
+ PendingMessageCursor tsp=(PendingMessageCursor)i.next();
+ tsp.setUsageManager(usageManager);
+ }
}
protected synchronized PendingMessageCursor getNextCursor() throws Exception{
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java?view=diff&rev=478967&r1=478966&r2=478967
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java Fri Nov 24 11:57:51 2006
@@ -14,11 +14,11 @@
package org.apache.activemq.broker.region.cursors;
-import java.util.Iterator;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.command.Message;
import org.apache.activemq.kaha.Store;
+import org.apache.activemq.memory.UsageManager;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -55,6 +55,7 @@
if(nonPersistent==null){
nonPersistent=new FilePendingMessageCursor(queue.getDestination(),tmpStore);
nonPersistent.setMaxBatchSize(getMaxBatchSize());
+ nonPersistent.setUsageManager(usageManager);
}
nonPersistent.start();
persistent.start();
@@ -65,8 +66,10 @@
started=false;
if(nonPersistent!=null){
nonPersistent.stop();
+ nonPersistent.gc();
}
persistent.stop();
+ persistent.gc();
pendingCount=0;
}
@@ -162,10 +165,29 @@
}
super.setMaxBatchSize(maxBatchSize);
}
+
+ public void gc() {
+ if (persistent != null) {
+ persistent.gc();
+ }
+ if (nonPersistent != null) {
+ nonPersistent.gc();
+ }
+ }
+
+ public void setUsageManager(UsageManager usageManager){
+ super.setUsageManager(usageManager);
+ if (persistent != null) {
+ persistent.setUsageManager(usageManager);
+ }
+ if (nonPersistent != null) {
+ nonPersistent.setUsageManager(usageManager);
+ }
+ }
protected synchronized PendingMessageCursor getNextCursor() throws Exception{
if(currentCursor==null||currentCursor.isEmpty()){
- currentCursor = currentCursor == persistent ? nonPersistent : persistent;
+ currentCursor=currentCursor==persistent?nonPersistent:persistent;
}
return currentCursor;
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java?view=diff&rev=478967&r1=478966&r2=478967
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java Fri Nov 24 11:57:51 2006
@@ -66,6 +66,7 @@
public void stop() throws Exception{
store.resetBatching(clientId,subscriberName);
+ gc();
}
/**
@@ -136,6 +137,10 @@
Message message=(Message)batchList.getLast();
}
+ }
+
+ public void gc() {
+ batchList.clear();
}
public String toString() {
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageRecoveryListener.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageRecoveryListener.java?view=diff&rev=478967&r1=478966&r2=478967
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageRecoveryListener.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/MessageRecoveryListener.java Fri Nov 24 11:57:51 2006
@@ -26,4 +26,5 @@
void recoverMessage(Message message) throws Exception;
void recoverMessageReference(String messageReference) throws Exception;
void finished();
+ boolean hasSpace();
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java?view=diff&rev=478967&r1=478966&r2=478967
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java Fri Nov 24 11:57:51 2006
@@ -235,14 +235,18 @@
new JDBCMessageRecoveryListener(){
public void recoverMessage(long sequenceId,byte[] data) throws Exception{
- Message msg=(Message)wireFormat.unmarshal(new ByteSequence(data));
- msg.getMessageId().setBrokerSequenceId(sequenceId);
- listener.recoverMessage(msg);
- lastMessageId.set(sequenceId);
+ if(listener.hasSpace()){
+ Message msg=(Message)wireFormat.unmarshal(new ByteSequence(data));
+ msg.getMessageId().setBrokerSequenceId(sequenceId);
+ listener.recoverMessage(msg);
+ lastMessageId.set(sequenceId);
+ }
}
public void recoverMessageReference(String reference) throws Exception{
- listener.recoverMessageReference(reference);
+ if(listener.hasSpace()) {
+ listener.recoverMessageReference(reference);
+ }
}
public void finished(){
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java?view=diff&rev=478967&r1=478966&r2=478967
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java Fri Nov 24 11:57:51 2006
@@ -108,10 +108,12 @@
new JDBCMessageRecoveryListener(){
public void recoverMessage(long sequenceId,byte[] data) throws Exception{
- Message msg=(Message)wireFormat.unmarshal(new ByteSequence(data));
- msg.getMessageId().setBrokerSequenceId(sequenceId);
- listener.recoverMessage(msg);
- finalLast.set(sequenceId);
+ if(listener.hasSpace()){
+ Message msg=(Message)wireFormat.unmarshal(new ByteSequence(data));
+ msg.getMessageId().setBrokerSequenceId(sequenceId);
+ listener.recoverMessage(msg);
+ finalLast.set(sequenceId);
+ }
}
public void recoverMessageReference(String reference) throws Exception{
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?view=diff&rev=478967&r1=478966&r2=478967
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java Fri Nov 24 11:57:51 2006
@@ -370,6 +370,7 @@
ResultSet rs=null;
try{
s=c.getConnection().prepareStatement(statements.getFindDurableSubMessagesStatement());
+ s.setMaxRows(maxReturned);
s.setString(1,destination.getQualifiedName());
s.setString(2,clientId);
s.setString(3,subscriptionName);
@@ -639,8 +640,9 @@
ResultSet rs=null;
try{
s=c.getConnection().prepareStatement(statements.getFindNextMessagesStatement());
+ s.setMaxRows(maxReturned);
s.setString(1,destination.getQualifiedName());
- s.setLong(4,nextSeq);
+ s.setLong(2,nextSeq);
rs=s.executeQuery();
int count=0;
if(statements.isUseExternalMessageReferences()){
@@ -654,7 +656,9 @@
count++;
}
}
- }finally{
+ }catch(Exception e) {
+ e.printStackTrace();
+ }finally {
close(rs);
close(s);
listener.finished();
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalMessageStore.java?view=diff&rev=478967&r1=478966&r2=478967
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalMessageStore.java Fri Nov 24 11:57:51 2006
@@ -373,6 +373,10 @@
public void finished(){
listener.finished();
}
+ public boolean hasSpace(){
+ // TODO Auto-generated method stub
+ return true;
+ }
});
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalTopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalTopicMessageStore.java?view=diff&rev=478967&r1=478966&r2=478967
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalTopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalTopicMessageStore.java Fri Nov 24 11:57:51 2006
@@ -68,6 +68,9 @@
public void finished(){
listener.finished();
}
+ public boolean hasSpace(){
+ return true;
+ }
});
}
@@ -86,6 +89,9 @@
public void finished(){
listener.finished();
+ }
+ public boolean hasSpace(){
+ return true;
}
});
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java?view=diff&rev=478967&r1=478966&r2=478967
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java Fri Nov 24 11:57:51 2006
@@ -210,7 +210,7 @@
}
batchEntry = entry;
entry=messageContainer.getNext(entry);
- }while(entry!=null&&count<maxReturned);
+ }while(entry!=null&&count<maxReturned&&listener.hasSpace());
}
listener.finished();
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java?view=diff&rev=478967&r1=478966&r2=478967
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java Fri Nov 24 11:57:51 2006
@@ -184,7 +184,7 @@
}
container.setBatchEntry(entry);
entry=container.getListContainer().getNext(entry);
- }while(entry!=null&&count<maxReturned);
+ }while(entry!=null&&count<maxReturned&&listener.hasSpace());
}
}
listener.finished();
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageStore.java?view=diff&rev=478967&r1=478966&r2=478967
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidMessageStore.java Fri Nov 24 11:57:51 2006
@@ -350,7 +350,7 @@
}
batchEntry=entry;
entry=messageContainer.getNext(entry);
- }while(entry!=null&&count<maxReturned);
+ }while(entry!=null&&count<maxReturned&&listener.hasSpace());
}
listener.finished();
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java?view=diff&rev=478967&r1=478966&r2=478967
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java Fri Nov 24 11:57:51 2006
@@ -186,7 +186,7 @@
}
container.setBatchEntry(entry);
entry=container.getListContainer().getNext(entry);
- }while(entry!=null&&count<maxReturned);
+ }while(entry!=null&&count<maxReturned && listener.hasSpace());
}
}
listener.finished();
Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java?view=diff&rev=478967&r1=478966&r2=478967
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java Fri Nov 24 11:57:51 2006
@@ -28,7 +28,7 @@
public void testManyProducersManyConsumers() throws Exception {
consumerCount = 20;
producerCount = 20;
- messageCount = 500;
+ messageCount = 50;
messageSize = 1;
prefetchCount = 10;
Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorDurableTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorDurableTest.java?view=diff&rev=478967&r1=478966&r2=478967
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorDurableTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorDurableTest.java Fri Nov 24 11:57:51 2006
@@ -1,220 +1,56 @@
/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
+
package org.apache.activemq.broker.region.cursors;
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
-import javax.jms.Message;
import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
import javax.jms.Session;
-import javax.jms.TemporaryQueue;
-import javax.jms.TemporaryTopic;
-import javax.jms.TextMessage;
import javax.jms.Topic;
-
-import junit.framework.TestCase;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.StorePendingDurableSubscriberMessageStoragePolicy;
-import org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
+
/**
* @version $Revision: 1.3 $
*/
-public class CursorDurableTest extends TestCase{
-
- protected static final Log log = LogFactory.getLog(CursorDurableTest.class);
-
- protected static final int MESSAGE_COUNT=100;
- protected static final int PREFETCH_SIZE = 5;
- protected BrokerService broker;
- protected String bindAddress="tcp://localhost:60706";
- protected int topicCount=0;
-
- public void testSendFirstThenConsume() throws Exception{
- ConnectionFactory factory=createConnectionFactory();
- Connection consumerConnection= getConsumerConnection(factory);
- //create durable subs
- MessageConsumer consumer = getConsumer(consumerConnection);
- consumerConnection.close();
-
- Connection producerConnection = factory.createConnection();
- producerConnection.start();
- Session session = producerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(getTopic(session));
- List senderList = new ArrayList();
- for (int i =0; i < MESSAGE_COUNT; i++) {
- Message msg=session.createTextMessage("test"+i);
- senderList.add(msg);
- producer.send(msg);
- }
- producerConnection.close();
-
- //now consume the messages
- consumerConnection= getConsumerConnection(factory);
- //create durable subs
- consumer = getConsumer(consumerConnection);
- List consumerList = new ArrayList();
- for (int i = 0; i < MESSAGE_COUNT; i++) {
- Message msg = consumer.receive();
- consumerList.add(msg);
- }
- assertEquals(senderList,consumerList);
- consumerConnection.close();
- }
-
- public void testSendWhilstConsume() throws Exception{
- ConnectionFactory factory=createConnectionFactory();
- Connection consumerConnection= getConsumerConnection(factory);
- //create durable subs
- MessageConsumer consumer = getConsumer(consumerConnection);
- consumerConnection.close();
-
- Connection producerConnection = factory.createConnection();
- producerConnection.start();
- Session session = producerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(getTopic(session));
- List senderList = new ArrayList();
- for (int i =0; i < MESSAGE_COUNT/10; i++) {
- TextMessage msg=session.createTextMessage("test"+i);
- senderList.add(msg);
- producer.send(msg);
- }
-
-
- //now consume the messages
- consumerConnection= getConsumerConnection(factory);
- //create durable subs
- consumer = getConsumer(consumerConnection);
- final List consumerList = new ArrayList();
-
- final CountDownLatch latch = new CountDownLatch(1);
- consumer.setMessageListener(new MessageListener() {
-
- public void onMessage(Message msg){
- try{
- //sleep to act as a slow consumer
- //which will force a mix of direct and polled dispatching
- //using the cursor on the broker
- Thread.sleep(50);
- }catch(Exception e){
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- consumerList.add(msg);
- if (consumerList.size()==MESSAGE_COUNT) {
- latch.countDown();
- }
-
- }
-
- });
- for (int i =MESSAGE_COUNT/10; i < MESSAGE_COUNT; i++) {
- TextMessage msg=session.createTextMessage("test"+i);
- senderList.add(msg);
-
- producer.send(msg);
-
-
- }
-
-
- latch.await(300000,TimeUnit.MILLISECONDS);
- assertEquals("Still dipatching - count down latch not sprung" , latch.getCount(),0);
- assertEquals("cosumerList - expected: " + MESSAGE_COUNT + " but was: " + consumerList.size(),consumerList.size(),senderList.size());
- assertEquals(senderList,consumerList);
- producerConnection.close();
- consumerConnection.close();
- }
-
-
+public class CursorDurableTest extends CursorSupport{
- protected Topic getTopic(Session session) throws JMSException{
+ protected Destination getDestination(Session session) throws JMSException{
String topicName=getClass().getName();
return session.createTopic(topicName);
}
-
+
protected Connection getConsumerConnection(ConnectionFactory fac) throws JMSException{
Connection connection=fac.createConnection();
connection.setClientID("testConsumer");
connection.start();
return connection;
-
}
-
+
protected MessageConsumer getConsumer(Connection connection) throws Exception{
- Session consumerSession = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
- Topic topic = getTopic(consumerSession);
- MessageConsumer consumer = consumerSession.createDurableSubscriber(topic,"testConsumer");
+ Session consumerSession=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+ Topic topic=(Topic)getDestination(consumerSession);
+ MessageConsumer consumer=consumerSession.createDurableSubscriber(topic,"testConsumer");
return consumer;
}
-
-
-
- protected void setUp() throws Exception{
- if(broker==null){
- broker=createBroker();
- }
- super.setUp();
- }
-
- protected void tearDown() throws Exception{
- super.tearDown();
-
- if(broker!=null){
- broker.stop();
- }
- }
-
- protected ActiveMQConnectionFactory createConnectionFactory() throws Exception{
- ActiveMQConnectionFactory cf=new ActiveMQConnectionFactory(bindAddress);
- Properties props = new Properties();
- props.setProperty("prefetchPolicy.durableTopicPrefetch","" + PREFETCH_SIZE);
- props.setProperty("prefetchPolicy.optimizeDurableTopicPrefetch","" + PREFETCH_SIZE);
- cf.setProperties(props);
- return cf;
- }
-
-
- protected BrokerService createBroker() throws Exception{
- BrokerService answer=new BrokerService();
- configureBroker(answer);
+ protected void configureBroker(BrokerService answer) throws Exception{
answer.setDeleteAllMessagesOnStartup(true);
answer.setPendingDurableSubscriberPolicy(new StorePendingDurableSubscriberMessageStoragePolicy());
- answer.start();
- return answer;
- }
-
- protected void configureBroker(BrokerService answer) throws Exception{
-
answer.addConnector(bindAddress);
answer.setDeleteAllMessagesOnStartup(true);
}
Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorQueueStoreTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorQueueStoreTest.java?view=auto&rev=478967
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorQueueStoreTest.java (added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorQueueStoreTest.java Fri Nov 24 11:57:51 2006
@@ -0,0 +1,64 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.activemq.broker.region.cursors;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.region.policy.StorePendingDurableSubscriberMessageStoragePolicy;
+import org.apache.activemq.broker.region.policy.StorePendingQueueMessageStoragePolicy;
+
+/**
+ * Queue flavour of the cursor tests inherited from {@link CursorSupport}: messages are sent to a queue and the
+ * broker's default destination policy uses {@link StorePendingQueueMessageStoragePolicy}.
+ *
+ * @version $Revision: 1.3 $
+ */
+public class CursorQueueStoreTest extends CursorSupport{
+
+    /**
+     * Creates the queue under test; its name is "QUEUE" followed by the runtime class name, so each subclass
+     * gets its own queue.
+     *
+     * @param session session used to create the queue
+     * @return the queue destination
+     * @throws JMSException if the session cannot create the queue
+     */
+    protected Destination getDestination(Session session) throws JMSException{
+        String queueName="QUEUE" + getClass().getName();
+        return session.createQueue(queueName);
+    }
+
+    /**
+     * Opens and starts a connection whose client id is "testConsumer".
+     *
+     * @param fac factory used to create the connection
+     * @return the started connection
+     * @throws JMSException if the connection cannot be created, identified or started
+     */
+    protected Connection getConsumerConnection(ConnectionFactory fac) throws JMSException{
+        Connection connection=fac.createConnection();
+        connection.setClientID("testConsumer");
+        connection.start();
+        return connection;
+    }
+
+    /**
+     * Creates a plain (non-durable) consumer on the test queue using an auto-acknowledge, non-transacted session.
+     *
+     * @param connection connection on which the consumer session is created
+     * @return a consumer for the destination returned by {@link #getDestination(Session)}
+     * @throws Exception if the session or the consumer cannot be created
+     */
+    protected MessageConsumer getConsumer(Connection connection) throws Exception{
+        Session consumerSession=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+        Destination dest = getDestination(consumerSession);
+        MessageConsumer consumer=consumerSession.createConsumer(dest);
+        return consumer;
+    }
+
+    /**
+     * Installs a default destination policy that uses {@link StorePendingQueueMessageStoragePolicy}, asks the
+     * broker to delete all messages on startup and adds a connector on {@code bindAddress}.
+     *
+     * @param answer broker to configure; it has not been started yet when called from {@code createBroker()}
+     * @throws Exception if the connector cannot be added
+     */
+    protected void configureBroker(BrokerService answer) throws Exception{
+        PolicyEntry policy = new PolicyEntry();
+        policy.setPendingQueueMessageStoragePolicy(new StorePendingQueueMessageStoragePolicy());
+        PolicyMap pMap = new PolicyMap();
+        pMap.setDefaultEntry(policy);
+        answer.setDestinationPolicy(pMap);
+        // set once only: the original repeated this call after addConnector, which was redundant
+        answer.setDeleteAllMessagesOnStartup(true);
+        answer.addConnector(bindAddress);
+    }
+}
Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorQueueStoreTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorSupport.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorSupport.java?view=auto&rev=478967
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorSupport.java (added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorSupport.java Fri Nov 24 11:57:51 2006
@@ -0,0 +1,175 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.activemq.broker.region.cursors;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Base class for the pending-message cursor tests. It starts a broker configured by the subclass, and runs two
+ * scenarios against the destination and consumer the subclass supplies: sending everything before consuming, and
+ * sending while a deliberately slow listener is consuming.
+ *
+ * @version $Revision: 1.3 $
+ */
+public abstract class CursorSupport extends TestCase{
+
+    protected static final Log log=LogFactory.getLog(CursorSupport.class);
+    protected static final int MESSAGE_COUNT=500;
+    protected static final int PREFETCH_SIZE=50;
+    /** Upper bound, in milliseconds, on the wait for a single message in the synchronous receive loop. */
+    private static final long RECEIVE_TIMEOUT=30000;
+    protected BrokerService broker;
+    protected String bindAddress="tcp://localhost:60706";
+
+    /**
+     * @param session session used to create the destination
+     * @return the destination the tests send to and consume from
+     * @throws JMSException if the destination cannot be created
+     */
+    protected abstract Destination getDestination(Session session) throws JMSException;
+
+    /**
+     * @param connection connection on which the consumer is created
+     * @return the consumer used to read the test messages back
+     * @throws Exception if the consumer cannot be created
+     */
+    protected abstract MessageConsumer getConsumer(Connection connection) throws Exception;
+
+    /**
+     * Applies the test-specific configuration to a broker that has been constructed but not yet started.
+     *
+     * @param answer broker to configure
+     * @throws Exception if configuration fails
+     */
+    protected abstract void configureBroker(BrokerService answer) throws Exception;
+
+    /**
+     * Sends {@link #MESSAGE_COUNT} messages with no consumer connected, then connects a consumer and checks that
+     * the messages received equal the messages sent, in order.
+     *
+     * @throws Exception on any JMS failure
+     */
+    public void testSendFirstThenConsume() throws Exception{
+        ConnectionFactory factory=createConnectionFactory();
+        registerConsumer(factory);
+        List senderList=new ArrayList();
+        Connection producerConnection=factory.createConnection();
+        try{
+            producerConnection.start();
+            Session session=producerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+            MessageProducer producer=session.createProducer(getDestination(session));
+            for(int i=0;i<MESSAGE_COUNT;i++){
+                Message msg=session.createTextMessage("test"+i);
+                senderList.add(msg);
+                producer.send(msg);
+            }
+        }finally{
+            producerConnection.close();
+        }
+        // now consume the messages
+        Connection consumerConnection=getConsumerConnection(factory);
+        try{
+            MessageConsumer consumer=getConsumer(consumerConnection);
+            List consumerList=new ArrayList();
+            for(int i=0;i<MESSAGE_COUNT;i++){
+                // bounded wait: a lost message must fail the test instead of blocking it forever
+                Message msg=consumer.receive(RECEIVE_TIMEOUT);
+                assertNotNull("Timed out waiting for message "+i+" of "+MESSAGE_COUNT,msg);
+                consumerList.add(msg);
+            }
+            assertEquals(senderList,consumerList);
+        }finally{
+            consumerConnection.close();
+        }
+    }
+
+    /**
+     * Sends the first tenth of the messages with no consumer connected, then attaches a slow listener and sends
+     * the remainder while it is consuming. Waits up to 300000 ms for the listener to see {@link #MESSAGE_COUNT}
+     * messages and checks that the messages received equal the messages sent, in order.
+     *
+     * @throws Exception on any JMS failure
+     */
+    public void testSendWhilstConsume() throws Exception{
+        ConnectionFactory factory=createConnectionFactory();
+        registerConsumer(factory);
+        Connection producerConnection=factory.createConnection();
+        Connection consumerConnection=null;
+        try{
+            producerConnection.start();
+            Session session=producerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+            MessageProducer producer=session.createProducer(getDestination(session));
+            List senderList=new ArrayList();
+            for(int i=0;i<MESSAGE_COUNT/10;i++){
+                TextMessage msg=session.createTextMessage("test"+i);
+                senderList.add(msg);
+                producer.send(msg);
+            }
+            // now consume the messages
+            consumerConnection=getConsumerConnection(factory);
+            MessageConsumer consumer=getConsumer(consumerConnection);
+            final List consumerList=new ArrayList();
+            final CountDownLatch latch=new CountDownLatch(1);
+            consumer.setMessageListener(new MessageListener(){
+
+                public void onMessage(Message msg){
+                    try{
+                        // sleep to act as a slow consumer
+                        // which will force a mix of direct and polled dispatching
+                        // using the cursor on the broker
+                        Thread.sleep(50);
+                    }catch(InterruptedException e){
+                        log.warn("Interrupted while simulating a slow consumer",e);
+                        // keep the interrupt visible to the thread that is delivering messages
+                        Thread.currentThread().interrupt();
+                    }
+                    consumerList.add(msg);
+                    if(consumerList.size()==MESSAGE_COUNT){
+                        latch.countDown();
+                    }
+                }
+            });
+            for(int i=MESSAGE_COUNT/10;i<MESSAGE_COUNT;i++){
+                TextMessage msg=session.createTextMessage("test"+i);
+                senderList.add(msg);
+                producer.send(msg);
+            }
+            latch.await(300000,TimeUnit.MILLISECONDS);
+            // expected value first, so a failure reports "expected 0 but was 1" rather than the reverse
+            assertEquals("Still dipatching - count down latch not sprung",0,latch.getCount());
+            assertEquals("cosumerList - expected: "+MESSAGE_COUNT+" but was: "+consumerList.size(),
+                    senderList.size(),consumerList.size());
+            assertEquals(senderList,consumerList);
+        }finally{
+            try{
+                producerConnection.close();
+            }finally{
+                if(consumerConnection!=null){
+                    consumerConnection.close();
+                }
+            }
+        }
+    }
+
+    /**
+     * Opens a consumer connection, creates the consumer once and closes the connection again, so that nothing is
+     * attached while the first messages are sent. For durable-topic subclasses this presumably registers the
+     * durable subscription before any message is published.
+     *
+     * @param factory factory used to open the consumer connection
+     * @throws Exception if the connection or the consumer cannot be created
+     */
+    private void registerConsumer(ConnectionFactory factory) throws Exception{
+        Connection connection=getConsumerConnection(factory);
+        try{
+            getConsumer(connection);
+        }finally{
+            connection.close();
+        }
+    }
+
+    /**
+     * Opens and starts a connection whose client id is "testConsumer".
+     *
+     * @param fac factory used to create the connection
+     * @return the started connection
+     * @throws JMSException if the connection cannot be created, identified or started
+     */
+    protected Connection getConsumerConnection(ConnectionFactory fac) throws JMSException{
+        Connection connection=fac.createConnection();
+        connection.setClientID("testConsumer");
+        connection.start();
+        return connection;
+    }
+
+    /**
+     * Creates and starts the broker if this instance does not have one yet.
+     */
+    protected void setUp() throws Exception{
+        if(broker==null){
+            broker=createBroker();
+        }
+        super.setUp();
+    }
+
+    /**
+     * Stops the broker, if one was created.
+     */
+    protected void tearDown() throws Exception{
+        super.tearDown();
+        if(broker!=null){
+            broker.stop();
+        }
+    }
+
+    /**
+     * Builds a connection factory for {@link #bindAddress} with the durable-topic, optimized durable-topic and
+     * queue prefetch properties all set to {@link #PREFETCH_SIZE}.
+     *
+     * @return the configured factory
+     * @throws Exception if the factory cannot be configured
+     */
+    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception{
+        ActiveMQConnectionFactory cf=new ActiveMQConnectionFactory(bindAddress);
+        Properties props=new Properties();
+        props.setProperty("prefetchPolicy.durableTopicPrefetch",""+PREFETCH_SIZE);
+        props.setProperty("prefetchPolicy.optimizeDurableTopicPrefetch",""+PREFETCH_SIZE);
+        props.setProperty("prefetchPolicy.queuePrefetch",""+PREFETCH_SIZE);
+        cf.setProperties(props);
+        return cf;
+    }
+
+    /**
+     * Creates a broker, lets the subclass configure it through {@link #configureBroker(BrokerService)} and
+     * starts it.
+     *
+     * @return the started broker
+     * @throws Exception if configuration or startup fails
+     */
+    protected BrokerService createBroker() throws Exception{
+        BrokerService answer=new BrokerService();
+        configureBroker(answer);
+        answer.start();
+        return answer;
+    }
+}
Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorSupport.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/KahaQueueStoreTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/KahaQueueStoreTest.java?view=auto&rev=478967
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/KahaQueueStoreTest.java (added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/KahaQueueStoreTest.java Fri Nov 24 11:57:51 2006
@@ -0,0 +1,48 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.cursors;
+
+import java.io.File;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.region.policy.StorePendingQueueMessageStoragePolicy;
+import org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+/**
+ * Repeats the {@link CursorQueueStoreTest} scenarios with the broker persisting through a
+ * {@link KahaPersistenceAdapter}.
+ *
+ * @version $Revision: 1.3 $
+ */
+public class KahaQueueStoreTest extends CursorQueueStoreTest{
+
+    protected static final Log log = LogFactory.getLog(KahaQueueStoreTest.class);
+
+    /**
+     * Gives the broker a Kaha persistence adapter rooted at "activemq-data/durableTest", installs a default
+     * destination policy that uses {@link StorePendingQueueMessageStoragePolicy}, adds a connector on
+     * {@code bindAddress} and asks the broker to delete all messages on startup.
+     *
+     * @param answer broker to configure
+     * @throws Exception if the connector cannot be added
+     */
+    protected void configureBroker(BrokerService answer) throws Exception{
+        // persistence: Kaha store in the shared test data directory
+        File dataDirectory = new File("activemq-data/durableTest");
+        answer.setPersistenceAdapter(new KahaPersistenceAdapter(dataDirectory));
+
+        // destination policy: store-based pending-message handling for every queue by default
+        PolicyEntry defaultPolicy = new PolicyEntry();
+        defaultPolicy.setPendingQueueMessageStoragePolicy(new StorePendingQueueMessageStoragePolicy());
+        PolicyMap policies = new PolicyMap();
+        policies.setDefaultEntry(defaultPolicy);
+        answer.setDestinationPolicy(policies);
+
+        answer.addConnector(bindAddress);
+        answer.setDeleteAllMessagesOnStartup(true);
+    }
+}
Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/KahaQueueStoreTest.java
------------------------------------------------------------------------------
svn:eol-style = native