You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2006/10/13 13:17:46 UTC
svn commit: r463646 - in /incubator/activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/broker/
main/java/org/apache/activemq/broker/region/
main/java/org/apache/activemq/broker/region/cursors/
main/java/org/apache/activemq/broker/region/po...
Author: rajdavies
Date: Fri Oct 13 04:17:41 2006
New Revision: 463646
URL: http://svn.apache.org/viewvc?view=rev&rev=463646
Log:
Fixed failing test cases: - a few problems had been there for a while
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorDurableTest.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/KahaCursorDurableTest.java
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java?view=diff&rev=463646&r1=463645&r2=463646
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java Fri Oct 13 04:17:41 2006
@@ -21,6 +21,7 @@
import org.apache.activemq.Service;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Region;
+import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo;
@@ -250,10 +251,19 @@
/**
* Sets the default administration connection context used when configuring the broker on startup or via JMX
+ * @param adminConnectionContext
*/
public abstract void setAdminConnectionContext(ConnectionContext adminConnectionContext);
-
+ /**
+ * @return the pendingDurableSubscriberPolicy
+ */
+ public abstract PendingDurableSubscriberMessageStoragePolicy getPendingDurableSubscriberPolicy();
+
+ /**
+ * @param pendingDurableSubscriberPolicy the pendingDurableSubscriberPolicy to set
+ */
+ public abstract void setPendingDurableSubscriberPolicy(PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy);
/**
* @return the broker's temp data store
* @throws Exception
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java?view=diff&rev=463646&r1=463645&r2=463646
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java Fri Oct 13 04:17:41 2006
@@ -19,6 +19,7 @@
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo;
@@ -232,6 +233,15 @@
public void setAdminConnectionContext(ConnectionContext adminConnectionContext) {
next.setAdminConnectionContext(adminConnectionContext);
}
+
+ public PendingDurableSubscriberMessageStoragePolicy getPendingDurableSubscriberPolicy() {
+ return next.getPendingDurableSubscriberPolicy();
+ }
+
+ public void setPendingDurableSubscriberPolicy(PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy) {
+ next.setPendingDurableSubscriberPolicy(pendingDurableSubscriberPolicy);
+ }
+
public Store getTempDataStore() {
return next.getTempDataStore();
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?view=diff&rev=463646&r1=463645&r2=463646
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Fri Oct 13 04:17:41 2006
@@ -45,7 +45,9 @@
import org.apache.activemq.broker.region.DestinationFactoryImpl;
import org.apache.activemq.broker.region.DestinationInterceptor;
import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.region.policy.VMPendingDurableSubscriberMessageStoragePolicy;
import org.apache.activemq.broker.region.virtual.VirtualDestination;
import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
import org.apache.activemq.broker.region.virtual.VirtualTopic;
@@ -137,6 +139,7 @@
private ActiveMQDestination[] destinations;
private Store tempDataStore;
private int persistenceThreadPriority = Thread.MAX_PRIORITY;
+ private PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy = new VMPendingDurableSubscriberMessageStoragePolicy();
/**
@@ -388,7 +391,13 @@
}
getBroker().start();
-
+ /*
+ if(isUseJmx()){
+ // yes - this is orer dependent!
+ // register all destination in persistence store including inactive destinations as mbeans
+ this.startDestinationsInPersistenceStore(broker);
+ }
+ */
startAllConnectors();
if (isUseJmx() && masterConnector != null) {
@@ -987,6 +996,23 @@
public void setPersistenceThreadPriority(int persistenceThreadPriority){
this.persistenceThreadPriority=persistenceThreadPriority;
}
+
+ /**
+ * @return the pendingDurableSubscriberPolicy
+ */
+ public PendingDurableSubscriberMessageStoragePolicy getPendingDurableSubscriberPolicy(){
+ return this.pendingDurableSubscriberPolicy;
+ }
+
+ /**
+ * @param pendingDurableSubscriberPolicy the pendingDurableSubscriberPolicy to set
+ */
+ public void setPendingDurableSubscriberPolicy(PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy){
+ this.pendingDurableSubscriberPolicy=pendingDurableSubscriberPolicy;
+ if (broker != null) {
+ broker.setPendingDurableSubscriberPolicy(pendingDurableSubscriberPolicy);
+ }
+ }
// Implementation methods
// -------------------------------------------------------------------------
@@ -1199,8 +1225,6 @@
mbeanServer.registerMBean(adminView, objectName);
registeredMBeanNames.add(objectName);
}
- //register all destination in persistence store including inactive destinations as mbeans
- this.startDestinationsInPersistenceStore(broker);
}
@@ -1243,6 +1267,7 @@
regionBroker.setKeepDurableSubsActive(keepDurableSubsActive);
regionBroker.setBrokerName(getBrokerName());
+ regionBroker.setPendingDurableSubscriberPolicy(getPendingDurableSubscriberPolicy());
return regionBroker;
}
@@ -1515,8 +1540,5 @@
broker.addDestination(adminConnectionContext, destination);
}
}
- }
-
-
-
+ }
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java?view=diff&rev=463646&r1=463645&r2=463646
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java Fri Oct 13 04:17:41 2006
@@ -19,6 +19,7 @@
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo;
@@ -229,6 +230,13 @@
public Response messagePull(ConnectionContext context, MessagePull pull) {
return null;
+ }
+
+ public PendingDurableSubscriberMessageStoragePolicy getPendingDurableSubscriberPolicy() {
+ return null;
+ }
+
+ public void setPendingDurableSubscriberPolicy(PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy) {
}
public Store getTempDataStore() {
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java?view=diff&rev=463646&r1=463645&r2=463646
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java Fri Oct 13 04:17:41 2006
@@ -23,6 +23,7 @@
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo;
@@ -228,6 +229,14 @@
}
public Response messagePull(ConnectionContext context, MessagePull pull) {
+ throw new BrokerStoppedException(this.message);
+ }
+
+ public PendingDurableSubscriberMessageStoragePolicy getPendingDurableSubscriberPolicy() {
+ throw new BrokerStoppedException(this.message);
+ }
+
+ public void setPendingDurableSubscriberPolicy(PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy) {
throw new BrokerStoppedException(this.message);
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java?view=diff&rev=463646&r1=463645&r2=463646
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java Fri Oct 13 04:17:41 2006
@@ -19,6 +19,7 @@
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo;
@@ -243,6 +244,14 @@
public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
return getNext().messagePull(context, pull);
+ }
+
+ public PendingDurableSubscriberMessageStoragePolicy getPendingDurableSubscriberPolicy() {
+ return getNext().getPendingDurableSubscriberPolicy();
+ }
+
+ public void setPendingDurableSubscriberPolicy(PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy) {
+ getNext().setPendingDurableSubscriberPolicy(pendingDurableSubscriberPolicy);
}
public Store getTempDataStore() {
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?view=diff&rev=463646&r1=463645&r2=463646
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java Fri Oct 13 04:17:41 2006
@@ -177,7 +177,6 @@
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
log.debug("Adding consumer: "+info.getConsumerId());
-
ActiveMQDestination destination = info.getDestination();
if (destination != null && ! destination.isPattern() && ! destination.isComposite()) {
// lets auto-create the destination
@@ -260,7 +259,6 @@
}
public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
-
log.debug("Removing consumer: "+info.getConsumerId());
Subscription sub = (Subscription) subscriptions.remove(info.getConsumerId());
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java?view=diff&rev=463646&r1=463645&r2=463646
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java Fri Oct 13 04:17:41 2006
@@ -78,7 +78,7 @@
if (destination.isTemporary()) {
final ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destination;
return new Queue(destination, memoryManager, null, destinationStatistics, taskRunnerFactory) {
-
+
public void addSubscription(ConnectionContext context,Subscription sub) throws Exception {
// Only consumers on the same connection can consume from
// the temporary destination
@@ -92,6 +92,7 @@
MessageStore store = persistenceAdapter.createQueueMessageStore((ActiveMQQueue) destination);
Queue queue = new Queue(destination, memoryManager, store, destinationStatistics, taskRunnerFactory);
configureQueue(queue, destination);
+ queue.initialize();
return queue;
}
} else if (destination.isTemporary()){
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?view=diff&rev=463646&r1=463645&r2=463646
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Fri Oct 13 04:17:41 2006
@@ -24,6 +24,7 @@
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
+import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
@@ -40,10 +41,8 @@
private final boolean keepDurableSubsActive;
private boolean active=false;
- public DurableTopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive) throws InvalidSelectorException {
- //super(broker,context, info, new StoreDurableSubscriberCursor(context.getClientId(),info.getSubcriptionName(),broker.getTempDataStore(),info.getPrefetchSize()));
- //super(broker,context, info, new FilePendingMessageCursor(context.getClientId() + info.getConsumerId().toString(),broker.getTempDataStore()));
- super(broker,context,info);
+ public DurableTopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive,PendingMessageCursor cursor) throws InvalidSelectorException {
+ super(broker,context,info,cursor);
this.keepDurableSubsActive = keepDurableSubsActive;
subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubcriptionName());
}
@@ -192,7 +191,6 @@
* Release any references that we are holding.
*/
synchronized public void destroy() {
-
synchronized(pending) {
pending.reset();
while(pending.hasNext()) {
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?view=diff&rev=463646&r1=463645&r2=463646
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Fri Oct 13 04:17:41 2006
@@ -124,8 +124,8 @@
synchronized public void add(MessageReference node) throws Exception{
enqueueCounter++;
- //if(!isFull()){
- if(!isFull() && pending.isEmpty() && canDispatch(node)){
+
+ if(!isFull() && pending.isEmpty() ){
dispatch(node);
}else{
optimizePrefetch();
@@ -376,7 +376,6 @@
if(canDispatch(node)&&!isSlaveBroker()){
MessageDispatch md=createMessageDispatch(node,message);
-
// NULL messages don't count... they don't get Acked.
if( node != QueueMessageReference.NULL_MESSAGE ) {
dispatchCounter++;
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?view=diff&rev=463646&r1=463645&r2=463646
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Fri Oct 13 04:17:41 2006
@@ -20,12 +20,15 @@
import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
+import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
import org.apache.activemq.broker.region.group.MessageGroupHashBucketFactory;
import org.apache.activemq.broker.region.group.MessageGroupMap;
import org.apache.activemq.broker.region.group.MessageGroupMapFactory;
import org.apache.activemq.broker.region.group.MessageGroupSet;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.broker.region.policy.DispatchPolicy;
+import org.apache.activemq.broker.region.policy.PendingQueueMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy;
import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
import org.apache.activemq.command.ActiveMQDestination;
@@ -67,10 +70,10 @@
protected final ActiveMQDestination destination;
protected final List consumers = new CopyOnWriteArrayList();
- private final LinkedList messages = new LinkedList();
protected final Valve dispatchValve = new Valve(true);
protected final UsageManager usageManager;
protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
+ protected PendingMessageCursor messages = new VMPendingMessageCursor();
private LockOwner exclusiveOwner;
private MessageGroupMap messageGroupOwners;
@@ -100,6 +103,10 @@
destinationStatistics.setParent(parentStats);
this.log = LogFactory.getLog(getClass().getName() + "." + destination.getPhysicalName());
+
+ }
+
+ public void initialize() throws Exception {
if (store != null) {
// Restore the persistent messages.
store.recover(new MessageRecoveryListener() {
@@ -107,7 +114,11 @@
message.setRegionDestination(Queue.this);
MessageReference reference = createMessageReference(message);
synchronized (messages) {
- messages.add(reference);
+ try{
+ messages.addMessageLast(reference);
+ }catch(Exception e){
+ log.fatal("Failed to add message to cursor",e);
+ }
}
reference.decrementReferenceCount();
destinationStatistics.getMessages().increment();
@@ -158,9 +169,10 @@
synchronized (messages) {
// Add all the matching messages in the queue to the
// subscription.
- for (Iterator iter = messages.iterator(); iter.hasNext();) {
+ messages.reset();
+ while(messages.hasNext()) {
- QueueMessageReference node = (QueueMessageReference) iter.next();
+ QueueMessageReference node = (QueueMessageReference) messages.next();
if (node.isDropped()) {
continue;
}
@@ -219,8 +231,9 @@
// lets copy the messages to dispatch to avoid deadlock
List messagesToDispatch = new ArrayList();
synchronized (messages) {
- for (Iterator iter = messages.iterator(); iter.hasNext();) {
- QueueMessageReference node = (QueueMessageReference) iter.next();
+ messages.reset();
+ while(messages.hasNext()) {
+ QueueMessageReference node = (QueueMessageReference) messages.next();
if (node.isDropped()) {
continue;
}
@@ -314,12 +327,13 @@
public void gc() {
synchronized (messages) {
- for (Iterator iter = messages.iterator(); iter.hasNext();) {
+ messages.resetForGC();
+ while(messages.hasNext()) {
// Remove dropped messages from the queue.
- QueueMessageReference node = (QueueMessageReference) iter.next();
+ QueueMessageReference node = (QueueMessageReference) messages.next();
if (node.isDropped()) {
garbageSize--;
- iter.remove();
+ messages.remove();
continue;
}
}
@@ -456,6 +470,12 @@
public void setMemoryLimit(long limit) {
getUsageManager().setLimit(limit);
}
+ public PendingMessageCursor getMessages(){
+ return this.messages;
+ }
+ public void setMessages(PendingMessageCursor messages){
+ this.messages=messages;
+ }
// Implementation methods
// -------------------------------------------------------------------------
@@ -470,7 +490,7 @@
try {
destinationStatistics.onMessageEnqueue(message);
synchronized (messages) {
- messages.add(node);
+ messages.addMessageLast(node);
}
synchronized (consumers) {
@@ -509,12 +529,12 @@
}
public Message[] browse() {
-
ArrayList l = new ArrayList();
synchronized (messages) {
- for (Iterator iter = messages.iterator(); iter.hasNext();) {
+ messages.reset();
+ while(messages.hasNext()) {
try {
- MessageReference r = (MessageReference) iter.next();
+ MessageReference r = (MessageReference) messages.next();
r.incrementReferenceCount();
try {
Message m = r.getMessage();
@@ -536,9 +556,10 @@
public Message getMessage(String messageId) {
synchronized (messages) {
- for (Iterator iter = messages.iterator(); iter.hasNext();) {
+ messages.reset();
+ while(messages.hasNext()) {
try {
- MessageReference r = (MessageReference) iter.next();
+ MessageReference r = (MessageReference) messages.next();
if (messageId.equals(r.getMessageId().toString())) {
r.incrementReferenceCount();
try {
@@ -563,9 +584,10 @@
public void purge() {
synchronized (messages) {
ConnectionContext c = createConnectionContext();
- for (Iterator iter = messages.iterator(); iter.hasNext();) {
+ messages.reset();
+ while(messages.hasNext()) {
try {
- QueueMessageReference r = (QueueMessageReference) iter.next();
+ QueueMessageReference r = (QueueMessageReference) messages.next();
// We should only delete messages that can be locked.
if (r.lock(LockOwner.HIGH_PRIORITY_LOCK_OWNER)) {
@@ -623,8 +645,9 @@
int counter = 0;
synchronized (messages) {
ConnectionContext c = createConnectionContext();
- for (Iterator iter = messages.iterator(); iter.hasNext();) {
- IndirectMessageReference r = (IndirectMessageReference) iter.next();
+ messages.reset();
+ while(messages.hasNext()) {
+ IndirectMessageReference r = (IndirectMessageReference) messages.next();
if (filter.evaluate(c, r)) {
// We should only delete messages that can be locked.
if (lockMessage(r)) {
@@ -672,8 +695,9 @@
public int copyMatchingMessages(ConnectionContext context, MessageReferenceFilter filter, ActiveMQDestination dest, int maximumMessages) throws Exception {
int counter = 0;
synchronized (messages) {
- for (Iterator iter = messages.iterator(); iter.hasNext();) {
- MessageReference r = (MessageReference) iter.next();
+ messages.reset();
+ while(messages.hasNext()) {
+ MessageReference r = (MessageReference) messages.next();
if (filter.evaluate(context, r)) {
r.incrementReferenceCount();
try {
@@ -721,8 +745,9 @@
public int moveMatchingMessagesTo(ConnectionContext context, MessageReferenceFilter filter, ActiveMQDestination dest, int maximumMessages) throws Exception {
int counter = 0;
synchronized (messages) {
- for (Iterator iter = messages.iterator(); iter.hasNext();) {
- IndirectMessageReference r = (IndirectMessageReference) iter.next();
+ messages.reset();
+ while(messages.hasNext()) {
+ IndirectMessageReference r = (IndirectMessageReference) messages.next();
if (filter.evaluate(context, r)) {
// We should only move messages that can be locked.
if (lockMessage(r)) {
@@ -789,5 +814,4 @@
answer.getMessageEvaluationContext().setDestination(getActiveMQDestination());
return answer;
}
-
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java?view=diff&rev=463646&r1=463645&r2=463646
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java Fri Oct 13 04:17:41 2006
@@ -79,7 +79,6 @@
String groupId = node.getGroupID();
int sequence = node.getGroupSequence();
if( groupId!=null ) {
-
MessageGroupMap messageGroupOwners = ((Queue)node.getRegionDestination()).getMessageGroupOwners();
// If we can own the first, then no-one else should own the rest.
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?view=diff&rev=463646&r1=463645&r2=463646
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Fri Oct 13 04:17:41 2006
@@ -31,7 +31,9 @@
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.Connection;
import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.region.policy.VMPendingDurableSubscriberMessageStoragePolicy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo;
@@ -93,6 +95,7 @@
private ConnectionContext adminConnectionContext;
protected DestinationFactory destinationFactory;
protected final ConcurrentHashMap connectionStates = new ConcurrentHashMap();
+ private PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy = new VMPendingDurableSubscriberMessageStoragePolicy();
public RegionBroker(BrokerService brokerService,TaskRunnerFactory taskRunnerFactory, UsageManager memoryManager, DestinationFactory destinationFactory, DestinationInterceptor destinationInterceptor) throws IOException {
this.brokerService = brokerService;
@@ -583,5 +586,19 @@
public Store getTempDataStore() {
return brokerService.getTempDataStore();
+ }
+
+ /**
+ * @return the pendingDurableSubscriberPolicy
+ */
+ public PendingDurableSubscriberMessageStoragePolicy getPendingDurableSubscriberPolicy(){
+ return this.pendingDurableSubscriberPolicy;
+ }
+
+ /**
+ * @param pendingDurableSubscriberPolicy the pendingDurableSubscriberPolicy to set
+ */
+ public void setPendingDurableSubscriberPolicy(PendingDurableSubscriberMessageStoragePolicy durableSubscriberCursor){
+ this.pendingDurableSubscriberPolicy=durableSubscriberCursor;
}
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java?view=diff&rev=463646&r1=463645&r2=463646
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java Fri Oct 13 04:17:41 2006
@@ -26,6 +26,7 @@
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConnectionId;
@@ -61,60 +62,52 @@
}
- public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
- if (info.isDurable()) {
-
- ActiveMQDestination destination = info.getDestination();
- if( !destination.isPattern() ) {
+ public Subscription addConsumer(ConnectionContext context,ConsumerInfo info) throws Exception{
+ if(info.isDurable()){
+ ActiveMQDestination destination=info.getDestination();
+ if(!destination.isPattern()){
// Make sure the destination is created.
- lookup(context, destination);
+ lookup(context,destination);
}
-
- String clientId = context.getClientId();
- String subcriptionName = info.getSubcriptionName();
- SubscriptionKey key = new SubscriptionKey(clientId, subcriptionName);
- DurableTopicSubscription sub = (DurableTopicSubscription) durableSubscriptions.get(key);
- if (sub != null) {
-
- if (sub.isActive()) {
- throw new JMSException("Durable consumer is in use for client: " + clientId + " and subscriptionName: " + subcriptionName);
+ String clientId=context.getClientId();
+ String subcriptionName=info.getSubcriptionName();
+ SubscriptionKey key=new SubscriptionKey(clientId,subcriptionName);
+ DurableTopicSubscription sub=(DurableTopicSubscription)durableSubscriptions.get(key);
+ if(sub!=null){
+ if(sub.isActive()){
+ throw new JMSException("Durable consumer is in use for client: "+clientId+" and subscriptionName: "
+ +subcriptionName);
}
-
// Has the selector changed??
- if (hasDurableSubChanged(info, sub.getConsumerInfo())) {
-
+ if(hasDurableSubChanged(info,sub.getConsumerInfo())){
// Remove the consumer first then add it.
durableSubscriptions.remove(key);
- for (Iterator iter = destinations.values().iterator(); iter.hasNext();) {
- Topic topic = (Topic) iter.next();
- topic.deleteSubscription(context, key);
+ for(Iterator iter=destinations.values().iterator();iter.hasNext();){
+ Topic topic=(Topic)iter.next();
+ topic.deleteSubscription(context,key);
}
- super.removeConsumer(context, sub.getConsumerInfo());
-
- super.addConsumer(context, info);
- sub = (DurableTopicSubscription) durableSubscriptions.get(key);
- }
- else {
+ super.removeConsumer(context,sub.getConsumerInfo());
+ super.addConsumer(context,info);
+ sub=(DurableTopicSubscription)durableSubscriptions.get(key);
+ }else{
// Change the consumer id key of the durable sub.
- if( sub.getConsumerInfo().getConsumerId()!=null )
+ if(sub.getConsumerInfo().getConsumerId()!=null)
subscriptions.remove(sub.getConsumerInfo().getConsumerId());
- subscriptions.put(info.getConsumerId(), sub);
+ subscriptions.put(info.getConsumerId(),sub);
}
- }
- else {
- super.addConsumer(context, info);
- sub = (DurableTopicSubscription) durableSubscriptions.get(key);
- if (sub == null) {
- throw new JMSException("Cannot use the same consumerId: " + info.getConsumerId() + " for two different durable subscriptions clientID: "
- + key.getClientId() + " subscriberName: " + key.getSubscriptionName());
+ }else{
+ super.addConsumer(context,info);
+ sub=(DurableTopicSubscription)durableSubscriptions.get(key);
+ if(sub==null){
+ throw new JMSException("Cannot use the same consumerId: "+info.getConsumerId()
+ +" for two different durable subscriptions clientID: "+key.getClientId()
+ +" subscriberName: "+key.getSubscriptionName());
}
}
-
- sub.activate(context, info);
+ sub.activate(context,info);
return sub;
- }
- else {
- return super.addConsumer(context, info);
+ }else{
+ return super.addConsumer(context,info);
}
}
@@ -222,9 +215,12 @@
}
SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubcriptionName());
DurableTopicSubscription sub = (DurableTopicSubscription) durableSubscriptions.get(key);
- if (sub == null) {
- sub = new DurableTopicSubscription(broker,context, info, keepDurableSubsActive);
- durableSubscriptions.put(key, sub);
+ if(sub==null){
+ PendingMessageCursor cursor=broker.getPendingDurableSubscriberPolicy().getSubscriberPendingMessageCursor(
+ context.getClientId(),info.getSubcriptionName(),broker.getTempDataStore(),
+ info.getPrefetchSize());
+ sub=new DurableTopicSubscription(broker,context,info,keepDurableSubsActive,cursor);
+ durableSubscriptions.put(key,sub);
}
else {
throw new JMSException("That durable subscription is already active.");
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java?view=diff&rev=463646&r1=463645&r2=463646
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java Fri Oct 13 04:17:41 2006
@@ -1,19 +1,15 @@
/**
*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.broker.region.cursors;
@@ -24,13 +20,13 @@
import org.apache.activemq.broker.region.MessageReference;
/**
- * Abstract method holder for pending message (messages awaiting disptach to a
- * consumer) cursor
+ * Abstract method holder for pending message (messages awaiting disptach to a consumer) cursor
*
* @version $Revision$
*/
-public class AbstractPendingMessageCursor implements PendingMessageCursor {
- protected int maxBatchSize = 100;
+public class AbstractPendingMessageCursor implements PendingMessageCursor{
+
+ protected int maxBatchSize=100;
public void start() throws Exception{
}
@@ -38,12 +34,10 @@
public void stop() throws Exception{
}
- public void add(ConnectionContext context,Destination destination)
- throws Exception{
+ public void add(ConnectionContext context,Destination destination) throws Exception{
}
- public void remove(ConnectionContext context,Destination destination)
- throws Exception{
+ public void remove(ConnectionContext context,Destination destination) throws Exception{
}
public boolean isRecoveryRequired(){
@@ -80,7 +74,7 @@
public int size(){
return 0;
}
-
+
public int getMaxBatchSize(){
return maxBatchSize;
}
@@ -91,6 +85,11 @@
protected void fillBatch() throws Exception{
}
-
-
+
+ /**
+ * Give the cursor a hint that we are about to remove messages from memory only
+ */
+ public void resetForGC(){
+ reset();
+ }
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java?view=diff&rev=463646&r1=463645&r2=463646
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java Fri Oct 13 04:17:41 2006
@@ -24,7 +24,7 @@
import org.apache.activemq.store.kahadaptor.CommandMarshaller;
/**
* perist pending messages pending message (messages awaiting disptach to a consumer) cursor
- *
+ *
* @version $Revision$
*/
public class FilePendingMessageCursor extends AbstractPendingMessageCursor{
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java?view=diff&rev=463646&r1=463645&r2=463646
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java Fri Oct 13 04:17:41 2006
@@ -113,4 +113,10 @@
* @param maxBatchSize
*/
public void setMaxBatchSize(int maxBatchSize);
+
+ /**
+ * Give the cursor a hint that we are about to remove
+ * messages from memory only
+ */
+ public void resetForGC();
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java?view=diff&rev=463646&r1=463645&r2=463646
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java Fri Oct 13 04:17:41 2006
@@ -127,12 +127,7 @@
return false;
}
- public synchronized void addMessageFirst(MessageReference node) throws IOException{
- if(started){
- throw new RuntimeException("This shouldn't be called!");
- }
- }
-
+
public synchronized void addMessageLast(MessageReference node) throws Exception{
if(node!=null){
Message msg=node.getMessage();
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?view=diff&rev=463646&r1=463645&r2=463646
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java Fri Oct 13 04:17:41 2006
@@ -20,6 +20,7 @@
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.broker.region.TopicSubscription;
+import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.broker.region.group.MessageGroupHashBucketFactory;
import org.apache.activemq.broker.region.group.MessageGroupMapFactory;
import org.apache.activemq.filter.DestinationMapEntry;
@@ -46,6 +47,7 @@
private MessageEvictionStrategy messageEvictionStrategy;
private long memoryLimit;
private MessageGroupMapFactory messageGroupMapFactory;
+ private PendingQueueMessageStoragePolicy pendingQueueMessageStoragePolicy;
public void configure(Queue queue) {
if (dispatchPolicy != null) {
@@ -58,6 +60,10 @@
if( memoryLimit>0 ) {
queue.getUsageManager().setLimit(memoryLimit);
}
+ if (pendingQueueMessageStoragePolicy != null) {
+ PendingMessageCursor messages = pendingQueueMessageStoragePolicy.getQueuePendingMessageCursor();
+ queue.setMessages(messages);
+ }
}
public void configure(Topic topic) {
@@ -74,6 +80,7 @@
if( memoryLimit>0 ) {
topic.getUsageManager().setLimit(memoryLimit);
}
+
}
public void configure(TopicSubscription subscription) {
@@ -193,6 +200,22 @@
*/
public void setMessageGroupMapFactory(MessageGroupMapFactory messageGroupMapFactory) {
this.messageGroupMapFactory = messageGroupMapFactory;
+ }
+
+
+ /**
+ * @return the pendingQueueMessageStoragePolicy
+ */
+ public PendingQueueMessageStoragePolicy getPendingQueueMessageStoragePolicy(){
+ return this.pendingQueueMessageStoragePolicy;
+ }
+
+
+ /**
+ * @param pendingQueueMessageStoragePolicy the pendingQueueMessageStoragePolicy to set
+ */
+ public void setPendingQueueMessageStoragePolicy(PendingQueueMessageStoragePolicy pendingQueueMessageStoragePolicy){
+ this.pendingQueueMessageStoragePolicy=pendingQueueMessageStoragePolicy;
}
Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java?view=diff&rev=463646&r1=463645&r2=463646
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java Fri Oct 13 04:17:41 2006
@@ -20,6 +20,7 @@
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo;
@@ -218,6 +219,13 @@
}
public void stop() throws Exception {
+ }
+
+ public PendingDurableSubscriberMessageStoragePolicy getPendingDurableSubscriberPolicy() {
+ return null;
+ }
+
+ public void setPendingDurableSubscriberPolicy(PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy) {
}
public Store getTempDataStore() {
Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorDurableTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorDurableTest.java?view=diff&rev=463646&r1=463645&r2=463646
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorDurableTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorDurableTest.java Fri Oct 13 04:17:41 2006
@@ -39,6 +39,7 @@
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.StorePendingDurableSubscriberMessageStoragePolicy;
import org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -51,7 +52,7 @@
protected static final Log log = LogFactory.getLog(CursorDurableTest.class);
- protected static final int MESSAGE_COUNT=50;
+ protected static final int MESSAGE_COUNT=100;
protected static final int PREFETCH_SIZE = 5;
protected BrokerService broker;
protected String bindAddress="tcp://localhost:60706";
@@ -138,7 +139,10 @@
for (int i =MESSAGE_COUNT/10; i < MESSAGE_COUNT; i++) {
TextMessage msg=session.createTextMessage("test"+i);
senderList.add(msg);
+
producer.send(msg);
+
+
}
@@ -204,11 +208,13 @@
BrokerService answer=new BrokerService();
configureBroker(answer);
answer.setDeleteAllMessagesOnStartup(true);
+ answer.setPendingDurableSubscriberPolicy(new StorePendingDurableSubscriberMessageStoragePolicy());
answer.start();
return answer;
}
protected void configureBroker(BrokerService answer) throws Exception{
+
answer.addConnector(bindAddress);
answer.setDeleteAllMessagesOnStartup(true);
}
Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/KahaCursorDurableTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/KahaCursorDurableTest.java?view=diff&rev=463646&r1=463645&r2=463646
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/KahaCursorDurableTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/KahaCursorDurableTest.java Fri Oct 13 04:17:41 2006
@@ -35,6 +35,6 @@
KahaPersistenceAdapter adaptor = new KahaPersistenceAdapter(new File("activemq-data/durableTest"));
answer.setPersistenceAdapter(adaptor);
answer.addConnector(bindAddress);
- answer.setDeleteAllMessagesOnStartup(true);
+ //answer.setDeleteAllMessagesOnStartup(true);
}
}