You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2007/07/20 19:08:14 UTC
svn commit: r558054 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/advisory/ main/java/org/apache/activemq/broker/
main/java/org/apache/activemq/broker/region/
main/java/org/apache/activemq/broker/region/policy/ test/java/org/apa...
Author: rajdavies
Date: Fri Jul 20 10:08:10 2007
New Revision: 558054
URL: http://svn.apache.org/viewvc?view=rev&rev=558054
Log:
Fix for:
http://issues.apache.org/activemq/browse/AMQ-1207
http://issues.apache.org/activemq/browse/AMQ-880
http://issues.apache.org/activemq/browse/AMQ-450
http://issues.apache.org/activemq/browse/AMQ-879
Added:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java (with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/DeadLetterStrategy.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/IndividualDeadLetterStrategy.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SharedDeadLetterStrategy.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java?view=diff&rev=558054&r1=558053&r2=558054
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java Fri Jul 20 10:08:10 2007
@@ -17,6 +17,7 @@
*/
package org.apache.activemq.advisory;
+import java.io.IOException;
import java.util.Iterator;
import org.apache.activemq.broker.Broker;
@@ -24,6 +25,7 @@
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
@@ -38,6 +40,8 @@
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.LongSequenceGenerator;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import java.util.concurrent.ConcurrentHashMap;
@@ -49,7 +53,7 @@
*/
public class AdvisoryBroker extends BrokerFilter {
- //private static final Log log = LogFactory.getLog(AdvisoryBroker.class);
+ private static final Log log = LogFactory.getLog(AdvisoryBroker.class);
protected final ConcurrentHashMap connections = new ConcurrentHashMap();
protected final ConcurrentHashMap consumers = new ConcurrentHashMap();
@@ -226,6 +230,16 @@
ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(info.getDestination());
producers.remove(info.getProducerId());
fireProducerAdvisory(context, topic, info.createRemoveCommand());
+ }
+ }
+
+ public void messageExpired(ConnectionContext context,MessageReference messageReference){
+ next.messageExpired(context,messageReference);
+ try{
+ ActiveMQTopic topic=AdvisorySupport.getExpiredMessageTopic(messageReference.getMessage().getDestination());
+ fireAdvisory(context,topic,messageReference.getMessage());
+ }catch(Exception e){
+ log.warn("Failed to fire message expired advisory");
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java?view=diff&rev=558054&r1=558053&r2=558054
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java Fri Jul 20 10:08:10 2007
@@ -64,6 +64,13 @@
return new ActiveMQTopic(TOPIC_PRODUCER_ADVISORY_TOPIC_PREFIX+destination.getPhysicalName());
}
+ public static ActiveMQTopic getExpiredMessageTopic(ActiveMQDestination destination) {
+ if (destination.isQueue()) {
+ return getExpiredQueueMessageAdvisoryTopic(destination);
+ }
+ return getExpiredTopicMessageAdvisoryTopic(destination);
+ }
+
public static ActiveMQTopic getExpiredTopicMessageAdvisoryTopic(ActiveMQDestination destination) {
String name = EXPIRED_TOPIC_MESSAGES_TOPIC_PREFIX+destination.getPhysicalName();
return new ActiveMQTopic(name);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java?view=diff&rev=558054&r1=558053&r2=558054
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java Fri Jul 20 10:08:10 2007
@@ -20,19 +20,15 @@
import java.net.URI;
import java.util.Set;
import org.apache.activemq.Service;
-import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Region;
-import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.MessageDispatch;
-import org.apache.activemq.command.MessageDispatchNotification;
-import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.ProducerInfo;
-import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.kaha.Store;
@@ -135,6 +131,8 @@
/**
* Gets a list of all the prepared xa transactions.
+ * @param context transaction ids
+ * @return
* @throws Exception TODO
*/
public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception;
@@ -151,7 +149,7 @@
* Prepares a transaction. Only valid for xa transactions.
* @param context
* @param xid
- * @return
+ * @return id
* @throws Exception TODO
*/
public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception;
@@ -176,6 +174,9 @@
/**
* Forgets a transaction.
+ * @param context
+ * @param transactionId
+ * @throws Exception
*/
public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception;
@@ -246,7 +247,35 @@
*/
public URI getVmConnectorURI();
+ /**
+ * called when the brokerService starts
+ */
public void brokerServiceStarted();
+ /**
+ * @return the BrokerService
+ */
BrokerService getBrokerService();
+
+ /**
+ * Ensure we get the Broker at the top of the Stack
+ * @return the broker at the top of the Stack
+ */
+ Broker getRoot();
+
+ /**
+ * A Message has Expired
+ * @param context
+ * @param messageReference
+ * @throws Exception
+ */
+ public void messageExpired(ConnectionContext context, MessageReference messageReference);
+
+ /**
+ * A message needs to go the a DLQ
+ * @param context
+ * @param messageReference
+ * @throws Exception
+ */
+ public void sendToDeadLetterQueue(ConnectionContext context,MessageReference messageReference);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java?view=diff&rev=558054&r1=558053&r2=558054
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java Fri Jul 20 10:08:10 2007
@@ -17,9 +17,12 @@
*/
package org.apache.activemq.broker;
+import java.net.URI;
+import java.util.Map;
+import java.util.Set;
import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription;
-import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo;
@@ -38,10 +41,6 @@
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.kaha.Store;
-import java.net.URI;
-import java.util.Map;
-import java.util.Set;
-
/**
* Allows you to intercept broker operation so that features such as security can be
* implemented as a pluggable filter.
@@ -245,5 +244,17 @@
public BrokerService getBrokerService(){
return next.getBrokerService();
+ }
+
+ public void messageExpired(ConnectionContext context,MessageReference message){
+ next.messageExpired(context,message);
+ }
+
+ public void sendToDeadLetterQueue(ConnectionContext context,MessageReference messageReference){
+ next.sendToDeadLetterQueue(context,messageReference);
+ }
+
+ public Broker getRoot() {
+ return next.getRoot();
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java?view=diff&rev=558054&r1=558053&r2=558054
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java Fri Jul 20 10:08:10 2007
@@ -17,9 +17,13 @@
*/
package org.apache.activemq.broker;
+import java.net.URI;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription;
-import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo;
@@ -38,11 +42,6 @@
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.kaha.Store;
-import java.net.URI;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Set;
-
/**
* Dumb implementation - used to be overriden by listeners
*
@@ -245,4 +244,14 @@
public BrokerService getBrokerService(){
return null;
}
+
+ public void messageExpired(ConnectionContext context,MessageReference message){
+ }
+
+ public void sendToDeadLetterQueue(ConnectionContext context,MessageReference messageReference){
+ }
+
+ public Broker getRoot(){
+ return null;
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java?view=diff&rev=558054&r1=558053&r2=558054
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java Fri Jul 20 10:08:10 2007
@@ -21,10 +21,9 @@
import java.util.Collections;
import java.util.Map;
import java.util.Set;
-
import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription;
-import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo;
@@ -245,4 +244,16 @@
public BrokerService getBrokerService(){
throw new BrokerStoppedException(this.message);
}
+
+ public void messageExpired(ConnectionContext context,MessageReference message){
+ throw new BrokerStoppedException(this.message);
+ }
+
+ public void sendToDeadLetterQueue(ConnectionContext context,MessageReference messageReference){
+ throw new BrokerStoppedException(this.message);
+ }
+
+ public Broker getRoot(){
+ throw new BrokerStoppedException(this.message);
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java?view=diff&rev=558054&r1=558053&r2=558054
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java Fri Jul 20 10:08:10 2007
@@ -17,9 +17,12 @@
*/
package org.apache.activemq.broker;
+import java.net.URI;
+import java.util.Map;
+import java.util.Set;
import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription;
-import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo;
@@ -38,10 +41,6 @@
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.kaha.Store;
-import java.net.URI;
-import java.util.Map;
-import java.util.Set;
-
/**
* Like a BrokerFilter but it allows you to switch the getNext().broker. This has more
* overhead than a BrokerFilter since access to the getNext().broker has to synchronized
@@ -258,6 +257,19 @@
public BrokerService getBrokerService(){
return getNext().getBrokerService();
+ }
+
+
+ public void messageExpired(ConnectionContext context,MessageReference message){
+ getNext().messageExpired(context,message);
+ }
+
+ public void sendToDeadLetterQueue(ConnectionContext context,MessageReference messageReference) {
+ getNext().sendToDeadLetterQueue(context,messageReference);
+ }
+
+ public Broker getRoot(){
+ return getNext().getRoot();
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?view=diff&rev=558054&r1=558053&r2=558054
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java Fri Jul 20 10:08:10 2007
@@ -332,14 +332,15 @@
// Try to auto create the destination... re-invoke broker from the
// top so that the proper security checks are performed.
try {
+
+ context.getBroker().addDestination(context,destination);
dest = addDestination(context, destination);
- //context.getBroker().addDestination(context,destination);
}
catch (DestinationAlreadyExistsException e) {
// if the destination already exists then lets ignore this error
}
// We should now have the dest created.
- //dest=(Destination) destinations.get(destination);
+ dest=(Destination) destinations.get(destination);
}
if(dest==null){
throw new JMSException("The destination "+destination+" does not exist.");
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java?view=diff&rev=558054&r1=558053&r2=558054
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java Fri Jul 20 10:08:10 2007
@@ -42,118 +42,121 @@
* @author fateev@amazon.com
* @version $Revision$
*/
-public class DestinationFactoryImpl extends DestinationFactory {
+public class DestinationFactoryImpl extends DestinationFactory{
protected final UsageManager memoryManager;
protected final TaskRunnerFactory taskRunnerFactory;
protected final PersistenceAdapter persistenceAdapter;
protected RegionBroker broker;
- public DestinationFactoryImpl(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory,
- PersistenceAdapter persistenceAdapter) {
- this.memoryManager = memoryManager;
- this.taskRunnerFactory = taskRunnerFactory;
- if (persistenceAdapter == null) {
+ public DestinationFactoryImpl(UsageManager memoryManager,TaskRunnerFactory taskRunnerFactory,
+ PersistenceAdapter persistenceAdapter){
+ this.memoryManager=memoryManager;
+ this.taskRunnerFactory=taskRunnerFactory;
+ if(persistenceAdapter==null){
throw new IllegalArgumentException("null persistenceAdapter");
}
- this.persistenceAdapter = persistenceAdapter;
+ this.persistenceAdapter=persistenceAdapter;
}
- public void setRegionBroker(RegionBroker broker) {
- if (broker == null) {
+ public void setRegionBroker(RegionBroker broker){
+ if(broker==null){
throw new IllegalArgumentException("null broker");
}
- this.broker = broker;
+ this.broker=broker;
}
- public Set getDestinations() {
+ public Set getDestinations(){
return persistenceAdapter.getDestinations();
}
/**
* @return instance of {@link Queue} or {@link Topic}
*/
- public Destination createDestination(ConnectionContext context, ActiveMQDestination destination, DestinationStatistics destinationStatistics) throws Exception {
- if (destination.isQueue()) {
- if (destination.isTemporary()) {
- final ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destination;
- return new Queue(destination, memoryManager, null, destinationStatistics, taskRunnerFactory,broker.getTempDataStore()) {
-
- public void addSubscription(ConnectionContext context,Subscription sub) throws Exception {
+ public Destination createDestination(ConnectionContext context,ActiveMQDestination destination,
+ DestinationStatistics destinationStatistics) throws Exception{
+ if(destination.isQueue()){
+ if(destination.isTemporary()){
+ final ActiveMQTempDestination tempDest=(ActiveMQTempDestination)destination;
+ return new Queue(broker.getRoot(),destination,memoryManager,null,
+ destinationStatistics,taskRunnerFactory,broker.getTempDataStore()){
+
+ public void addSubscription(ConnectionContext context,Subscription sub) throws Exception{
// Only consumers on the same connection can consume from
// the temporary destination
- if( !tempDest.getConnectionId().equals( sub.getConsumerInfo().getConsumerId().getConnectionId() ) ) {
+ if(!tempDest.getConnectionId().equals(sub.getConsumerInfo().getConsumerId().getConnectionId())){
throw new JMSException("Cannot subscribe to remote temporary destination: "+tempDest);
}
- super.addSubscription(context, sub);
+ super.addSubscription(context,sub);
};
};
- } else {
- MessageStore store = persistenceAdapter.createQueueMessageStore((ActiveMQQueue) destination);
- Queue queue = new Queue(destination, memoryManager, store, destinationStatistics, taskRunnerFactory,broker.getTempDataStore());
- configureQueue(queue, destination);
+ }else{
+ MessageStore store=persistenceAdapter.createQueueMessageStore((ActiveMQQueue)destination);
+ Queue queue=new Queue(broker.getRoot(),destination,memoryManager,store,
+ destinationStatistics,taskRunnerFactory,broker.getTempDataStore());
+ configureQueue(queue,destination);
queue.initialize();
return queue;
}
- } else if (destination.isTemporary()){
- final ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destination;
- return new Topic(destination, null, memoryManager, destinationStatistics, taskRunnerFactory) {
- public void addSubscription(ConnectionContext context,Subscription sub) throws Exception {
+ }else if(destination.isTemporary()){
+ final ActiveMQTempDestination tempDest=(ActiveMQTempDestination)destination;
+ return new Topic(broker.getRoot(),destination,null,memoryManager,
+ destinationStatistics,taskRunnerFactory){
+
+ public void addSubscription(ConnectionContext context,Subscription sub) throws Exception{
// Only consumers on the same connection can consume from
// the temporary destination
- if( !tempDest.getConnectionId().equals( sub.getConsumerInfo().getConsumerId().getConnectionId() ) ) {
+ if(!tempDest.getConnectionId().equals(sub.getConsumerInfo().getConsumerId().getConnectionId())){
throw new JMSException("Cannot subscribe to remote temporary destination: "+tempDest);
}
- super.addSubscription(context, sub);
+ super.addSubscription(context,sub);
};
};
- } else {
- TopicMessageStore store = null;
- if (!AdvisorySupport.isAdvisoryTopic(destination)) {
- store = persistenceAdapter.createTopicMessageStore((ActiveMQTopic) destination);
+ }else{
+ TopicMessageStore store=null;
+ if(!AdvisorySupport.isAdvisoryTopic(destination)){
+ store=persistenceAdapter.createTopicMessageStore((ActiveMQTopic)destination);
}
-
- Topic topic = new Topic(destination, store, memoryManager, destinationStatistics, taskRunnerFactory);
- configureTopic(topic, destination);
-
+ Topic topic=new Topic(broker.getRoot(),destination,store,memoryManager,
+ destinationStatistics,taskRunnerFactory);
+ configureTopic(topic,destination);
return topic;
}
}
- protected void configureQueue(Queue queue, ActiveMQDestination destination) {
- if (broker == null) {
+ protected void configureQueue(Queue queue,ActiveMQDestination destination){
+ if(broker==null){
throw new IllegalStateException("broker property is not set");
}
- if (broker.getDestinationPolicy() != null) {
- PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
- if (entry != null) {
+ if(broker.getDestinationPolicy()!=null){
+ PolicyEntry entry=broker.getDestinationPolicy().getEntryFor(destination);
+ if(entry!=null){
entry.configure(queue,broker.getTempDataStore());
}
}
}
- protected void configureTopic(Topic topic, ActiveMQDestination destination) {
- if (broker == null) {
+ protected void configureTopic(Topic topic,ActiveMQDestination destination){
+ if(broker==null){
throw new IllegalStateException("broker property is not set");
}
- if (broker.getDestinationPolicy() != null) {
- PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
- if (entry != null) {
+ if(broker.getDestinationPolicy()!=null){
+ PolicyEntry entry=broker.getDestinationPolicy().getEntryFor(destination);
+ if(entry!=null){
entry.configure(topic);
}
}
}
- public long getLastMessageBrokerSequenceId() throws IOException {
+ public long getLastMessageBrokerSequenceId() throws IOException{
return persistenceAdapter.getLastMessageBrokerSequenceId();
}
- public PersistenceAdapter getPersistenceAdapter() {
+ public PersistenceAdapter getPersistenceAdapter(){
return persistenceAdapter;
}
- public SubscriptionInfo[] getAllDurableSubscriptions(ActiveMQTopic topic) throws IOException {
+ public SubscriptionInfo[] getAllDurableSubscriptions(ActiveMQTopic topic) throws IOException{
return persistenceAdapter.createTopicMessageStore(topic).getAllSubscriptions();
}
-
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?view=diff&rev=558054&r1=558053&r2=558054
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Fri Jul 20 10:08:10 2007
@@ -276,17 +276,7 @@
* @throws Exception
*/
protected void sendToDLQ(final ConnectionContext context,final MessageReference node) throws IOException,Exception{
- // Send the message to the DLQ
- Message message=node.getMessage();
- if(message!=null){
- // The original destination and transaction id do not get filled when the message is first
- // sent,
- // it is only populated if the message is routed to another destination like the DLQ
- DeadLetterStrategy deadLetterStrategy=node.getRegionDestination().getDeadLetterStrategy();
- ActiveMQDestination deadLetterDestination=deadLetterStrategy
- .getDeadLetterQueueFor(message.getDestination());
- BrokerSupport.resend(context,message,deadLetterDestination);
- }
+ broker.sendToDeadLetterQueue(context,node);
}
/**
@@ -393,7 +383,9 @@
// Message may have been sitting in the pending list a while
// waiting for the consumer to ak the message.
if(node!=QueueMessageReference.NULL_MESSAGE&&node.isExpired()){
- continue; // just drop it.
+ broker.messageExpired(getContext(),node);
+ dequeueCounter++;
+ continue;
}
dispatch(node);
count++;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?view=diff&rev=558054&r1=558053&r2=558054
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Fri Jul 20 10:08:10 2007
@@ -27,6 +27,7 @@
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
+import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
@@ -72,7 +73,6 @@
public class Queue implements Destination, Task {
private final Log log;
-
private final ActiveMQDestination destination;
private final List consumers = new CopyOnWriteArrayList();
private final Valve dispatchValve = new Valve(true);
@@ -96,9 +96,11 @@
private final Object doDispatchMutex = new Object();
private TaskRunner taskRunner;
private boolean started = false;
+ final Broker broker;
- public Queue(ActiveMQDestination destination, final UsageManager memoryManager, MessageStore store, DestinationStatistics parentStats,
+ public Queue(Broker broker,ActiveMQDestination destination, final UsageManager memoryManager, MessageStore store, DestinationStatistics parentStats,
TaskRunnerFactory taskFactory, Store tmpStore) throws Exception {
+ this.broker=broker;
this.destination = destination;
this.usageManager = new UsageManager(memoryManager,destination.toString());
this.usageManager.setUsagePortion(1.0f);
@@ -136,7 +138,8 @@
public void recoverMessage(Message message){
// Message could have expired while it was being loaded..
if(message.isExpired()){
- // TODO remove from store
+ broker.messageExpired(createConnectionContext(),message);
+ destinationStatistics.getMessages().decrement();
return;
}
message.setRegionDestination(Queue.this);
@@ -342,9 +345,8 @@
// There is delay between the client sending it and it arriving at the
// destination.. it may have expired.
if(message.isExpired()){
- if (log.isDebugEnabled()) {
- log.debug("Expired message: " + message);
- }
+ broker.messageExpired(context,message);
+ destinationStatistics.getMessages().decrement();
if( ( !message.isResponseRequired() || producerExchange.getProducerState().getInfo().getWindowSize() > 0 ) && !context.isInRecoveryMode() ) {
ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(), message.getSize());
context.getConnection().dispatchAsync(ack);
@@ -365,9 +367,8 @@
// While waiting for space to free up... the message may have expired.
if(message.isExpired()){
- if (log.isDebugEnabled()) {
- log.debug("Expired message: " + message);
- }
+ broker.messageExpired(context,message);
+ destinationStatistics.getMessages().decrement();
if( !message.isResponseRequired() && !context.isInRecoveryMode() ) {
ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(), message.getSize());
@@ -440,10 +441,8 @@
// It could take while before we receive the commit
// op, by that time the message could have expired..
if(message.isExpired()){
- // TODO: remove message from store.
- if (log.isDebugEnabled()) {
- log.debug("Expired message: " + message);
- }
+ broker.messageExpired(context,message);
+ destinationStatistics.getMessages().decrement();
return;
}
sendMessage(context,message);
@@ -1011,9 +1010,8 @@
result.add(node);
count++;
}else{
- if (log.isDebugEnabled()) {
- log.debug("Expired message: " + node);
- }
+ broker.messageExpired(createConnectionContext(),node);
+ destinationStatistics.getMessages().decrement();
}
}
}finally{
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?view=diff&rev=558054&r1=558053&r2=558054
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Fri Jul 20 10:08:10 2007
@@ -37,6 +37,7 @@
import org.apache.activemq.broker.DestinationAlreadyExistsException;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.TransactionBroker;
+import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.VMPendingDurableSubscriberMessageStoragePolicy;
@@ -62,6 +63,7 @@
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.activemq.util.BrokerSupport;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.LongSequenceGenerator;
import org.apache.activemq.util.ServiceStopper;
@@ -625,6 +627,52 @@
public BrokerService getBrokerService(){
return brokerService;
}
-
-
+
+ public void messageExpired(ConnectionContext context,MessageReference node){
+ if(log.isDebugEnabled()){
+ log.debug("Message expired "+node);
+ }
+ getRoot().sendToDeadLetterQueue(context,node);
+ }
+
+ public void sendToDeadLetterQueue(ConnectionContext context,MessageReference node){
+ try{
+ if(node!=null){
+ Message message=node.getMessage();
+ if(message!=null){
+ DeadLetterStrategy deadLetterStrategy=node.getRegionDestination().getDeadLetterStrategy();
+ if(deadLetterStrategy!=null){
+ if(deadLetterStrategy.isSendToDeadLetterQueue(message)){
+ long expiration=message.getExpiration();
+ message.setExpiration(0);
+ message.setProperty("originalExpiration",new Long(expiration));
+ if(!message.isPersistent()){
+ message.setPersistent(true);
+ message.setProperty("originalDeliveryMode","NON_PERSISTENT");
+ }
+ // The original destination and transaction id do not get filled when the message is first
+ // sent,
+ // it is only populated if the message is routed to another destination like the DLQ
+ ActiveMQDestination deadLetterDestination=deadLetterStrategy.getDeadLetterQueueFor(message
+ .getDestination());
+ BrokerSupport.resend(context,message,deadLetterDestination);
+ }
+ }
+ }else{
+ log.warn("Null message for node: "+node);
+ }
+ }
+ }catch(Exception e){
+ log.warn("Failed to pass expired message to dead letter queue");
+ }
+ }
+
+ public Broker getRoot(){
+ try{
+ return getBrokerService().getBroker();
+ }catch(Exception e){
+ log.fatal("Trying to get Root Broker "+e);
+ throw new RuntimeException("The broker from the BrokerService should not throw an exception");
+ }
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java?view=diff&rev=558054&r1=558053&r2=558054
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java Fri Jul 20 10:08:10 2007
@@ -41,7 +41,7 @@
protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception {
final ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destination;
- return new Queue(destination, memoryManager, null, destinationStatistics, taskRunnerFactory, null) {
+ return new Queue(broker.getRoot(),destination, memoryManager, null, destinationStatistics, taskRunnerFactory, null) {
public void addSubscription(ConnectionContext context,Subscription sub) throws Exception {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?view=diff&rev=558054&r1=558053&r2=558054
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Fri Jul 20 10:08:10 2007
@@ -24,6 +24,7 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
import org.apache.activemq.advisory.AdvisorySupport;
+import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
@@ -72,10 +73,11 @@
private boolean sendAdvisoryIfNoConsumers;
private DeadLetterStrategy deadLetterStrategy = new SharedDeadLetterStrategy();
private final ConcurrentHashMap durableSubcribers = new ConcurrentHashMap();
+ final Broker broker;
- public Topic(ActiveMQDestination destination, TopicMessageStore store, UsageManager memoryManager, DestinationStatistics parentStats,
+ public Topic(Broker broker,ActiveMQDestination destination, TopicMessageStore store, UsageManager memoryManager, DestinationStatistics parentStats,
TaskRunnerFactory taskFactory) {
-
+ this.broker=broker;
this.destination = destination;
this.store = store; //this could be NULL! (If an advsiory)
this.usageManager = new UsageManager(memoryManager,destination.toString());
@@ -261,9 +263,8 @@
// There is delay between the client sending it and it arriving at the
// destination.. it may have expired.
if( message.isExpired() ) {
- if (log.isDebugEnabled()) {
- log.debug("Expired message: " + message);
- }
+ broker.messageExpired(context,message);
+ destinationStatistics.getMessages().decrement();
if( ( !message.isResponseRequired() || producerExchange.getProducerState().getInfo().getWindowSize() > 0 ) && !context.isInRecoveryMode() ) {
ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(), message.getSize());
context.getConnection().dispatchAsync(ack);
@@ -285,9 +286,8 @@
// While waiting for space to free up... the message may have expired.
if(message.isExpired()){
- if (log.isDebugEnabled()) {
- log.debug("Expired message: " + message);
- }
+ broker.messageExpired(context,message);
+ destinationStatistics.getMessages().decrement();
if( !message.isResponseRequired() && !context.isInRecoveryMode() ) {
ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(), message.getSize());
@@ -357,7 +357,9 @@
// It could take while before we receive the commit
// operration.. by that time the message could have expired..
if( message.isExpired() ) {
- // TODO: remove message from store.
+ broker.messageExpired(context,message);
+ message.decrementReferenceCount();
+ destinationStatistics.getMessages().decrement();
return;
}
dispatch(context, message);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?view=diff&rev=558054&r1=558053&r2=558054
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java Fri Jul 20 10:08:10 2007
@@ -103,12 +103,7 @@
int messagesToEvict=oldMessages.length;
for(int i=0;i<messagesToEvict;i++){
MessageReference oldMessage=oldMessages[i];
- oldMessage.decrementReferenceCount();
- matched.remove(oldMessage);
- discarded++;
- if(log.isDebugEnabled()){
- log.debug("Discarding message "+oldMessages[i]);
- }
+ discard(oldMessage);
}
// lets avoid an infinite loop if we are given a bad eviction strategy
// for a bad strategy lets just not evict
@@ -138,6 +133,7 @@
matched.remove();
dispatchedCounter.incrementAndGet();
node.decrementReferenceCount();
+ broker.messageExpired(getContext(),node);
break;
}
}
@@ -367,6 +363,8 @@
// waiting for the consumer to ak the message.
if(message.isExpired()){
message.decrementReferenceCount();
+ broker.messageExpired(getContext(),message);
+ dequeueCounter.incrementAndGet();
continue; // just drop it.
}
dispatch(message);
@@ -409,6 +407,17 @@
node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
node.decrementReferenceCount();
}
+ }
+
+ private void discard(MessageReference message) {
+ message.decrementReferenceCount();
+ matched.remove(message);
+ discarded++;
+ dequeueCounter.incrementAndGet();
+ if(log.isDebugEnabled()){
+ log.debug("Discarding message "+message);
+ }
+ broker.getRoot().sendToDeadLetterQueue(getContext(),message);
}
public String toString(){
Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java?view=auto&rev=558054
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java Fri Jul 20 10:08:10 2007
@@ -0,0 +1,74 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.policy;
+
+import org.apache.activemq.command.Message;
+
+/**
+ * A strategy for choosing which destination is used for dead letter queue messages.
+ *
+ * @version $Revision: 426366 $
+ */
+public abstract class AbstractDeadLetterStrategy implements DeadLetterStrategy {
+ private boolean processNonPersistent=true;
+ private boolean processExpired=true;
+
+ public boolean isSendToDeadLetterQueue(Message message){
+ boolean result=false;
+ if(message!=null){
+ result=true;
+ if(message.isPersistent()==false&&processNonPersistent==false){
+ result=false;
+ }
+ if(message.isExpired()&&processExpired==false){
+ result=false;
+ }
+ }
+ return result;
+ }
+
+ /**
+ * @return the processExpired
+ */
+ public boolean isProcessExpired(){
+ return this.processExpired;
+ }
+
+ /**
+ * @param processExpired the processExpired to set
+ */
+ public void setProcessExpired(boolean processExpired){
+ this.processExpired=processExpired;
+ }
+
+ /**
+ * @return the processNonPersistent
+ */
+ public boolean isProcessNonPersistent(){
+ return this.processNonPersistent;
+ }
+
+ /**
+ * @param processNonPersistent the processNonPersistent to set
+ */
+ public void setProcessNonPersistent(boolean processNonPersistent){
+ this.processNonPersistent=processNonPersistent;
+ }
+
+
+}
Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/AbstractDeadLetterStrategy.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/DeadLetterStrategy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/DeadLetterStrategy.java?view=diff&rev=558054&r1=558053&r2=558054
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/DeadLetterStrategy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/DeadLetterStrategy.java Fri Jul 20 10:08:10 2007
@@ -18,6 +18,7 @@
package org.apache.activemq.broker.region.policy;
import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.Message;
/**
* A strategy for choosing which destination is used for dead letter queue messages.
@@ -25,6 +26,14 @@
* @version $Revision$
*/
public interface DeadLetterStrategy {
+
+ /**
+ * Allow pluggable strategy for deciding if message should be sent to a dead letter queue
+ * for example, you might not want to ignore expired or non-persistent messages
+ * @param message
+ * @return true if message should be sent to a dead letter queue
+ */
+ public boolean isSendToDeadLetterQueue(Message message);
/**
* Returns the dead letter queue for the given destination.
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/IndividualDeadLetterStrategy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/IndividualDeadLetterStrategy.java?view=diff&rev=558054&r1=558053&r2=558054
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/IndividualDeadLetterStrategy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/IndividualDeadLetterStrategy.java Fri Jul 20 10:08:10 2007
@@ -29,7 +29,7 @@
*
* @version $Revision$
*/
-public class IndividualDeadLetterStrategy implements DeadLetterStrategy {
+public class IndividualDeadLetterStrategy extends AbstractDeadLetterStrategy {
private String topicPrefix = "ActiveMQ.DLQ.Topic.";
private String queuePrefix = "ActiveMQ.DLQ.Queue.";
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SharedDeadLetterStrategy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SharedDeadLetterStrategy.java?view=diff&rev=558054&r1=558053&r2=558054
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SharedDeadLetterStrategy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SharedDeadLetterStrategy.java Fri Jul 20 10:08:10 2007
@@ -29,7 +29,7 @@
*
* @version $Revision$
*/
-public class SharedDeadLetterStrategy implements DeadLetterStrategy {
+public class SharedDeadLetterStrategy extends AbstractDeadLetterStrategy {
private ActiveMQDestination deadLetterQueue = new ActiveMQQueue("ActiveMQ.DLQ");
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java?view=diff&rev=558054&r1=558053&r2=558054
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java Fri Jul 20 10:08:10 2007
@@ -19,6 +19,7 @@
package org.apache.activemq.broker;
import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
import org.apache.activemq.command.ActiveMQDestination;
@@ -243,5 +244,15 @@
public BrokerService getBrokerService(){
return null;
+ }
+
+ public void messageExpired(ConnectionContext context,MessageReference messageReference){
+ }
+
+ public void sendToDeadLetterQueue(ConnectionContext context,MessageReference messageReference) {
+ }
+
+ public Broker getRoot(){
+ return this;
}
}