You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2007/01/28 20:39:04 UTC
svn commit: r500862 - in /incubator/activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/broker/
main/java/org/apache/activemq/broker/region/
main/java/org/apache/activemq/broker/region/cursors/
main/java/org/apache/activemq/broker/region/po...
Author: rajdavies
Date: Sun Jan 28 11:39:02 2007
New Revision: 500862
URL: http://svn.apache.org/viewvc?view=rev&rev=500862
Log:
Updated support for configurable Cursor types from the Destination Policy Map
Added:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingSubscriberMessageStoragePolicy.java (with props)
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingSubscriberMessageStoragePolicy.java (with props)
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingSubscriberMessageStoragePolicy.java (with props)
incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/policy/cursor.xml
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingQueueMessageStoragePolicy.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingQueueMessageStoragePolicy.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorDurableTest.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorQueueStoreTest.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/KahaQueueStoreTest.java
incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/perf/slowConsumerBroker.xml
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java?view=diff&rev=500862&r1=500861&r2=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java Sun Jan 28 11:39:02 2007
@@ -254,20 +254,6 @@
* @param adminConnectionContext
*/
public abstract void setAdminConnectionContext(ConnectionContext adminConnectionContext);
-
- /**
- * @return the pendingDurableSubscriberPolicy
- */
- public abstract PendingDurableSubscriberMessageStoragePolicy getPendingDurableSubscriberPolicy();
-
- /**
- * @param pendingDurableSubscriberPolicy the pendingDurableSubscriberPolicy to set
- */
- public abstract void setPendingDurableSubscriberPolicy(PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy);
- /**
- * @return the broker's temp data store
- * @throws Exception
- */
-
+
public Store getTempDataStore();
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java?view=diff&rev=500862&r1=500861&r2=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java Sun Jan 28 11:39:02 2007
@@ -234,15 +234,6 @@
next.setAdminConnectionContext(adminConnectionContext);
}
- public PendingDurableSubscriberMessageStoragePolicy getPendingDurableSubscriberPolicy() {
- return next.getPendingDurableSubscriberPolicy();
- }
-
- public void setPendingDurableSubscriberPolicy(PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy) {
- next.setPendingDurableSubscriberPolicy(pendingDurableSubscriberPolicy);
- }
-
-
public Store getTempDataStore() {
return next.getTempDataStore();
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?view=diff&rev=500862&r1=500861&r2=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Sun Jan 28 11:39:02 2007
@@ -55,7 +55,6 @@
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.broker.region.policy.StorePendingDurableSubscriberMessageStoragePolicy;
import org.apache.activemq.broker.region.virtual.VirtualDestination;
import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
import org.apache.activemq.broker.region.virtual.VirtualTopic;
@@ -149,8 +148,6 @@
private Store tempDataStore;
private int persistenceThreadPriority = Thread.MAX_PRIORITY;
private boolean useLocalHostBrokerName = false;
- //private PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy = new VMPendingDurableSubscriberMessageStoragePolicy();
- private PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy = new StorePendingDurableSubscriberMessageStoragePolicy();
@@ -1008,24 +1005,7 @@
public void setPersistenceThreadPriority(int persistenceThreadPriority){
this.persistenceThreadPriority=persistenceThreadPriority;
}
-
- /**
- * @return the pendingDurableSubscriberPolicy
- */
- public PendingDurableSubscriberMessageStoragePolicy getPendingDurableSubscriberPolicy(){
- return this.pendingDurableSubscriberPolicy;
- }
-
- /**
- * @param pendingDurableSubscriberPolicy the pendingDurableSubscriberPolicy to set
- */
- public void setPendingDurableSubscriberPolicy(PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy){
- this.pendingDurableSubscriberPolicy=pendingDurableSubscriberPolicy;
- if (broker != null) {
- broker.setPendingDurableSubscriberPolicy(pendingDurableSubscriberPolicy);
- }
- }
-
+
/**
* @return the useLocalHostBrokerName
*/
@@ -1296,7 +1276,6 @@
regionBroker.setKeepDurableSubsActive(keepDurableSubsActive);
regionBroker.setBrokerName(getBrokerName());
- regionBroker.setPendingDurableSubscriberPolicy(getPendingDurableSubscriberPolicy());
return regionBroker;
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java?view=diff&rev=500862&r1=500861&r2=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java Sun Jan 28 11:39:02 2007
@@ -232,12 +232,6 @@
return null;
}
- public PendingDurableSubscriberMessageStoragePolicy getPendingDurableSubscriberPolicy() {
- return null;
- }
-
- public void setPendingDurableSubscriberPolicy(PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy) {
- }
public Store getTempDataStore() {
return null;
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java?view=diff&rev=500862&r1=500861&r2=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java Sun Jan 28 11:39:02 2007
@@ -231,15 +231,7 @@
public Response messagePull(ConnectionContext context, MessagePull pull) {
throw new BrokerStoppedException(this.message);
}
-
- public PendingDurableSubscriberMessageStoragePolicy getPendingDurableSubscriberPolicy() {
- throw new BrokerStoppedException(this.message);
- }
-
- public void setPendingDurableSubscriberPolicy(PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy) {
- throw new BrokerStoppedException(this.message);
- }
-
+
public Store getTempDataStore() {
throw new BrokerStoppedException(this.message);
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java?view=diff&rev=500862&r1=500861&r2=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java Sun Jan 28 11:39:02 2007
@@ -246,14 +246,6 @@
return getNext().messagePull(context, pull);
}
- public PendingDurableSubscriberMessageStoragePolicy getPendingDurableSubscriberPolicy() {
- return getNext().getPendingDurableSubscriberPolicy();
- }
-
- public void setPendingDurableSubscriberPolicy(PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy) {
- getNext().setPendingDurableSubscriberPolicy(pendingDurableSubscriberPolicy);
- }
-
public Store getTempDataStore() {
return getNext().getTempDataStore();
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?view=diff&rev=500862&r1=500861&r2=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Sun Jan 28 11:39:02 2007
@@ -24,10 +24,12 @@
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
+import org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.util.SubscriptionKey;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -40,8 +42,8 @@
private final boolean keepDurableSubsActive;
private boolean active=false;
- public DurableTopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive,PendingMessageCursor cursor) throws InvalidSelectorException {
- super(broker,context,info,cursor);
+ public DurableTopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive) throws InvalidSelectorException {
+ super(broker,context,info,new StoreDurableSubscriberCursor(context.getClientId(),info.getSubscriptionName(),broker.getTempDataStore(),info.getPrefetchSize()));
this.keepDurableSubsActive = keepDurableSubsActive;
subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
}
@@ -70,7 +72,7 @@
dispatchMatched();
}
- public void activate(ConnectionContext context, ConsumerInfo info) throws Exception {
+ public void activate(UsageManager memoryManager,ConnectionContext context, ConsumerInfo info) throws Exception {
log.debug("Deactivating " + this);
if( !active ) {
this.active = true;
@@ -83,6 +85,7 @@
}
}
synchronized(pending) {
+ pending.setUsageManager(memoryManager);
pending.start();
}
//If nothing was in the persistent store, then try to use the recovery policy.
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?view=diff&rev=500862&r1=500861&r2=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Sun Jan 28 11:39:02 2007
@@ -49,7 +49,7 @@
abstract public class PrefetchSubscription extends AbstractSubscription{
static private final Log log=LogFactory.getLog(PrefetchSubscription.class);
- final protected PendingMessageCursor pending;
+ protected PendingMessageCursor pending;
final protected LinkedList dispatched=new LinkedList();
protected int prefetchExtension=0;
protected long enqueueCounter;
@@ -342,6 +342,17 @@
public boolean isRecoveryRequired(){
return pending.isRecoveryRequired();
}
+
+
+ public PendingMessageCursor getPending(){
+ return this.pending;
+ }
+
+ public void setPending(PendingMessageCursor pending){
+ this.pending=pending;
+ }
+
+
/**
* optimize message consumer prefetch if the consumer supports it
@@ -506,4 +517,7 @@
protected void acknowledge(ConnectionContext context,final MessageAck ack,final MessageReference node)
throws IOException{
}
+
+
+
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?view=diff&rev=500862&r1=500861&r2=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Sun Jan 28 11:39:02 2007
@@ -22,10 +22,9 @@
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
-
+import java.util.concurrent.CopyOnWriteArrayList;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
-
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.broker.region.cursors.StoreQueueCursor;
@@ -58,8 +57,6 @@
import org.apache.activemq.util.BrokerSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
-import java.util.concurrent.CopyOnWriteArrayList;
/**
* The Queue is a List of MessageEntry objects that are dispatched to matching
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?view=diff&rev=500862&r1=500861&r2=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Sun Jan 28 11:39:02 2007
@@ -95,7 +95,7 @@
private ConnectionContext adminConnectionContext;
protected DestinationFactory destinationFactory;
protected final ConcurrentHashMap connectionStates = new ConcurrentHashMap();
- private PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy = new VMPendingDurableSubscriberMessageStoragePolicy();
+
public RegionBroker(BrokerService brokerService,TaskRunnerFactory taskRunnerFactory, UsageManager memoryManager, DestinationFactory destinationFactory, DestinationInterceptor destinationInterceptor) throws IOException {
this.brokerService = brokerService;
@@ -587,16 +587,5 @@
public Store getTempDataStore() {
return brokerService.getTempDataStore();
- }
-
- /**
- * @return the pendingDurableSubscriberPolicy
- */
- public PendingDurableSubscriberMessageStoragePolicy getPendingDurableSubscriberPolicy(){
- return this.pendingDurableSubscriberPolicy;
- }
-
- public void setPendingDurableSubscriberPolicy(PendingDurableSubscriberMessageStoragePolicy durableSubscriberCursor){
- this.pendingDurableSubscriberPolicy=durableSubscriberCursor;
}
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java?view=diff&rev=500862&r1=500861&r2=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java Sun Jan 28 11:39:02 2007
@@ -1,51 +1,70 @@
/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
+
package org.apache.activemq.broker.region;
import javax.jms.JMSException;
-
import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
/**
*
* @version $Revision: 1.7 $
*/
-public class TempTopicRegion extends AbstractRegion {
+public class TempTopicRegion extends AbstractRegion{
- public TempTopicRegion(RegionBroker broker,DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
- super(broker,destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
+ private static final Log log=LogFactory.getLog(TempTopicRegion.class);
+
+ public TempTopicRegion(RegionBroker broker,DestinationStatistics destinationStatistics,UsageManager memoryManager,
+ TaskRunnerFactory taskRunnerFactory,DestinationFactory destinationFactory){
+ super(broker,destinationStatistics,memoryManager,taskRunnerFactory,destinationFactory);
setAutoCreateDestinations(false);
}
- protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws JMSException {
- if( info.isDurable() ) {
+ protected Subscription createSubscription(ConnectionContext context,ConsumerInfo info) throws JMSException{
+ if(info.isDurable()){
throw new JMSException("A durable subscription cannot be created for a temporary topic.");
- } else {
- return new TopicSubscription(broker,context, info, this.memoryManager);
}
- }
-
- public String toString() {
- return "TempTopicRegion: destinations="+destinations.size()+", subscriptions="+subscriptions.size()+", memory="+memoryManager.getPercentUsage()+"%";
+ try{
+ TopicSubscription answer=new TopicSubscription(broker,context,info,memoryManager);
+ // lets configure the subscription depending on the destination
+ ActiveMQDestination destination=info.getDestination();
+ if(destination!=null&&broker.getDestinationPolicy()!=null){
+ PolicyEntry entry=broker.getDestinationPolicy().getEntryFor(destination);
+ if(entry!=null){
+ entry.configure(broker,memoryManager,answer);
+ }
+ }
+ answer.init();
+ return answer;
+ }catch(Exception e){
+ log.error("Failed to create TopicSubscription ",e);
+ JMSException jmsEx=new JMSException("Couldn't create TopicSubscription");
+ jmsEx.setLinkedException(e);
+ throw jmsEx;
+ }
}
-
+ public String toString(){
+ return "TempTopicRegion: destinations="+destinations.size()+", subscriptions="+subscriptions.size()+", memory="
+ +memoryManager.getPercentUsage()+"%";
+ }
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java?view=diff&rev=500862&r1=500861&r2=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java Sun Jan 28 11:39:02 2007
@@ -104,7 +104,7 @@
+" subscriberName: "+key.getSubscriptionName());
}
}
- sub.activate(context,info);
+ sub.activate(memoryManager,context,info);
return sub;
}else{
return super.addConsumer(context,info);
@@ -208,38 +208,45 @@
}
}
- protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws JMSException {
- if (info.isDurable()) {
- if (AdvisorySupport.isAdvisoryTopic(info.getDestination())){
+ protected Subscription createSubscription(ConnectionContext context,ConsumerInfo info) throws JMSException{
+ if(info.isDurable()){
+ if(AdvisorySupport.isAdvisoryTopic(info.getDestination())){
throw new JMSException("Cannot create a durable subscription for an advisory Topic");
}
- SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
- DurableTopicSubscription sub = (DurableTopicSubscription) durableSubscriptions.get(key);
+ SubscriptionKey key=new SubscriptionKey(context.getClientId(),info.getSubscriptionName());
+ DurableTopicSubscription sub=(DurableTopicSubscription)durableSubscriptions.get(key);
if(sub==null){
- PendingMessageCursor cursor=broker.getPendingDurableSubscriberPolicy().getSubscriberPendingMessageCursor(
- context.getClientId(),info.getSubscriptionName(),broker.getTempDataStore(),
- info.getPrefetchSize());
- cursor.setUsageManager(memoryManager);
- sub=new DurableTopicSubscription(broker,context,info,keepDurableSubsActive,cursor);
+ sub=new DurableTopicSubscription(broker,context,info,keepDurableSubsActive);
+ ActiveMQDestination destination=info.getDestination();
+ if(destination!=null&&broker.getDestinationPolicy()!=null){
+ PolicyEntry entry=broker.getDestinationPolicy().getEntryFor(destination);
+ if(entry!=null){
+ entry.configure(broker,memoryManager,sub);
+ }
+ }
durableSubscriptions.put(key,sub);
- }
- else {
+ }else{
throw new JMSException("That durable subscription is already active.");
}
return sub;
}
- else {
- TopicSubscription answer = new TopicSubscription(broker,context, info, memoryManager);
-
+ try{
+ TopicSubscription answer=new TopicSubscription(broker,context,info,memoryManager);
// lets configure the subscription depending on the destination
- ActiveMQDestination destination = info.getDestination();
- if (destination != null && broker.getDestinationPolicy() != null) {
- PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
- if (entry != null) {
- entry.configure(answer);
+ ActiveMQDestination destination=info.getDestination();
+ if(destination!=null&&broker.getDestinationPolicy()!=null){
+ PolicyEntry entry=broker.getDestinationPolicy().getEntryFor(destination);
+ if(entry!=null){
+ entry.configure(broker,memoryManager,answer);
}
}
+ answer.init();
return answer;
+ }catch(Exception e){
+ log.error("Failed to create TopicSubscription ",e);
+ JMSException jmsEx=new JMSException("Couldn't create TopicSubscription");
+ jmsEx.setLinkedException(e);
+ throw jmsEx;
}
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?view=diff&rev=500862&r1=500861&r2=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java Sun Jan 28 11:39:02 2007
@@ -22,6 +22,7 @@
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
+import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.broker.region.policy.MessageEvictionStrategy;
import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
import org.apache.activemq.command.ConsumerControl;
@@ -41,7 +42,7 @@
private static final Log log=LogFactory.getLog(TopicSubscription.class);
private static final AtomicLong cursorNameCounter=new AtomicLong(0);
- final protected FilePendingMessageCursor matched;
+ protected PendingMessageCursor matched;
final protected UsageManager usageManager;
protected AtomicLong dispatched=new AtomicLong();
protected AtomicLong delivered=new AtomicLong();
@@ -56,17 +57,21 @@
private int memoryUsageHighWaterMark=95;
public TopicSubscription(Broker broker,ConnectionContext context,ConsumerInfo info,UsageManager usageManager)
- throws InvalidSelectorException{
+ throws Exception{
super(broker,context,info);
this.usageManager=usageManager;
String matchedName="TopicSubscription:"+cursorNameCounter.getAndIncrement()+"["+info.getConsumerId().toString()
+"]";
this.matched=new FilePendingMessageCursor(matchedName,broker.getTempDataStore());
+
+ }
+
+ public void init() throws Exception {
this.matched.setUsageManager(usageManager);
this.matched.start();
}
-
- public void add(MessageReference node) throws InterruptedException,IOException{
+
+ public void add(MessageReference node) throws Exception{
enqueueCounter.incrementAndGet();
node.incrementReferenceCount();
if(!isFull()&&!isSlaveBroker()){
@@ -309,6 +314,20 @@
public UsageManager getUsageManager(){
return this.usageManager;
}
+
+ /**
+ * @return the matched
+ */
+ public PendingMessageCursor getMatched(){
+ return this.matched;
+ }
+
+ /**
+ * @param matched the matched to set
+ */
+ public void setMatched(PendingMessageCursor matched){
+ this.matched=matched;
+ }
/**
* inform the MessageConsumer on the client to change it's prefetch
@@ -402,7 +421,14 @@
public void destroy(){
synchronized(matchedListMutex){
- matched.destroy();
+ try{
+ matched.destroy();
+ }catch(Exception e){
+ log.warn("Failed to destroy cursor",e);
+ }
}
}
+
+
+
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java?view=diff&rev=500862&r1=500861&r2=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java Sun Jan 28 11:39:02 2007
@@ -14,6 +14,7 @@
package org.apache.activemq.broker.region.cursors;
+import java.util.LinkedList;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
@@ -149,5 +150,22 @@
*/
public UsageManager getUsageManager(){
return this.usageManager;
+ }
+
+ /**
+ * destroy the cursor
+ * @throws Exception
+ */
+ public void destroy() throws Exception {
+ stop();
+ }
+
+ /**
+ * Page in a restricted number of messages
+ * @param maxItems
+ * @return a list of paged in messages
+ */
+ public LinkedList pageInList(int maxItems) {
+ throw new RuntimeException("Not supported");
}
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java?view=diff&rev=500862&r1=500861&r2=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java Sun Jan 28 11:39:02 2007
@@ -18,6 +18,7 @@
import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.command.Message;
@@ -39,6 +40,7 @@
public class FilePendingMessageCursor extends AbstractPendingMessageCursor implements UsageListener{
static private final Log log=LogFactory.getLog(FilePendingMessageCursor.class);
+ static private final AtomicLong nameCount = new AtomicLong();
private Store store;
private String name;
private LinkedList memoryList=new LinkedList();
@@ -54,7 +56,7 @@
* @param store
*/
public FilePendingMessageCursor(String name,Store store){
- this.name=name;
+ this.name=nameCount.incrementAndGet() + "_"+name;
this.store=store;
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java?view=diff&rev=500862&r1=500861&r2=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java Sun Jan 28 11:39:02 2007
@@ -14,6 +14,7 @@
package org.apache.activemq.broker.region.cursors;
import java.io.IOException;
+import java.util.LinkedList;
import org.apache.activemq.Service;
import org.apache.activemq.broker.ConnectionContext;
@@ -188,6 +189,19 @@
* @return true if the cursor has buffered messages ready to deliver
*/
public boolean hasMessagesBufferedToDeliver();
+
+ /**
+ * destroy the cursor
+ * @throws Exception
+ */
+ public void destroy() throws Exception;
+
+ /**
+ * Page in a restricted number of messages
+ * @param maxItems
+ * @return a list of paged in messages
+ */
+ public LinkedList pageInList(int maxItems);
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java?view=diff&rev=500862&r1=500861&r2=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java Sun Jan 28 11:39:02 2007
@@ -103,4 +103,13 @@
}
}
}
+
+ /**
+ * Page in a restricted number of messages
+ * @param maxItems
+ * @return a list of paged in messages
+ */
+ public LinkedList pageInList(int maxItems) {
+ return list;
+ }
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingQueueMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingQueueMessageStoragePolicy.java?view=diff&rev=500862&r1=500861&r2=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingQueueMessageStoragePolicy.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingQueueMessageStoragePolicy.java Sun Jan 28 11:39:02 2007
@@ -23,7 +23,7 @@
/**
* Creates a FilePendingMessageCursor
* *
- * @org.apache.xbean.XBean element="fileCursor" description="Pending messages paged in from file"
+ * @org.apache.xbean.XBean element="fileQueueCursor" description="Pending messages paged in from file"
*
* @version $Revision$
*/
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingSubscriberMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingSubscriberMessageStoragePolicy.java?view=auto&rev=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingSubscriberMessageStoragePolicy.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingSubscriberMessageStoragePolicy.java Sun Jan 28 11:39:02 2007
@@ -0,0 +1,41 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.activemq.broker.region.policy;
+
+import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
+import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
+import org.apache.activemq.kaha.Store;
+
+
+/**
+ * Creates a PendIngMessageCursor for Durable subscribers
+ * *
+ * @org.apache.xbean.XBean element="fileCursor" description="Pending messages for durable subscribers
+ * held in temporary files"
+ *
+ * @version $Revision$
+ */
+public class FilePendingSubscriberMessageStoragePolicy implements PendingSubscriberMessageStoragePolicy{
+
+ /**
+ * @param name
+ * @param tmpStorage
+ * @param maxBatchSize
+ * @return a Cursor
+ * @see org.apache.activemq.broker.region.policy.PendingSubscriberMessageStoragePolicy#getSubscriberPendingMessageCursor(java.lang.String, org.apache.activemq.kaha.Store, int)
+ */
+ public PendingMessageCursor getSubscriberPendingMessageCursor(String name,Store tmpStorage,int maxBatchSize){
+ return new FilePendingMessageCursor("PendingCursor:" + name,tmpStorage);
+ }
+}
\ No newline at end of file
Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingSubscriberMessageStoragePolicy.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingSubscriberMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingSubscriberMessageStoragePolicy.java?view=auto&rev=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingSubscriberMessageStoragePolicy.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingSubscriberMessageStoragePolicy.java Sun Jan 28 11:39:02 2007
@@ -0,0 +1,38 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.activemq.broker.region.policy;
+
+import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
+import org.apache.activemq.kaha.Store;
+
+
+/**
+* Abstraction to allow different policies for holding messages awaiting dispatch to active clients
+*
+* @version $Revision$
+*/
+public interface PendingSubscriberMessageStoragePolicy{
+
+ /**
+ * Retrieve the configured pending message storage cursor;
+ *
+ * @param name
+ * @param tmpStorage
+ * @param maxBatchSize
+ * @return the Pending Message cursor
+ */
+ public PendingMessageCursor getSubscriberPendingMessageCursor(String name,Store tmpStorage,
+ int maxBatchSize);
+}
Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingSubscriberMessageStoragePolicy.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?view=diff&rev=500862&r1=500861&r2=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java Sun Jan 28 11:39:02 2007
@@ -1,22 +1,21 @@
/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
+
package org.apache.activemq.broker.region.policy;
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.broker.region.TopicSubscription;
@@ -25,21 +24,21 @@
import org.apache.activemq.broker.region.group.MessageGroupMapFactory;
import org.apache.activemq.filter.DestinationMapEntry;
import org.apache.activemq.kaha.Store;
+import org.apache.activemq.memory.UsageManager;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
- * Represents an entry in a {@link PolicyMap} for assigning policies to a
- * specific destination or a hierarchical wildcard area of destinations.
+ * Represents an entry in a {@link PolicyMap} for assigning policies to a specific destination or a hierarchical
+ * wildcard area of destinations.
*
* @org.apache.xbean.XBean
*
* @version $Revision: 1.1 $
*/
-public class PolicyEntry extends DestinationMapEntry {
+public class PolicyEntry extends DestinationMapEntry{
- private static final Log log = LogFactory.getLog(PolicyEntry.class);
-
+ private static final Log log=LogFactory.getLog(PolicyEntry.class);
private DispatchPolicy dispatchPolicy;
private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy;
private boolean sendAdvisoryIfNoConsumers;
@@ -48,135 +47,149 @@
private MessageEvictionStrategy messageEvictionStrategy;
private long memoryLimit;
private MessageGroupMapFactory messageGroupMapFactory;
- private PendingQueueMessageStoragePolicy pendingQueueMessageStoragePolicy;
-
- public void configure(Queue queue, Store tmpStore) {
- if (dispatchPolicy != null) {
+ private PendingQueueMessageStoragePolicy pendingQueuePolicy;
+ private PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy;
+ private PendingSubscriberMessageStoragePolicy pendingSubscriberPolicy;
+ public void configure(Queue queue,Store tmpStore){
+ if(dispatchPolicy!=null){
queue.setDispatchPolicy(dispatchPolicy);
}
- if (deadLetterStrategy != null) {
+ if(deadLetterStrategy!=null){
queue.setDeadLetterStrategy(deadLetterStrategy);
}
queue.setMessageGroupMapFactory(getMessageGroupMapFactory());
- if( memoryLimit>0 ) {
+ if(memoryLimit>0){
queue.getUsageManager().setLimit(memoryLimit);
}
- if (pendingQueueMessageStoragePolicy != null) {
- PendingMessageCursor messages = pendingQueueMessageStoragePolicy.getQueuePendingMessageCursor(queue,tmpStore);
+ if(pendingQueuePolicy!=null){
+ PendingMessageCursor messages=pendingQueuePolicy.getQueuePendingMessageCursor(queue,tmpStore);
queue.setMessages(messages);
}
}
- public void configure(Topic topic) {
- if (dispatchPolicy != null) {
+ public void configure(Topic topic){
+ if(dispatchPolicy!=null){
topic.setDispatchPolicy(dispatchPolicy);
}
- if (deadLetterStrategy != null) {
+ if(deadLetterStrategy!=null){
topic.setDeadLetterStrategy(deadLetterStrategy);
}
- if (subscriptionRecoveryPolicy != null) {
+ if(subscriptionRecoveryPolicy!=null){
topic.setSubscriptionRecoveryPolicy(subscriptionRecoveryPolicy.copy());
}
topic.setSendAdvisoryIfNoConsumers(sendAdvisoryIfNoConsumers);
- if( memoryLimit>0 ) {
+ if(memoryLimit>0){
topic.getUsageManager().setLimit(memoryLimit);
}
-
}
- public void configure(TopicSubscription subscription) {
- if (pendingMessageLimitStrategy != null) {
- int value = pendingMessageLimitStrategy.getMaximumPendingMessageLimit(subscription);
- int consumerLimit = subscription.getInfo().getMaximumPendingMessageLimit();
- if (consumerLimit > 0) {
- if (value < 0 || consumerLimit < value) {
- value = consumerLimit;
+ public void configure(Broker broker,UsageManager memoryManager,TopicSubscription subscription){
+ if(pendingMessageLimitStrategy!=null){
+ int value=pendingMessageLimitStrategy.getMaximumPendingMessageLimit(subscription);
+ int consumerLimit=subscription.getInfo().getMaximumPendingMessageLimit();
+ if(consumerLimit>0){
+ if(value<0||consumerLimit<value){
+ value=consumerLimit;
}
}
- if (value >= 0) {
- if (log.isDebugEnabled()) {
- log.debug("Setting the maximumPendingMessages size to: " + value + " for consumer: " + subscription.getInfo().getConsumerId());
+ if(value>=0){
+ if(log.isDebugEnabled()){
+ log.debug("Setting the maximumPendingMessages size to: "+value+" for consumer: "
+ +subscription.getInfo().getConsumerId());
}
subscription.setMaximumPendingMessages(value);
}
}
- if (messageEvictionStrategy != null) {
+ if(messageEvictionStrategy!=null){
subscription.setMessageEvictionStrategy(messageEvictionStrategy);
}
+ if (pendingSubscriberPolicy!=null) {
+ String name = subscription.getContext().getClientId() + "_" + subscription.getConsumerInfo().getConsumerId();
+ int maxBatchSize = subscription.getConsumerInfo().getPrefetchSize();
+ subscription.setMatched(pendingSubscriberPolicy.getSubscriberPendingMessageCursor(name,broker.getTempDataStore(),maxBatchSize));
+ }
+ }
+
+ public void configure(Broker broker,UsageManager memoryManager,DurableTopicSubscription sub){
+ String clientId=sub.getClientId();
+ String subName=sub.getSubscriptionName();
+ int prefetch=sub.getPrefetchSize();
+ if(pendingDurableSubscriberPolicy!=null){
+ PendingMessageCursor cursor=pendingDurableSubscriberPolicy.getSubscriberPendingMessageCursor(clientId,
+ subName,broker.getTempDataStore(),prefetch);
+ cursor.setUsageManager(memoryManager);
+ sub.setPending(cursor);
+ }
}
// Properties
// -------------------------------------------------------------------------
- public DispatchPolicy getDispatchPolicy() {
+ public DispatchPolicy getDispatchPolicy(){
return dispatchPolicy;
}
- public void setDispatchPolicy(DispatchPolicy policy) {
- this.dispatchPolicy = policy;
+ public void setDispatchPolicy(DispatchPolicy policy){
+ this.dispatchPolicy=policy;
}
- public SubscriptionRecoveryPolicy getSubscriptionRecoveryPolicy() {
+ public SubscriptionRecoveryPolicy getSubscriptionRecoveryPolicy(){
return subscriptionRecoveryPolicy;
}
- public void setSubscriptionRecoveryPolicy(SubscriptionRecoveryPolicy subscriptionRecoveryPolicy) {
- this.subscriptionRecoveryPolicy = subscriptionRecoveryPolicy;
+ public void setSubscriptionRecoveryPolicy(SubscriptionRecoveryPolicy subscriptionRecoveryPolicy){
+ this.subscriptionRecoveryPolicy=subscriptionRecoveryPolicy;
}
- public boolean isSendAdvisoryIfNoConsumers() {
+ public boolean isSendAdvisoryIfNoConsumers(){
return sendAdvisoryIfNoConsumers;
}
/**
- * Sends an advisory message if a non-persistent message is sent and there
- * are no active consumers
+ * Sends an advisory message if a non-persistent message is sent and there are no active consumers
*/
- public void setSendAdvisoryIfNoConsumers(boolean sendAdvisoryIfNoConsumers) {
- this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers;
+ public void setSendAdvisoryIfNoConsumers(boolean sendAdvisoryIfNoConsumers){
+ this.sendAdvisoryIfNoConsumers=sendAdvisoryIfNoConsumers;
}
- public DeadLetterStrategy getDeadLetterStrategy() {
+ public DeadLetterStrategy getDeadLetterStrategy(){
return deadLetterStrategy;
}
/**
- * Sets the policy used to determine which dead letter queue destination
- * should be used
+ * Sets the policy used to determine which dead letter queue destination should be used
*/
- public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) {
- this.deadLetterStrategy = deadLetterStrategy;
+ public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy){
+ this.deadLetterStrategy=deadLetterStrategy;
}
- public PendingMessageLimitStrategy getPendingMessageLimitStrategy() {
+ public PendingMessageLimitStrategy getPendingMessageLimitStrategy(){
return pendingMessageLimitStrategy;
}
/**
- * Sets the strategy to calculate the maximum number of messages that are
- * allowed to be pending on consumers (in addition to their prefetch sizes).
+ * Sets the strategy to calculate the maximum number of messages that are allowed to be pending on consumers (in
+ * addition to their prefetch sizes).
*
- * Once the limit is reached, non-durable topics can then start discarding
- * old messages. This allows us to keep dispatching messages to slow
- * consumers while not blocking fast consumers and discarding the messages
- * oldest first.
+ * Once the limit is reached, non-durable topics can then start discarding old messages. This allows us to keep
+ * dispatching messages to slow consumers while not blocking fast consumers and discarding the messages oldest
+ * first.
*/
- public void setPendingMessageLimitStrategy(PendingMessageLimitStrategy pendingMessageLimitStrategy) {
- this.pendingMessageLimitStrategy = pendingMessageLimitStrategy;
+ public void setPendingMessageLimitStrategy(PendingMessageLimitStrategy pendingMessageLimitStrategy){
+ this.pendingMessageLimitStrategy=pendingMessageLimitStrategy;
}
- public MessageEvictionStrategy getMessageEvictionStrategy() {
+ public MessageEvictionStrategy getMessageEvictionStrategy(){
return messageEvictionStrategy;
}
/**
- * Sets the eviction strategy used to decide which message to evict when the
- * slow consumer needs to discard messages
+ * Sets the eviction strategy used to decide which message to evict when the slow consumer needs to discard messages
*/
- public void setMessageEvictionStrategy(MessageEvictionStrategy messageEvictionStrategy) {
- this.messageEvictionStrategy = messageEvictionStrategy;
+ public void setMessageEvictionStrategy(MessageEvictionStrategy messageEvictionStrategy){
+ this.messageEvictionStrategy=messageEvictionStrategy;
}
- public long getMemoryLimit() {
+ public long getMemoryLimit(){
return memoryLimit;
}
@@ -184,40 +197,72 @@
*
* @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
*/
- public void setMemoryLimit(long memoryLimit) {
- this.memoryLimit = memoryLimit;
+ public void setMemoryLimit(long memoryLimit){
+ this.memoryLimit=memoryLimit;
}
- public MessageGroupMapFactory getMessageGroupMapFactory() {
- if (messageGroupMapFactory == null) {
- messageGroupMapFactory = new MessageGroupHashBucketFactory();
+ public MessageGroupMapFactory getMessageGroupMapFactory(){
+ if(messageGroupMapFactory==null){
+ messageGroupMapFactory=new MessageGroupHashBucketFactory();
}
return messageGroupMapFactory;
}
/**
- * Sets the factory used to create new instances of {MessageGroupMap} used to implement the
- * <a href="http://incubator.apache.org/activemq/message-groups.html">Message Groups</a> functionality.
+ * Sets the factory used to create new instances of {MessageGroupMap} used to implement the <a
+ * href="http://incubator.apache.org/activemq/message-groups.html">Message Groups</a> functionality.
+ */
+ public void setMessageGroupMapFactory(MessageGroupMapFactory messageGroupMapFactory){
+ this.messageGroupMapFactory=messageGroupMapFactory;
+ }
+
+
+ /**
+ * @return the pendingDurableSubscriberPolicy
*/
- public void setMessageGroupMapFactory(MessageGroupMapFactory messageGroupMapFactory) {
- this.messageGroupMapFactory = messageGroupMapFactory;
+ public PendingDurableSubscriberMessageStoragePolicy getPendingDurableSubscriberPolicy(){
+ return this.pendingDurableSubscriberPolicy;
}
/**
- * @return the pendingQueueMessageStoragePolicy
+ * @param pendingDurableSubscriberPolicy the pendingDurableSubscriberPolicy to set
*/
- public PendingQueueMessageStoragePolicy getPendingQueueMessageStoragePolicy(){
- return this.pendingQueueMessageStoragePolicy;
+ public void setPendingDurableSubscriberPolicy(
+ PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy){
+ this.pendingDurableSubscriberPolicy=pendingDurableSubscriberPolicy;
}
/**
- * @param pendingQueueMessageStoragePolicy the pendingQueueMessageStoragePolicy to set
+ * @return the pendingQueuePolicy
*/
- public void setPendingQueueMessageStoragePolicy(PendingQueueMessageStoragePolicy pendingQueueMessageStoragePolicy){
- this.pendingQueueMessageStoragePolicy=pendingQueueMessageStoragePolicy;
+ public PendingQueueMessageStoragePolicy getPendingQueuePolicy(){
+ return this.pendingQueuePolicy;
}
+ /**
+ * @param pendingQueuePolicy the pendingQueuePolicy to set
+ */
+ public void setPendingQueuePolicy(PendingQueueMessageStoragePolicy pendingQueuePolicy){
+ this.pendingQueuePolicy=pendingQueuePolicy;
+ }
+
+
+ /**
+ * @return the pendingSubscriberPolicy
+ */
+ public PendingSubscriberMessageStoragePolicy getPendingSubscriberPolicy(){
+ return this.pendingSubscriberPolicy;
+ }
+
+
+ /**
+ * @param pendingSubscriberPolicy the pendingSubscriberPolicy to set
+ */
+ public void setPendingSubscriberPolicy(PendingSubscriberMessageStoragePolicy pendingSubscriberPolicy){
+ this.pendingSubscriberPolicy=pendingSubscriberPolicy;
+ }
+
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java?view=diff&rev=500862&r1=500861&r2=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java Sun Jan 28 11:39:02 2007
@@ -21,7 +21,7 @@
/**
* Creates a VMPendingMessageCursor
* *
- * @org.apache.xbean.XBean element="vmCursor" description="Pending messages held in the JVM"
+ * @org.apache.xbean.XBean element="vmDurableCursor" description="Pending messages held in the JVM"
*
* @version $Revision$
*/
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingQueueMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingQueueMessageStoragePolicy.java?view=diff&rev=500862&r1=500861&r2=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingQueueMessageStoragePolicy.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingQueueMessageStoragePolicy.java Sun Jan 28 11:39:02 2007
@@ -22,7 +22,7 @@
/**
* Creates a VMPendingMessageCursor
* *
- * @org.apache.xbean.XBean element="vmCursor" description="Pending messages held in the JVM"
+ * @org.apache.xbean.XBean element="vmQueueCursor" description="Pending messages held in the JVM"
*
* @version $Revision$
*/
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingSubscriberMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingSubscriberMessageStoragePolicy.java?view=auto&rev=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingSubscriberMessageStoragePolicy.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingSubscriberMessageStoragePolicy.java Sun Jan 28 11:39:02 2007
@@ -0,0 +1,40 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.activemq.broker.region.policy;
+
+import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
+import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
+import org.apache.activemq.kaha.Store;
+
+
+/**
+ * Creates a VMPendingMessageCursor
+ * *
+ * @org.apache.xbean.XBean element="vmCursor" description="Pending messages held in the JVM"
+ *
+ * @version $Revision$
+ */
+public class VMPendingSubscriberMessageStoragePolicy implements PendingSubscriberMessageStoragePolicy{
+
+ /**
+ * @param name
+ * @param tmpStorage
+ * @param maxBatchSize
+ * @return a Cursor
+ * @see org.apache.activemq.broker.region.policy.PendingSubscriberMessageStoragePolicy#getSubscriberPendingMessageCursor(java.lang.String, org.apache.activemq.kaha.Store, int)
+ */
+ public PendingMessageCursor getSubscriberPendingMessageCursor(String name,Store tmpStorage,int maxBatchSize){
+ return new VMPendingMessageCursor();
+ }
+}
Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingSubscriberMessageStoragePolicy.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorDurableTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorDurableTest.java?view=diff&rev=500862&r1=500861&r2=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorDurableTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorDurableTest.java Sun Jan 28 11:39:02 2007
@@ -22,7 +22,6 @@
import javax.jms.Session;
import javax.jms.Topic;
import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.policy.StorePendingDurableSubscriberMessageStoragePolicy;
/**
* @version $Revision: 1.3 $
@@ -50,7 +49,6 @@
protected void configureBroker(BrokerService answer) throws Exception{
answer.setDeleteAllMessagesOnStartup(true);
- answer.setPendingDurableSubscriberPolicy(new StorePendingDurableSubscriberMessageStoragePolicy());
answer.addConnector(bindAddress);
answer.setDeleteAllMessagesOnStartup(true);
}
Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorQueueStoreTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorQueueStoreTest.java?view=diff&rev=500862&r1=500861&r2=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorQueueStoreTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorQueueStoreTest.java Sun Jan 28 11:39:02 2007
@@ -53,7 +53,7 @@
protected void configureBroker(BrokerService answer) throws Exception{
PolicyEntry policy = new PolicyEntry();
- policy.setPendingQueueMessageStoragePolicy(new StorePendingQueueMessageStoragePolicy());
+ policy.setPendingQueuePolicy(new StorePendingQueueMessageStoragePolicy());
PolicyMap pMap = new PolicyMap();
pMap.setDefaultEntry(policy);
answer.setDestinationPolicy(pMap);
Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/KahaQueueStoreTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/KahaQueueStoreTest.java?view=diff&rev=500862&r1=500861&r2=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/KahaQueueStoreTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/KahaQueueStoreTest.java Sun Jan 28 11:39:02 2007
@@ -38,7 +38,7 @@
KahaPersistenceAdapter adaptor = new KahaPersistenceAdapter(new File("activemq-data/durableTest"));
answer.setPersistenceAdapter(adaptor);
PolicyEntry policy = new PolicyEntry();
- policy.setPendingQueueMessageStoragePolicy(new StorePendingQueueMessageStoragePolicy());
+ policy.setPendingQueuePolicy(new StorePendingQueueMessageStoragePolicy());
PolicyMap pMap = new PolicyMap();
pMap.setDefaultEntry(policy);
answer.setDestinationPolicy(pMap);
Added: incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/policy/cursor.xml
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/policy/cursor.xml?view=auto&rev=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/policy/cursor.xml (added)
+++ incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/broker/policy/cursor.xml Sun Jan 28 11:39:02 2007
@@ -0,0 +1,59 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Copyright 2005-2006 The Apache Software Foundation
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!-- this file can only be parsed using the xbean-spring library -->
+<!-- START SNIPPET: xbean -->
+<beans>
+ <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
+
+ <broker persistent="false" xmlns="http://activemq.org/config/1.0">
+
+ <!-- lets define the dispatch policy -->
+ <destinationPolicy>
+ <policyMap>
+ <policyEntries>
+ <policyEntry topic="org.apache.>">
+ <dispatchPolicy>
+ <strictOrderDispatchPolicy />
+ </dispatchPolicy>
+ <deadLetterStrategy>
+ <individualDeadLetterStrategy topicPrefix="Test.DLQ." />
+ </deadLetterStrategy>
+ <pendingSubscriberPolicy>
+ <vmCursor />
+ </pendingSubscriberPolicy>
+ </policyEntry>
+
+ <policyEntry queue="org.apache.>">
+ <dispatchPolicy>
+ <strictOrderDispatchPolicy />
+ </dispatchPolicy>
+ <deadLetterStrategy>
+ <individualDeadLetterStrategy queuePrefix="Test.DLQ."/>
+ </deadLetterStrategy>
+ <pendingQueuePolicy>
+ <vmQueueCursor />
+ </pendingQueuePolicy>
+ </policyEntry>
+
+ </policyEntries>
+ </policyMap>
+ </destinationPolicy>
+ </broker>
+
+</beans>
+<!-- END SNIPPET: xbean -->
Modified: incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/perf/slowConsumerBroker.xml
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/perf/slowConsumerBroker.xml?view=diff&rev=500862&r1=500861&r2=500862
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/perf/slowConsumerBroker.xml (original)
+++ incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/perf/slowConsumerBroker.xml Sun Jan 28 11:39:02 2007
@@ -18,14 +18,14 @@
<beans>
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
- <broker brokerName="slowConsumerBroker" persistent="true" useShutdownHook="false" xmlns="http://activemq.org/config/1.0">
+ <broker brokerName="slowConsumerBroker" useJmx="false" persistent="false" useShutdownHook="false" xmlns="http://activemq.org/config/1.0">
<transportConnectors>
<transportConnector uri="tcp://localhost:61616"/>
</transportConnectors>
<destinationPolicy>
<policyMap>
<policyEntries>
- <policyEntry topic=">">
+ <policyEntry topic="blob">
<!-- lets force old messages to be discarded for slow consumers -->
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="10"/>