Posted to users@activemq.apache.org by Rob Davies <ra...@gmail.com> on 2007/08/24 08:30:53 UTC
Re: svn commit: r558054 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/advisory/ main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/broker/region/policy/ test/java/org/apa...
Hi Ferry,
It's best to write to the user or dev list in general (copied). This
certainly looks like a bug. Please try the next snapshot built
after today.
cheers,
Rob
On Aug 24, 2007, at 4:17 AM, vri_97@yahoo.com wrote:
> Hi,
> First, I'm sorry if this is not a proper way to ask you a question.
> I will not do it again if that is so.
>
> I got this error after the expired event:
> 2007-08-24 11:12:39,260 DEBUG
> [org.apache.activemq.broker.region.RegionBroker] Message expired
> ActiveMQTextMessage {commandId = 15, ...
> 2007-08-24 11:12:39,270 WARN
> [org.apache.activemq.broker.region.RegionBroker] Failed to pass
> expired message to dead letter queue
> 2007-08-24 11:12:39,270 DEBUG
> [org.apache.activemq.broker.region.AbstractRegion] Adding
> destination: topic://
> ActiveMQ.Advisory.Expired.Queue.ActiveMQ.DLQ.Queue.queue.HelloQ
>
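A note on the destination name in the log line above: it reads as if the expired-message advisory topic was created for the dead letter queue itself, not for the original queue. A minimal sketch of how such a name could be composed follows. The two prefixes are assumptions inferred from the log line, and the class and method names are hypothetical, not taken from the ActiveMQ source.

```java
// Illustrative only: reproduces the naming seen in the log line, not ActiveMQ code.
final class AdvisoryNames {

    // Assumed prefixes, inferred from the destination name in the log line.
    static final String EXPIRED_QUEUE_PREFIX = "ActiveMQ.Advisory.Expired.Queue.";
    static final String DLQ_QUEUE_PREFIX = "ActiveMQ.DLQ.Queue.";

    // Name of the per-destination dead letter queue for a given queue name.
    static String deadLetterQueueFor(String queueName) {
        return DLQ_QUEUE_PREFIX + queueName;
    }

    // Name of the advisory topic for messages that expired on a given queue.
    static String expiredAdvisoryTopicFor(String queueName) {
        return EXPIRED_QUEUE_PREFIX + queueName;
    }

    public static void main(String[] args) {
        // The message is first routed to the DLQ, then expires again there.
        String dlq = deadLetterQueueFor("queue.HelloQ");
        System.out.println(expiredAdvisoryTopicFor(dlq));
    }
}
```

If that reading is right, the message may still carry its expiration when it reaches the dead letter queue, which would be consistent with the second "Message expired" entry.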
> After that, a second message expired:
> 2007-08-24 11:12:39,270 DEBUG
> [org.apache.activemq.broker.region.RegionBroker] Message expired
> ActiveMQTextMessage {commandId = 5, ....
> 2007-08-24 11:12:39,270 WARN
> [org.apache.activemq.broker.region.RegionBroker] Failed to pass
> expired message to dead letter queue
>
> I checked the code; it reaches logger.warn, which doesn't say what
> the exception is. I don't know what is wrong. Is it a bug, or is
> there anything that I should check in my config file?
> I use ActiveMQ5-SNAPSHOT-57
>
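On the warning that hides the exception: passing the caught exception to the logger makes the cause and stack trace show up in the log. commons-logging offers a warn(Object, Throwable) overload for this. The sketch below uses java.util.logging instead so that it runs without extra jars. The class, the helper methods and the simulated failure are all hypothetical and only illustrate the pattern, not the actual RegionBroker code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

// Hypothetical stand-in for the broker code path; not ActiveMQ source.
final class ExpiredMessageLogging {

    static final Logger LOG = Logger.getLogger("example.RegionBroker");

    // Hypothetical helper that simulates a failing dead letter queue send.
    static void sendToDeadLetterQueue(String messageId) throws Exception {
        throw new IllegalStateException("no dead letter destination for " + messageId);
    }

    // Logs the failure together with the exception, so the cause is not lost.
    static void onMessageExpired(String messageId) {
        try {
            sendToDeadLetterQueue(messageId);
        } catch (Exception e) {
            LOG.log(Level.WARNING, "Failed to pass expired message to dead letter queue", e);
        }
    }

    // Keeps log records in memory so the attached exception can be inspected.
    static final class CapturingHandler extends Handler {
        final List<LogRecord> records = new ArrayList<>();

        @Override
        public void publish(LogRecord record) {
            records.add(record);
        }

        @Override
        public void flush() {
        }

        @Override
        public void close() {
        }
    }

    public static void main(String[] args) {
        // With the default console handler this prints the warning and the stack trace.
        onMessageExpired("ID:example-1");
    }
}
```

With the exception attached, the WARN entry would show why the send to the dead letter queue failed instead of only that it failed.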
> Thanks..
> Regards,
> Ferry
>
>
> rajdavies-2 wrote:
>>
>> Author: rajdavies
>> Date: Fri Jul 20 10:08:10 2007
>> New Revision: 558054
>>
>> URL: http://svn.apache.org/viewvc?view=rev&rev=558054
>> Log:
>> Fix for:
>> http://issues.apache.org/activemq/browse/AMQ-1207
>> http://issues.apache.org/activemq/browse/AMQ-880
>> http://issues.apache.org/activemq/browse/AMQ-450
>> http://issues.apache.org/activemq/browse/AMQ-879
>>
>> Added:
>>
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> broker/region/policy/AbstractDeadLetterStrategy.java
>> (with props)
>> Modified:
>>
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> advisory/AdvisoryBroker.java
>>
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> advisory/AdvisorySupport.java
>>
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> broker/Broker.java
>>
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> broker/BrokerFilter.java
>>
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> broker/EmptyBroker.java
>>
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> broker/ErrorBroker.java
>>
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> broker/MutableBrokerFilter.java
>>
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> broker/region/AbstractRegion.java
>>
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> broker/region/DestinationFactoryImpl.java
>>
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> broker/region/PrefetchSubscription.java
>>
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> broker/region/Queue.java
>>
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> broker/region/RegionBroker.java
>>
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> broker/region/TempQueueRegion.java
>>
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> broker/region/Topic.java
>>
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> broker/region/TopicSubscription.java
>>
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> broker/region/policy/DeadLetterStrategy.java
>>
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> broker/region/policy/IndividualDeadLetterStrategy.java
>>
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> broker/region/policy/SharedDeadLetterStrategy.java
>>
>> activemq/trunk/activemq-core/src/test/java/org/apache/activemq/
>> broker/StubBroker.java
>>
>> Modified:
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> advisory/AdvisoryBroker.java
>> URL:
>> http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/
>> java/org/apache/activemq/advisory/AdvisoryBroker.java?
>> view=diff&rev=558054&r1=558053&r2=558054
>> =====================================================================
>> =========
>> ---
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> advisory/AdvisoryBroker.java
>> (original)
>> +++
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> advisory/AdvisoryBroker.java
>> Fri Jul 20 10:08:10 2007
>> @@ -17,6 +17,7 @@
>> */
>> package org.apache.activemq.advisory;
>>
>> +import java.io.IOException;
>> import java.util.Iterator;
>>
>> import org.apache.activemq.broker.Broker;
>> @@ -24,6 +25,7 @@
>> import org.apache.activemq.broker.ConnectionContext;
>> import org.apache.activemq.broker.ProducerBrokerExchange;
>> import org.apache.activemq.broker.region.Destination;
>> +import org.apache.activemq.broker.region.MessageReference;
>> import org.apache.activemq.broker.region.Subscription;
>> import org.apache.activemq.command.ActiveMQDestination;
>> import org.apache.activemq.command.ActiveMQMessage;
>> @@ -38,6 +40,8 @@
>> import org.apache.activemq.command.ProducerInfo;
>> import org.apache.activemq.util.IdGenerator;
>> import org.apache.activemq.util.LongSequenceGenerator;
>> +import org.apache.commons.logging.Log;
>> +import org.apache.commons.logging.LogFactory;
>>
>> import java.util.concurrent.ConcurrentHashMap;
>>
>> @@ -49,7 +53,7 @@
>> */
>> public class AdvisoryBroker extends BrokerFilter {
>>
>> - //private static final Log log =
>> LogFactory.getLog(AdvisoryBroker.class);
>> + private static final Log log =
>> LogFactory.getLog(AdvisoryBroker.class);
>>
>> protected final ConcurrentHashMap connections = new
>> ConcurrentHashMap();
>> protected final ConcurrentHashMap consumers = new
>> ConcurrentHashMap();
>> @@ -226,6 +230,16 @@
>> ActiveMQTopic topic =
>> AdvisorySupport.getProducerAdvisoryTopic(info.getDestination());
>> producers.remove(info.getProducerId());
>> fireProducerAdvisory(context, topic,
>> info.createRemoveCommand());
>> + }
>> + }
>> +
>> + public void messageExpired(ConnectionContext
>> context,MessageReference
>> messageReference){
>> + next.messageExpired(context,messageReference);
>> + try{
>> + ActiveMQTopic
>> topic=AdvisorySupport.getExpiredMessageTopic
>> (messageReference.getMessage().getDestination());
>> + fireAdvisory(context,topic,messageReference.getMessage
>> ());
>> + }catch(Exception e){
>> + log.warn("Failed to fire message expired advisory");
>> }
>> }
>>
>>
>> Modified:
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> advisory/AdvisorySupport.java
>> URL:
>> http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/
>> java/org/apache/activemq/advisory/AdvisorySupport.java?
>> view=diff&rev=558054&r1=558053&r2=558054
>> =====================================================================
>> =========
>> ---
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> advisory/AdvisorySupport.java
>> (original)
>> +++
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> advisory/AdvisorySupport.java
>> Fri Jul 20 10:08:10 2007
>> @@ -64,6 +64,13 @@
>> return new
>> ActiveMQTopic(TOPIC_PRODUCER_ADVISORY_TOPIC_PREFIX
>> +destination.getPhysicalName());
>> }
>>
>> + public static ActiveMQTopic
>> getExpiredMessageTopic(ActiveMQDestination destination) {
>> + if (destination.isQueue()) {
>> + return getExpiredQueueMessageAdvisoryTopic(destination);
>> + }
>> + return getExpiredTopicMessageAdvisoryTopic(destination);
>> + }
>> +
>> public static ActiveMQTopic
>> getExpiredTopicMessageAdvisoryTopic(ActiveMQDestination
>> destination) {
>> String name =
>> EXPIRED_TOPIC_MESSAGES_TOPIC_PREFIX+destination.getPhysicalName();
>> return new ActiveMQTopic(name);
>>
>> Modified:
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> broker/Broker.java
>> URL:
>> http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/
>> java/org/apache/activemq/broker/Broker.java?
>> view=diff&rev=558054&r1=558053&r2=558054
>> =====================================================================
>> =========
>> ---
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> broker/Broker.java
>> (original)
>> +++
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> broker/Broker.java
>> Fri Jul 20 10:08:10 2007
>> @@ -20,19 +20,15 @@
>> import java.net.URI;
>> import java.util.Set;
>> import org.apache.activemq.Service;
>> -import org.apache.activemq.broker.region.Destination;
>> +import org.apache.activemq.broker.region.MessageReference;
>> import org.apache.activemq.broker.region.Region;
>> -import
>> org.apache.activemq.broker.region.policy.PendingDurableSubscriberMess
>> ageStoragePolicy;
>> import org.apache.activemq.command.ActiveMQDestination;
>> import org.apache.activemq.command.BrokerId;
>> import org.apache.activemq.command.BrokerInfo;
>> import org.apache.activemq.command.ConnectionInfo;
>> import org.apache.activemq.command.DestinationInfo;
>> import org.apache.activemq.command.MessageDispatch;
>> -import org.apache.activemq.command.MessageDispatchNotification;
>> -import org.apache.activemq.command.MessagePull;
>> import org.apache.activemq.command.ProducerInfo;
>> -import org.apache.activemq.command.Response;
>> import org.apache.activemq.command.SessionInfo;
>> import org.apache.activemq.command.TransactionId;
>> import org.apache.activemq.kaha.Store;
>> @@ -135,6 +131,8 @@
>>
>> /**
>> * Gets a list of all the prepared xa transactions.
>> + * @param context transaction ids
>> + * @return
>> * @throws Exception TODO
>> */
>> public TransactionId[] getPreparedTransactions(ConnectionContext
>> context) throws Exception;
>> @@ -151,7 +149,7 @@
>> * Prepares a transaction. Only valid for xa transactions.
>> * @param context
>> * @param xid
>> - * @return
>> + * @return id
>> * @throws Exception TODO
>> */
>> public int prepareTransaction(ConnectionContext context,
>> TransactionId xid) throws Exception;
>> @@ -176,6 +174,9 @@
>>
>> /**
>> * Forgets a transaction.
>> + * @param context
>> + * @param transactionId
>> + * @throws Exception
>> */
>> public void forgetTransaction(ConnectionContext context,
>> TransactionId transactionId) throws Exception;
>>
>> @@ -246,7 +247,35 @@
>> */
>> public URI getVmConnectorURI();
>>
>> + /**
>> + * called when the brokerService starts
>> + */
>> public void brokerServiceStarted();
>>
>> + /**
>> + * @return the BrokerService
>> + */
>> BrokerService getBrokerService();
>> +
>> + /**
>> + * Ensure we get the Broker at the top of the Stack
>> + * @return the broker at the top of the Stack
>> + */
>> + Broker getRoot();
>> +
>> + /**
>> + * A Message has Expired
>> + * @param context
>> + * @param messageReference
>> + * @throws Exception
>> + */
>> + public void messageExpired(ConnectionContext context,
>> MessageReference messageReference);
>> +
>> + /**
>> + * A message needs to go the a DLQ
>> + * @param context
>> + * @param messageReference
>> + * @throws Exception
>> + */
>> + public void sendToDeadLetterQueue(ConnectionContext
>> context,MessageReference messageReference);
>> }
>>
>> Modified:
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> broker/BrokerFilter.java
>> URL:
>> http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/
>> java/org/apache/activemq/broker/BrokerFilter.java?
>> view=diff&rev=558054&r1=558053&r2=558054
>> =====================================================================
>> =========
>> ---
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> broker/BrokerFilter.java
>> (original)
>> +++
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> broker/BrokerFilter.java
>> Fri Jul 20 10:08:10 2007
>> @@ -17,9 +17,12 @@
>> */
>> package org.apache.activemq.broker;
>>
>> +import java.net.URI;
>> +import java.util.Map;
>> +import java.util.Set;
>> import org.apache.activemq.broker.region.Destination;
>> +import org.apache.activemq.broker.region.MessageReference;
>> import org.apache.activemq.broker.region.Subscription;
>> -import
>> org.apache.activemq.broker.region.policy.PendingDurableSubscriberMess
>> ageStoragePolicy;
>> import org.apache.activemq.command.ActiveMQDestination;
>> import org.apache.activemq.command.BrokerId;
>> import org.apache.activemq.command.BrokerInfo;
>> @@ -38,10 +41,6 @@
>> import org.apache.activemq.command.TransactionId;
>> import org.apache.activemq.kaha.Store;
>>
>> -import java.net.URI;
>> -import java.util.Map;
>> -import java.util.Set;
>> -
>> /**
>> * Allows you to intercept broker operation so that features such as
>> security can be
>> * implemented as a pluggable filter.
>> @@ -245,5 +244,17 @@
>>
>> public BrokerService getBrokerService(){
>> return next.getBrokerService();
>> + }
>> +
>> + public void messageExpired(ConnectionContext
>> context,MessageReference
>> message){
>> + next.messageExpired(context,message);
>> + }
>> +
>> + public void sendToDeadLetterQueue(ConnectionContext
>> context,MessageReference messageReference){
>> + next.sendToDeadLetterQueue(context,messageReference);
>> + }
>> +
>> + public Broker getRoot() {
>> + return next.getRoot();
>> }
>> }
>>
>> Modified:
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> broker/EmptyBroker.java
>> URL:
>> http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/
>> java/org/apache/activemq/broker/EmptyBroker.java?
>> view=diff&rev=558054&r1=558053&r2=558054
>> =====================================================================
>> =========
>> ---
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> broker/EmptyBroker.java
>> (original)
>> +++
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> broker/EmptyBroker.java
>> Fri Jul 20 10:08:10 2007
>> @@ -17,9 +17,13 @@
>> */
>> package org.apache.activemq.broker;
>>
>> +import java.net.URI;
>> +import java.util.Collections;
>> +import java.util.Map;
>> +import java.util.Set;
>> import org.apache.activemq.broker.region.Destination;
>> +import org.apache.activemq.broker.region.MessageReference;
>> import org.apache.activemq.broker.region.Subscription;
>> -import
>> org.apache.activemq.broker.region.policy.PendingDurableSubscriberMess
>> ageStoragePolicy;
>> import org.apache.activemq.command.ActiveMQDestination;
>> import org.apache.activemq.command.BrokerId;
>> import org.apache.activemq.command.BrokerInfo;
>> @@ -38,11 +42,6 @@
>> import org.apache.activemq.command.TransactionId;
>> import org.apache.activemq.kaha.Store;
>>
>> -import java.net.URI;
>> -import java.util.Collections;
>> -import java.util.Map;
>> -import java.util.Set;
>> -
>> /**
>> * Dumb implementation - used to be overriden by listeners
>> *
>> @@ -245,4 +244,14 @@
>> public BrokerService getBrokerService(){
>> return null;
>> }
>> +
>> + public void messageExpired(ConnectionContext
>> context,MessageReference
>> message){
>> + }
>> +
>> + public void sendToDeadLetterQueue(ConnectionContext
>> context,MessageReference messageReference){
>> + }
>> +
>> + public Broker getRoot(){
>> + return null;
>> + }
>> }
>>
>> Modified:
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> broker/ErrorBroker.java
>> URL:
>> http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/
>> java/org/apache/activemq/broker/ErrorBroker.java?
>> view=diff&rev=558054&r1=558053&r2=558054
>> =====================================================================
>> =========
>> ---
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> broker/ErrorBroker.java
>> (original)
>> +++
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> broker/ErrorBroker.java
>> Fri Jul 20 10:08:10 2007
>> @@ -21,10 +21,9 @@
>> import java.util.Collections;
>> import java.util.Map;
>> import java.util.Set;
>> -
>> import org.apache.activemq.broker.region.Destination;
>> +import org.apache.activemq.broker.region.MessageReference;
>> import org.apache.activemq.broker.region.Subscription;
>> -import
>> org.apache.activemq.broker.region.policy.PendingDurableSubscriberMess
>> ageStoragePolicy;
>> import org.apache.activemq.command.ActiveMQDestination;
>> import org.apache.activemq.command.BrokerId;
>> import org.apache.activemq.command.BrokerInfo;
>> @@ -245,4 +244,16 @@
>> public BrokerService getBrokerService(){
>> throw new BrokerStoppedException(this.message);
>> }
>> +
>> + public void messageExpired(ConnectionContext
>> context,MessageReference
>> message){
>> + throw new BrokerStoppedException(this.message);
>> + }
>> +
>> + public void sendToDeadLetterQueue(ConnectionContext
>> context,MessageReference messageReference){
>> + throw new BrokerStoppedException(this.message);
>> + }
>> +
>> + public Broker getRoot(){
>> + throw new BrokerStoppedException(this.message);
>> + }
>> }
>>
>> Modified:
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> broker/MutableBrokerFilter.java
>> URL:
>> http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/
>> java/org/apache/activemq/broker/MutableBrokerFilter.java?
>> view=diff&rev=558054&r1=558053&r2=558054
>> =====================================================================
>> =========
>> ---
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> broker/MutableBrokerFilter.java
>> (original)
>> +++
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> broker/MutableBrokerFilter.java
>> Fri Jul 20 10:08:10 2007
>> @@ -17,9 +17,12 @@
>> */
>> package org.apache.activemq.broker;
>>
>> +import java.net.URI;
>> +import java.util.Map;
>> +import java.util.Set;
>> import org.apache.activemq.broker.region.Destination;
>> +import org.apache.activemq.broker.region.MessageReference;
>> import org.apache.activemq.broker.region.Subscription;
>> -import
>> org.apache.activemq.broker.region.policy.PendingDurableSubscriberMess
>> ageStoragePolicy;
>> import org.apache.activemq.command.ActiveMQDestination;
>> import org.apache.activemq.command.BrokerId;
>> import org.apache.activemq.command.BrokerInfo;
>> @@ -38,10 +41,6 @@
>> import org.apache.activemq.command.TransactionId;
>> import org.apache.activemq.kaha.Store;
>>
>> -import java.net.URI;
>> -import java.util.Map;
>> -import java.util.Set;
>> -
>> /**
>> * Like a BrokerFilter but it allows you to switch the getNext
>> ().broker.
>> This has more
>> * overhead than a BrokerFilter since access to the getNext
>> ().broker has
>> to synchronized
>> @@ -258,6 +257,19 @@
>>
>> public BrokerService getBrokerService(){
>> return getNext().getBrokerService();
>> + }
>> +
>> +
>> + public void messageExpired(ConnectionContext
>> context,MessageReference
>> message){
>> + getNext().messageExpired(context,message);
>> + }
>> +
>> + public void sendToDeadLetterQueue(ConnectionContext
>> context,MessageReference messageReference) {
>> + getNext().sendToDeadLetterQueue(context,messageReference);
>> + }
>> +
>> + public Broker getRoot(){
>> + return getNext().getRoot();
>> }
>>
>> }
>>
>> Modified:
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> broker/region/AbstractRegion.java
>> URL:
>> http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/
>> java/org/apache/activemq/broker/region/AbstractRegion.java?
>> view=diff&rev=558054&r1=558053&r2=558054
>> =====================================================================
>> =========
>> ---
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> broker/region/AbstractRegion.java
>> (original)
>> +++
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> broker/region/AbstractRegion.java
>> Fri Jul 20 10:08:10 2007
>> @@ -332,14 +332,15 @@
>> // Try to auto create the destination... re-
>> invoke
>> broker from the
>> // top so that the proper security checks are
>> performed.
>> try {
>> +
>> +
>> context.getBroker().addDestination(context,destination);
>> dest = addDestination(context, destination);
>> -
>> //context.getBroker().addDestination(context,destination);
>> }
>> catch (DestinationAlreadyExistsException e) {
>> // if the destination already exists then
>> lets
>> ignore this error
>> }
>> // We should now have the dest created.
>> - //dest=(Destination) destinations.get
>> (destination);
>> + dest=(Destination) destinations.get
>> (destination);
>> }
>> if(dest==null){
>> throw new JMSException("The destination
>> "+destination+" does not exist.");
>>
>> Modified:
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> broker/region/DestinationFactoryImpl.java
>> URL:
>> http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/
>> java/org/apache/activemq/broker/region/DestinationFactoryImpl.java?
>> view=diff&rev=558054&r1=558053&r2=558054
>> =====================================================================
>> =========
>> ---
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> broker/region/DestinationFactoryImpl.java
>> (original)
>> +++
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> broker/region/DestinationFactoryImpl.java
>> Fri Jul 20 10:08:10 2007
>> @@ -42,118 +42,121 @@
>> * @author fateev@amazon.com
>> * @version $Revision$
>> */
>> -public class DestinationFactoryImpl extends DestinationFactory {
>> +public class DestinationFactoryImpl extends DestinationFactory{
>>
>> protected final UsageManager memoryManager;
>> protected final TaskRunnerFactory taskRunnerFactory;
>> protected final PersistenceAdapter persistenceAdapter;
>> protected RegionBroker broker;
>>
>> - public DestinationFactoryImpl(UsageManager memoryManager,
>> TaskRunnerFactory taskRunnerFactory,
>> - PersistenceAdapter persistenceAdapter) {
>> - this.memoryManager = memoryManager;
>> - this.taskRunnerFactory = taskRunnerFactory;
>> - if (persistenceAdapter == null) {
>> + public DestinationFactoryImpl(UsageManager
>> memoryManager,TaskRunnerFactory taskRunnerFactory,
>> + PersistenceAdapter persistenceAdapter){
>> + this.memoryManager=memoryManager;
>> + this.taskRunnerFactory=taskRunnerFactory;
>> + if(persistenceAdapter==null){
>> throw new IllegalArgumentException("null
>> persistenceAdapter");
>> }
>> - this.persistenceAdapter = persistenceAdapter;
>> + this.persistenceAdapter=persistenceAdapter;
>> }
>>
>> - public void setRegionBroker(RegionBroker broker) {
>> - if (broker == null) {
>> + public void setRegionBroker(RegionBroker broker){
>> + if(broker==null){
>> throw new IllegalArgumentException("null broker");
>> }
>> - this.broker = broker;
>> + this.broker=broker;
>> }
>>
>> - public Set getDestinations() {
>> + public Set getDestinations(){
>> return persistenceAdapter.getDestinations();
>> }
>>
>> /**
>> * @return instance of {@link Queue} or {@link Topic}
>> */
>> - public Destination createDestination(ConnectionContext context,
>> ActiveMQDestination destination, DestinationStatistics
>> destinationStatistics) throws Exception {
>> - if (destination.isQueue()) {
>> - if (destination.isTemporary()) {
>> - final ActiveMQTempDestination tempDest =
>> (ActiveMQTempDestination) destination;
>> - return new Queue(destination, memoryManager, null,
>> destinationStatistics, taskRunnerFactory,broker.getTempDataStore()) {
>> -
>> - public void addSubscription(ConnectionContext
>> context,Subscription sub) throws Exception {
>> + public Destination createDestination(ConnectionContext
>> context,ActiveMQDestination destination,
>> + DestinationStatistics destinationStatistics) throws
>> Exception{
>> + if(destination.isQueue()){
>> + if(destination.isTemporary()){
>> + final ActiveMQTempDestination
>> tempDest=(ActiveMQTempDestination)destination;
>> + return new
>> Queue(broker.getRoot(),destination,memoryManager,null,
>> +
>> destinationStatistics,taskRunnerFactory,broker.getTempDataStore()){
>> +
>> + public void addSubscription(ConnectionContext
>> context,Subscription sub) throws Exception{
>> // Only consumers on the same connection can
>> consume from
>> // the temporary destination
>> - if( !tempDest.getConnectionId().equals(
>> sub.getConsumerInfo().getConsumerId().getConnectionId() ) ) {
>> +
>> if(!tempDest.getConnectionId().equals(sub.getConsumerInfo
>> ().getConsumerId().getConnectionId())){
>> throw new JMSException("Cannot
>> subscribe to
>> remote temporary destination: "+tempDest);
>> }
>> - super.addSubscription(context, sub);
>> + super.addSubscription(context,sub);
>> };
>> };
>> - } else {
>> - MessageStore store =
>> persistenceAdapter.createQueueMessageStore((ActiveMQQueue)
>> destination);
>> - Queue queue = new Queue(destination, memoryManager,
>> store, destinationStatistics,
>> taskRunnerFactory,broker.getTempDataStore());
>> - configureQueue(queue, destination);
>> + }else{
>> + MessageStore
>> store=persistenceAdapter.createQueueMessageStore((ActiveMQQueue)
>> destination);
>> + Queue queue=new
>> Queue(broker.getRoot(),destination,memoryManager,store,
>> +
>> destinationStatistics,taskRunnerFactory,broker.getTempDataStore());
>> + configureQueue(queue,destination);
>> queue.initialize();
>> return queue;
>> }
>> - } else if (destination.isTemporary()){
>> - final ActiveMQTempDestination tempDest =
>> (ActiveMQTempDestination) destination;
>> - return new Topic(destination, null, memoryManager,
>> destinationStatistics, taskRunnerFactory) {
>> - public void addSubscription(ConnectionContext
>> context,Subscription sub) throws Exception {
>> + }else if(destination.isTemporary()){
>> + final ActiveMQTempDestination
>> tempDest=(ActiveMQTempDestination)destination;
>> + return new
>> Topic(broker.getRoot(),destination,null,memoryManager,
>> + destinationStatistics,taskRunnerFactory){
>> +
>> + public void addSubscription(ConnectionContext
>> context,Subscription sub) throws Exception{
>> // Only consumers on the same connection can
>> consume
>> from
>> // the temporary destination
>> - if( !tempDest.getConnectionId().equals(
>> sub.getConsumerInfo().getConsumerId().getConnectionId() ) ) {
>> +
>> if(!tempDest.getConnectionId().equals(sub.getConsumerInfo
>> ().getConsumerId().getConnectionId())){
>> throw new JMSException("Cannot subscribe to
>> remote temporary destination: "+tempDest);
>> }
>> - super.addSubscription(context, sub);
>> + super.addSubscription(context,sub);
>> };
>> };
>> - } else {
>> - TopicMessageStore store = null;
>> - if (!AdvisorySupport.isAdvisoryTopic(destination)) {
>> - store =
>> persistenceAdapter.createTopicMessageStore((ActiveMQTopic)
>> destination);
>> + }else{
>> + TopicMessageStore store=null;
>> + if(!AdvisorySupport.isAdvisoryTopic(destination)){
>> +
>> store=persistenceAdapter.createTopicMessageStore((ActiveMQTopic)
>> destination);
>> }
>> -
>> - Topic topic = new Topic(destination, store,
>> memoryManager,
>> destinationStatistics, taskRunnerFactory);
>> - configureTopic(topic, destination);
>> -
>> + Topic topic=new
>> Topic(broker.getRoot(),destination,store,memoryManager,
>> + destinationStatistics,taskRunnerFactory);
>> + configureTopic(topic,destination);
>> return topic;
>> }
>> }
>>
>> - protected void configureQueue(Queue queue, ActiveMQDestination
>> destination) {
>> - if (broker == null) {
>> + protected void configureQueue(Queue queue,ActiveMQDestination
>> destination){
>> + if(broker==null){
>> throw new IllegalStateException("broker property is not
>> set");
>> }
>> - if (broker.getDestinationPolicy() != null) {
>> - PolicyEntry entry =
>> broker.getDestinationPolicy().getEntryFor(destination);
>> - if (entry != null) {
>> + if(broker.getDestinationPolicy()!=null){
>> + PolicyEntry
>> entry=broker.getDestinationPolicy().getEntryFor(destination);
>> + if(entry!=null){
>> entry.configure(queue,broker.getTempDataStore());
>> }
>> }
>> }
>>
>> - protected void configureTopic(Topic topic, ActiveMQDestination
>> destination) {
>> - if (broker == null) {
>> + protected void configureTopic(Topic topic,ActiveMQDestination
>> destination){
>> + if(broker==null){
>> throw new IllegalStateException("broker property is not
>> set");
>> }
>> - if (broker.getDestinationPolicy() != null) {
>> - PolicyEntry entry =
>> broker.getDestinationPolicy().getEntryFor(destination);
>> - if (entry != null) {
>> + if(broker.getDestinationPolicy()!=null){
>> + PolicyEntry
>> entry=broker.getDestinationPolicy().getEntryFor(destination);
>> + if(entry!=null){
>> entry.configure(topic);
>> }
>> }
>> }
>>
>> - public long getLastMessageBrokerSequenceId() throws
>> IOException {
>> + public long getLastMessageBrokerSequenceId() throws IOException{
>> return persistenceAdapter.getLastMessageBrokerSequenceId();
>> }
>>
>> - public PersistenceAdapter getPersistenceAdapter() {
>> + public PersistenceAdapter getPersistenceAdapter(){
>> return persistenceAdapter;
>> }
>>
>> - public SubscriptionInfo[] getAllDurableSubscriptions
>> (ActiveMQTopic
>> topic) throws IOException {
>> + public SubscriptionInfo[] getAllDurableSubscriptions
>> (ActiveMQTopic
>> topic) throws IOException{
>> return
>> persistenceAdapter.createTopicMessageStore
>> (topic).getAllSubscriptions();
>> }
>> -
>> }
>>
>> Modified:
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> broker/region/PrefetchSubscription.java
>> URL:
>> http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/
>> java/org/apache/activemq/broker/region/PrefetchSubscription.java?
>> view=diff&rev=558054&r1=558053&r2=558054
>> =====================================================================
>> =========
>> ---
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> broker/region/PrefetchSubscription.java
>> (original)
>> +++
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> broker/region/PrefetchSubscription.java
>> Fri Jul 20 10:08:10 2007
>> @@ -276,17 +276,7 @@
>> * @throws Exception
>> */
>> protected void sendToDLQ(final ConnectionContext context,final
>> MessageReference node) throws IOException,Exception{
>> - // Send the message to the DLQ
>> - Message message=node.getMessage();
>> - if(message!=null){
>> - // The original destination and transaction id do not
>> get
>> filled when the message is first
>> - // sent,
>> - // it is only populated if the message is routed to
>> another
>> destination like the DLQ
>> - DeadLetterStrategy
>> deadLetterStrategy=node.getRegionDestination
>> ().getDeadLetterStrategy();
>> - ActiveMQDestination
>> deadLetterDestination=deadLetterStrategy
>> - .getDeadLetterQueueFor(message.getDestination
>> ());
>> - BrokerSupport.resend
>> (context,message,deadLetterDestination);
>> - }
>> + broker.sendToDeadLetterQueue(context,node);
>> }
>>
>> /**
>> @@ -393,7 +383,9 @@
>> // Message may have been sitting in the
>> pending list a while
>> // waiting for the consumer to ak the
>> message.
>>
>> if(node!=QueueMessageReference.NULL_MESSAGE&&node.isExpired()){
>> - continue; // just drop it.
>> + broker.messageExpired(getContext
>> (),node);
>> + dequeueCounter++;
>> + continue;
>> }
>> dispatch(node);
>> count++;
>>
>> Modified:
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> broker/region/Queue.java
>> URL:
>> http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/
>> java/org/apache/activemq/broker/region/Queue.java?
>> view=diff&rev=558054&r1=558053&r2=558054
>> =====================================================================
>> =========
>> ---
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> broker/region/Queue.java
>> (original)
>> +++
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> broker/region/Queue.java
>> Fri Jul 20 10:08:10 2007
>> @@ -27,6 +27,7 @@
>> import javax.jms.InvalidSelectorException;
>> import javax.jms.JMSException;
>>
>> +import org.apache.activemq.broker.Broker;
>> import org.apache.activemq.broker.ConnectionContext;
>> import org.apache.activemq.broker.ProducerBrokerExchange;
>> import
>> org.apache.activemq.broker.region.cursors.PendingMessageCursor;
>> @@ -72,7 +73,6 @@
>> public class Queue implements Destination, Task {
>>
>> private final Log log;
>> -
>> private final ActiveMQDestination destination;
>> private final List consumers = new CopyOnWriteArrayList();
>> private final Valve dispatchValve = new Valve(true);
>> @@ -96,9 +96,11 @@
>> private final Object doDispatchMutex = new Object();
>> private TaskRunner taskRunner;
>> private boolean started = false;
>> + final Broker broker;
>>
>> - public Queue(ActiveMQDestination destination, final UsageManager
>> memoryManager, MessageStore store, DestinationStatistics parentStats,
>> + public Queue(Broker broker,ActiveMQDestination destination,
>> final
>> UsageManager memoryManager, MessageStore store, DestinationStatistics
>> parentStats,
>> TaskRunnerFactory taskFactory, Store tmpStore) throws
>> Exception {
>> + this.broker=broker;
>> this.destination = destination;
>> this.usageManager = new
>> UsageManager(memoryManager,destination.toString());
>> this.usageManager.setUsagePortion(1.0f);
>> @@ -136,7 +138,8 @@
>> public void recoverMessage(Message message){
>> // Message could have expired while it
>> was being
>> loaded..
>> if(message.isExpired()){
>> - // TODO remove from store
>> +
>> broker.messageExpired(createConnectionContext(),message);
>> +
>> destinationStatistics.getMessages().decrement();
>> return;
>> }
>> message.setRegionDestination(Queue.this);
>> @@ -342,9 +345,8 @@
>> // There is delay between the client sending it and it
>> arriving
>> at the
>> // destination.. it may have expired.
>> if(message.isExpired()){
>> - if (log.isDebugEnabled()) {
>> - log.debug("Expired message: " + message);
>> - }
>> + broker.messageExpired(context,message);
>> + destinationStatistics.getMessages().decrement();
>> if( ( !message.isResponseRequired() ||
>> producerExchange.getProducerState().getInfo().getWindowSize() >
>> 0 ) &&
>> !context.isInRecoveryMode() ) {
>> ProducerAck ack = new
>> ProducerAck(producerExchange.getProducerState().getInfo
>> ().getProducerId(),
>> message.getSize());
>> context.getConnection().dispatchAsync(ack);
>>
>> @@ -365,9 +367,8 @@
>>
>> // While waiting for space to free up... the message
>> may
>> have expired.
>> if(message.isExpired()){
>> - if (log.isDebugEnabled()) {
>> - log.debug("Expired message: " + message);
>> - }
>> + broker.messageExpired(context,message);
>> +
>> destinationStatistics.getMessages().decrement();
>>
>> if( !message.isResponseRequired() &&
>> !context.isInRecoveryMode() ) {
>> ProducerAck ack = new
>> ProducerAck(producerExchange.getProducerState().getInfo
>> ().getProducerId(),
>> message.getSize());
>> @@ -440,10 +441,8 @@
>> // It could take while before we receive the
>> commit
>> // op, by that time the message could have
>> expired..
>> if(message.isExpired()){
>> - // TODO: remove message from store.
>> - if (log.isDebugEnabled()) {
>> - log.debug("Expired message: " +
>> message);
>> - }
>> + broker.messageExpired(context,message);
>> +
>> destinationStatistics.getMessages().decrement();
>> return;
>> }
>> sendMessage(context,message);
>> @@ -1011,9 +1010,8 @@
>> result.add(node);
>> count++;
>> }else{
>> - if (log.isDebugEnabled()) {
>> - log.debug("Expired message: " +
>> node);
>> - }
>> +
>> broker.messageExpired(createConnectionContext(),node);
>> +
>> destinationStatistics.getMessages().decrement();
>> }
>> }
>> }finally{
>>
>> Modified:
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> broker/region/RegionBroker.java
>> URL:
>> http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/
>> java/org/apache/activemq/broker/region/RegionBroker.java?
>> view=diff&rev=558054&r1=558053&r2=558054
>> =====================================================================
>> =========
>> ---
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> broker/region/RegionBroker.java
>> (original)
>> +++
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> broker/region/RegionBroker.java
>> Fri Jul 20 10:08:10 2007
>> @@ -37,6 +37,7 @@
>> import org.apache.activemq.broker.DestinationAlreadyExistsException;
>> import org.apache.activemq.broker.ProducerBrokerExchange;
>> import org.apache.activemq.broker.TransactionBroker;
>> +import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
>> import
>> org.apache.activemq.broker.region.policy.PendingDurableSubscriberMess
>> ageStoragePolicy;
>> import org.apache.activemq.broker.region.policy.PolicyMap;
>> import
>> org.apache.activemq.broker.region.policy.VMPendingDurableSubscriberMe
>> ssageStoragePolicy;
>> @@ -62,6 +63,7 @@
>> import org.apache.activemq.store.PersistenceAdapter;
>> import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
>> import org.apache.activemq.thread.TaskRunnerFactory;
>> +import org.apache.activemq.util.BrokerSupport;
>> import org.apache.activemq.util.IdGenerator;
>> import org.apache.activemq.util.LongSequenceGenerator;
>> import org.apache.activemq.util.ServiceStopper;
>> @@ -625,6 +627,52 @@
>> public BrokerService getBrokerService(){
>> return brokerService;
>> }
>> -
>> -
>> +
>> + public void messageExpired(ConnectionContext
>> context,MessageReference
>> node){
>> + if(log.isDebugEnabled()){
>> + log.debug("Message expired "+node);
>> + }
>> + getRoot().sendToDeadLetterQueue(context,node);
>> + }
>> +
>> + public void sendToDeadLetterQueue(ConnectionContext
>> context,MessageReference node){
>> + try{
>> + if(node!=null){
>> + Message message=node.getMessage();
>> + if(message!=null){
>> + DeadLetterStrategy
>> deadLetterStrategy=node.getRegionDestination
>> ().getDeadLetterStrategy();
>> + if(deadLetterStrategy!=null){
>> +
>> if(deadLetterStrategy.isSendToDeadLetterQueue(message)){
>> + long expiration=message.getExpiration();
>> + message.setExpiration(0);
>> + message.setProperty
>> ("originalExpiration",new
>> Long(expiration));
>> + if(!message.isPersistent()){
>> + message.setPersistent(true);
>> +
>> message.setProperty("originalDeliveryMode","NON_PERSISTENT");
>> + }
>> + // The original destination and
>> transaction
>> id do not get filled when the message is first
>> + // sent,
>> + // it is only populated if the
>> message is
>> routed to another destination like the DLQ
>> + ActiveMQDestination
>> deadLetterDestination=deadLetterStrategy.getDeadLetterQueueFor
>> (message
>> + .getDestination());
>> +
>> BrokerSupport.resend(context,message,deadLetterDestination);
>> + }
>> + }
>> + }else{
>> + log.warn("Null message for node: "+node);
>> + }
>> + }
>> + }catch(Exception e){
>> + log.warn("Failed to pass expired message to dead letter
>> queue");
>> + }
>> + }
>> +
>> + public Broker getRoot(){
>> + try{
>> + return getBrokerService().getBroker();
>> + }catch(Exception e){
>> + log.fatal("Trying to get Root Broker "+e);
>> + throw new RuntimeException("The broker from the
>> BrokerService
>> should not throw an exception");
>> + }
>> + }
>> }
>>
>> Modified:
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> broker/region/TempQueueRegion.java
>> URL:
>> http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/
>> java/org/apache/activemq/broker/region/TempQueueRegion.java?
>> view=diff&rev=558054&r1=558053&r2=558054
>> =====================================================================
>> =========
>> ---
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> broker/region/TempQueueRegion.java
>> (original)
>> +++
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> broker/region/TempQueueRegion.java
>> Fri Jul 20 10:08:10 2007
>> @@ -41,7 +41,7 @@
>>
>> protected Destination createDestination(ConnectionContext
>> context,
>> ActiveMQDestination destination) throws Exception {
>> final ActiveMQTempDestination tempDest =
>> (ActiveMQTempDestination) destination;
>> - return new Queue(destination, memoryManager, null,
>> destinationStatistics, taskRunnerFactory, null) {
>> + return new Queue(broker.getRoot(),destination,
>> memoryManager,
>> null, destinationStatistics, taskRunnerFactory, null) {
>>
>> public void addSubscription(ConnectionContext
>> context,Subscription sub) throws Exception {
>>
>>
>> Modified:
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> broker/region/Topic.java
>> URL:
>> http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/
>> java/org/apache/activemq/broker/region/Topic.java?
>> view=diff&rev=558054&r1=558053&r2=558054
>> =====================================================================
>> =========
>> ---
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> broker/region/Topic.java
>> (original)
>> +++
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> broker/region/Topic.java
>> Fri Jul 20 10:08:10 2007
>> @@ -24,6 +24,7 @@
>> import java.util.concurrent.CopyOnWriteArrayList;
>> import java.util.concurrent.CopyOnWriteArraySet;
>> import org.apache.activemq.advisory.AdvisorySupport;
>> +import org.apache.activemq.broker.Broker;
>> import org.apache.activemq.broker.ConnectionContext;
>> import org.apache.activemq.broker.ProducerBrokerExchange;
>> import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
>> @@ -72,10 +73,11 @@
>> private boolean sendAdvisoryIfNoConsumers;
>> private DeadLetterStrategy deadLetterStrategy = new
>> SharedDeadLetterStrategy();
>> private final ConcurrentHashMap durableSubcribers = new
>> ConcurrentHashMap();
>> + final Broker broker;
>>
>> - public Topic(ActiveMQDestination destination, TopicMessageStore
>> store, UsageManager memoryManager, DestinationStatistics parentStats,
>> + public Topic(Broker broker,ActiveMQDestination destination,
>> TopicMessageStore store, UsageManager memoryManager,
>> DestinationStatistics
>> parentStats,
>> TaskRunnerFactory taskFactory) {
>> -
>> + this.broker=broker;
>> this.destination = destination;
>> this.store = store; //this could be NULL! (If an advsiory)
>> this.usageManager = new
>> UsageManager(memoryManager,destination.toString());
>> @@ -261,9 +263,8 @@
>> // There is delay between the client sending it and it
>> arriving at
>> the
>> // destination.. it may have expired.
>> if( message.isExpired() ) {
>> - if (log.isDebugEnabled()) {
>> - log.debug("Expired message: " + message);
>> - }
>> + broker.messageExpired(context,message);
>> + destinationStatistics.getMessages().decrement();
>> if( ( !message.isResponseRequired() ||
>> producerExchange.getProducerState().getInfo().getWindowSize() >
>> 0 ) &&
>> !context.isInRecoveryMode() ) {
>> ProducerAck ack = new
>> ProducerAck(producerExchange.getProducerState().getInfo
>> ().getProducerId(),
>> message.getSize());
>> context.getConnection().dispatchAsync(ack);
>>
>> @@ -285,9 +286,8 @@
>>
>> // While waiting for space to free up... the message
>> may
>> have expired.
>> if(message.isExpired()){
>> - if (log.isDebugEnabled()) {
>> - log.debug("Expired message: " + message);
>> - }
>> + broker.messageExpired(context,message);
>> +
>> destinationStatistics.getMessages().decrement();
>>
>> if( !message.isResponseRequired() &&
>> !context.isInRecoveryMode() ) {
>> ProducerAck ack = new
>> ProducerAck(producerExchange.getProducerState().getInfo
>> ().getProducerId(),
>> message.getSize());
>> @@ -357,7 +357,9 @@
>> // It could take while before we receive the
>> commit
>> // operration.. by that time the message
>> could have
>> expired..
>> if( message.isExpired() ) {
>> - // TODO: remove message from store.
>> + broker.messageExpired(context,message);
>> + message.decrementReferenceCount();
>> +
>> destinationStatistics.getMessages().decrement();
>> return;
>> }
>> dispatch(context, message);
>>
>> Modified:
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> broker/region/TopicSubscription.java
>> URL:
>> http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/
>> java/org/apache/activemq/broker/region/TopicSubscription.java?
>> view=diff&rev=558054&r1=558053&r2=558054
>> =====================================================================
>> =========
>> ---
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> broker/region/TopicSubscription.java
>> (original)
>> +++
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> broker/region/TopicSubscription.java
>> Fri Jul 20 10:08:10 2007
>> @@ -103,12 +103,7 @@
>> int messagesToEvict=oldMessages.length;
>> for(int i=0;i<messagesToEvict;i++){
>> MessageReference
>> oldMessage=oldMessages[i];
>> - oldMessage.decrementReferenceCount
>> ();
>> - matched.remove(oldMessage);
>> - discarded++;
>> - if(log.isDebugEnabled()){
>> - log.debug("Discarding message
>> "+oldMessages[i]);
>> - }
>> + discard(oldMessage);
>> }
>> // lets avoid an infinite loop if we are
>> given a bad eviction strategy
>> // for a bad strategy lets just not
>> evict
>> @@ -138,6 +133,7 @@
>> matched.remove();
>> dispatchedCounter.incrementAndGet();
>> node.decrementReferenceCount();
>> + broker.messageExpired(getContext(),node);
>> break;
>> }
>> }
>> @@ -367,6 +363,8 @@
>> // waiting for the consumer to ak the message.
>> if(message.isExpired()){
>> message.decrementReferenceCount();
>> + broker.messageExpired(getContext(),message);
>> + dequeueCounter.incrementAndGet();
>> continue; // just drop it.
>> }
>> dispatch(message);
>> @@ -409,6 +407,17 @@
>>
>> node.getRegionDestination().getDestinationStatistics
>> ().getDispatched().increment();
>> node.decrementReferenceCount();
>> }
>> + }
>> +
>> + private void discard(MessageReference message) {
>> + message.decrementReferenceCount();
>> + matched.remove(message);
>> + discarded++;
>> + dequeueCounter.incrementAndGet();
>> + if(log.isDebugEnabled()){
>> + log.debug("Discarding message "+message);
>> + }
>> + broker.getRoot().sendToDeadLetterQueue(getContext
>> (),message);
>> }
>>
>> public String toString(){
>>
>> Added:
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> broker/region/policy/AbstractDeadLetterStrategy.java
>> URL:
>> http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/
>> java/org/apache/activemq/broker/region/policy/
>> AbstractDeadLetterStrategy.java?view=auto&rev=558054
>> =====================================================================
>> =========
>> ---
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> broker/region/policy/AbstractDeadLetterStrategy.java
>> (added)
>> +++
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> broker/region/policy/AbstractDeadLetterStrategy.java
>> Fri Jul 20 10:08:10 2007
>> @@ -0,0 +1,74 @@
>> +/**
>> + *
>> + * Licensed to the Apache Software Foundation (ASF) under one or
>> more
>> + * contributor license agreements. See the NOTICE file
>> distributed with
>> + * this work for additional information regarding copyright
>> ownership.
>> + * The ASF licenses this file to You under the Apache License,
>> Version
>> 2.0
>> + * (the "License"); you may not use this file except in
>> compliance with
>> + * the License. You may obtain a copy of the License at
>> + *
>> + * http://www.apache.org/licenses/LICENSE-2.0
>> + *
>> + * Unless required by applicable law or agreed to in writing,
>> software
>> + * distributed under the License is distributed on an "AS IS" BASIS,
>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
>> implied.
>> + * See the License for the specific language governing
>> permissions and
>> + * limitations under the License.
>> + */
>> +package org.apache.activemq.broker.region.policy;
>> +
>> +import org.apache.activemq.command.Message;
>> +
>> +/**
>> + * A strategy for choosing which destination is used for dead letter
>> queue messages.
>> + *
>> + * @version $Revision: 426366 $
>> + */
>> +public abstract class AbstractDeadLetterStrategy implements
>> DeadLetterStrategy {
>> + private boolean processNonPersistent=true;
>> + private boolean processExpired=true;
>> +
>> + public boolean isSendToDeadLetterQueue(Message message){
>> + boolean result=false;
>> + if(message!=null){
>> + result=true;
>> +
>> if(message.isPersistent()==false&&processNonPersistent==false){
>> + result=false;
>> + }
>> + if(message.isExpired()&&processExpired==false){
>> + result=false;
>> + }
>> + }
>> + return result;
>> + }
>> +
>> + /**
>> + * @return the processExpired
>> + */
>> + public boolean isProcessExpired(){
>> + return this.processExpired;
>> + }
>> +
>> + /**
>> + * @param processExpired the processExpired to set
>> + */
>> + public void setProcessExpired(boolean processExpired){
>> + this.processExpired=processExpired;
>> + }
>> +
>> + /**
>> + * @return the processNonPersistent
>> + */
>> + public boolean isProcessNonPersistent(){
>> + return this.processNonPersistent;
>> + }
>> +
>> + /**
>> + * @param processNonPersistent the processNonPersistent to set
>> + */
>> + public void setProcessNonPersistent(boolean
>> processNonPersistent){
>> + this.processNonPersistent=processNonPersistent;
>> + }
>> +
>> +
>> +}
>>
>> Propchange:
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> broker/region/policy/AbstractDeadLetterStrategy.java
>> ---------------------------------------------------------------------
>> ---------
>> svn:eol-style = native
>>
>> Modified:
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> broker/region/policy/DeadLetterStrategy.java
>> URL:
>> http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/
>> java/org/apache/activemq/broker/region/policy/
>> DeadLetterStrategy.java?view=diff&rev=558054&r1=558053&r2=558054
>> =====================================================================
>> =========
>> ---
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> broker/region/policy/DeadLetterStrategy.java
>> (original)
>> +++
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> broker/region/policy/DeadLetterStrategy.java
>> Fri Jul 20 10:08:10 2007
>> @@ -18,6 +18,7 @@
>> package org.apache.activemq.broker.region.policy;
>>
>> import org.apache.activemq.command.ActiveMQDestination;
>> +import org.apache.activemq.command.Message;
>>
>> /**
>> * A strategy for choosing which destination is used for dead letter
>> queue messages.
>> @@ -25,6 +26,14 @@
>> * @version $Revision$
>> */
>> public interface DeadLetterStrategy {
>> +
>> + /**
>> + * Allow pluggable strategy for deciding if message should be
>> sent to
>> a dead letter queue
>> + * for example, you might not want to ignore expired or
>> non-persistent messages
>> + * @param message
>> + * @return true if message should be sent to a dead letter queue
>> + */
>> + public boolean isSendToDeadLetterQueue(Message message);
>>
>> /**
>> * Returns the dead letter queue for the given destination.
>>
>> Modified:
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> broker/region/policy/IndividualDeadLetterStrategy.java
>> URL:
>> http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/
>> java/org/apache/activemq/broker/region/policy/
>> IndividualDeadLetterStrategy.java?
>> view=diff&rev=558054&r1=558053&r2=558054
>> =====================================================================
>> =========
>> ---
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> broker/region/policy/IndividualDeadLetterStrategy.java
>> (original)
>> +++
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> broker/region/policy/IndividualDeadLetterStrategy.java
>> Fri Jul 20 10:08:10 2007
>> @@ -29,7 +29,7 @@
>> *
>> * @version $Revision$
>> */
>> -public class IndividualDeadLetterStrategy implements
>> DeadLetterStrategy {
>> +public class IndividualDeadLetterStrategy extends
>> AbstractDeadLetterStrategy {
>>
>> private String topicPrefix = "ActiveMQ.DLQ.Topic.";
>> private String queuePrefix = "ActiveMQ.DLQ.Queue.";
>>
>> Modified:
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> broker/region/policy/SharedDeadLetterStrategy.java
>> URL:
>> http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/
>> java/org/apache/activemq/broker/region/policy/
>> SharedDeadLetterStrategy.java?
>> view=diff&rev=558054&r1=558053&r2=558054
>> =====================================================================
>> =========
>> ---
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> broker/region/policy/SharedDeadLetterStrategy.java
>> (original)
>> +++
>> activemq/trunk/activemq-core/src/main/java/org/apache/activemq/
>> broker/region/policy/SharedDeadLetterStrategy.java
>> Fri Jul 20 10:08:10 2007
>> @@ -29,7 +29,7 @@
>> *
>> * @version $Revision$
>> */
>> -public class SharedDeadLetterStrategy implements
>> DeadLetterStrategy {
>> +public class SharedDeadLetterStrategy extends
>> AbstractDeadLetterStrategy
>> {
>>
>> private ActiveMQDestination deadLetterQueue = new
>> ActiveMQQueue("ActiveMQ.DLQ");
>>
>>
>> Modified:
>> activemq/trunk/activemq-core/src/test/java/org/apache/activemq/
>> broker/StubBroker.java
>> URL:
>> http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/
>> java/org/apache/activemq/broker/StubBroker.java?
>> view=diff&rev=558054&r1=558053&r2=558054
>> =====================================================================
>> =========
>> ---
>> activemq/trunk/activemq-core/src/test/java/org/apache/activemq/
>> broker/StubBroker.java
>> (original)
>> +++
>> activemq/trunk/activemq-core/src/test/java/org/apache/activemq/
>> broker/StubBroker.java
>> Fri Jul 20 10:08:10 2007
>> @@ -19,6 +19,7 @@
>> package org.apache.activemq.broker;
>>
>> import org.apache.activemq.broker.region.Destination;
>> +import org.apache.activemq.broker.region.MessageReference;
>> import org.apache.activemq.broker.region.Subscription;
>> import
>> org.apache.activemq.broker.region.policy.PendingDurableSubscriberMess
>> ageStoragePolicy;
>> import org.apache.activemq.command.ActiveMQDestination;
>> @@ -243,5 +244,15 @@
>>
>> public BrokerService getBrokerService(){
>> return null;
>> + }
>> +
>> + public void messageExpired(ConnectionContext
>> context,MessageReference
>> messageReference){
>> + }
>> +
>> + public void sendToDeadLetterQueue(ConnectionContext
>> context,MessageReference messageReference) {
>> + }
>> +
>> + public Broker getRoot(){
>> + return this;
>> }
>> }
>>
>>
>>
>>
> Quoted from:
> http://www.nabble.com/svn-commit%3A-r558054---in--activemq-trunk-
> activemq-core-src%3A-main-java-org-apache-activemq-advisory--main-
> java-org-apache-activemq-broker--main-java-org-apache-activemq-
> broker-region--main-java-org-apache-activemq-broker-region-policy--
> test-java-org-apa...-tf4118408s2354.html#a11712223
>
Re: svn commit: r558054 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/advisory/
main/java/org/apache/activemq/broker/
main/java/org/apache/activemq/broker/region/
main/java/org/apache/activemq/broker/region/policy/ test/java/org/ap
Posted by ferry97 <vr...@yahoo.com>.
Hi Rob,
I ran it using release 62, but got this error:
2007-08-28 17:57:40,151 WARN
[org.apache.activemq.broker.region.RegionBroker] Failed to pass expired
message to dead letter queue
java.lang.NullPointerException
at org.apache.activemq.util.BrokerSupport.resend(BrokerSupport.java:55)
at
org.apache.activemq.broker.region.RegionBroker.sendToDeadLetterQueue(RegionBroker.java:667)
at
org.apache.activemq.broker.BrokerFilter.sendToDeadLetterQueue(BrokerFilter.java:257)
at
org.apache.activemq.broker.BrokerFilter.sendToDeadLetterQueue(BrokerFilter.java:257)
at
org.apache.activemq.broker.BrokerFilter.sendToDeadLetterQueue(BrokerFilter.java:257)
at
org.apache.activemq.broker.MutableBrokerFilter.sendToDeadLetterQueue(MutableBrokerFilter.java:273)
at
org.apache.activemq.broker.region.RegionBroker.messageExpired(RegionBroker.java:636)
at
org.apache.activemq.broker.BrokerFilter.messageExpired(BrokerFilter.java:253)
at
org.apache.activemq.advisory.AdvisoryBroker.messageExpired(AdvisoryBroker.java:237)
at
org.apache.activemq.broker.BrokerFilter.messageExpired(BrokerFilter.java:253)
at
org.apache.activemq.broker.MutableBrokerFilter.messageExpired(MutableBrokerFilter.java:269)
at org.apache.activemq.broker.region.Queue.doPageIn(Queue.java:1027)
at org.apache.activemq.broker.region.Queue.browse(Queue.java:660)
at
org.apache.activemq.broker.jmx.DestinationView.browseMessages(DestinationView.java:171)
at
org.apache.activemq.broker.jmx.DestinationView.browseMessages(DestinationView.java:163)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:585)
at
org.jboss.mx.interceptor.ReflectedDispatcher.invoke(ReflectedDispatcher.java:155)
at org.jboss.mx.server.Invocation.dispatch(Invocation.java:94)
at org.jboss.mx.server.Invocation.invoke(Invocation.java:86)
at
org.jboss.mx.server.AbstractMBeanInvoker.invoke(AbstractMBeanInvoker.java:264)
at org.jboss.mx.server.MBeanServerImpl.invoke(MBeanServerImpl.java:659)
at org.jboss.jmx.adaptor.control.Server.invokeOpByName(Server.java:258)
at org.jboss.jmx.adaptor.control.Server.invokeOp(Server.java:223)
at
org.jboss.jmx.adaptor.html.HtmlAdaptorServlet.invokeOp(HtmlAdaptorServlet.java:262)
at
org.jboss.jmx.adaptor.html.HtmlAdaptorServlet.processRequest(HtmlAdaptorServlet.java:100)
at
org.jboss.jmx.adaptor.html.HtmlAdaptorServlet.doPost(HtmlAdaptorServlet.java:82)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:710)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:803)
at
org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:290)
at
org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206)
at
org.jboss.web.tomcat.filters.ReplyHeaderFilter.doFilter(ReplyHeaderFilter.java:96)
at
org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:235)
at
org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206)
at
org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:230)
at
org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:175)
at
org.jboss.web.tomcat.security.SecurityAssociationValve.invoke(SecurityAssociationValve.java:179)
at
org.jboss.web.tomcat.security.JaccContextValve.invoke(JaccContextValve.java:84)
at
org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:128)
at
org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:104)
at
org.jboss.web.tomcat.service.jca.CachedConnectionValve.invoke(CachedConnectionValve.java:156)
at
org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:109)
at
org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:241)
at
org.apache.coyote.http11.Http11Processor.process(Http11Processor.java:844)
at
org.apache.coyote.http11.Http11Protocol$Http11ConnectionHandler.process(Http11Protocol.java:580)
at org.apache.tomcat.util.net.JIoEndpoint$Worker.run(JIoEndpoint.java:447)
at java.lang.Thread.run(Thread.java:595)
cheers,
Ferry
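
For readers comparing the two reports: the catch block in the quoted commit calls log.warn with a message only, so the first report showed no cause, while the run against release 62 printed the NullPointerException and its trace. The sketch below illustrates the difference between the two logging styles. It is not ActiveMQ code. It uses java.util.logging so that it runs with the JDK alone (an assumption; the broker uses a different logging API), and the class DlqLoggingSketch, the handler CapturingHandler, and the methods resend, sendWithoutCause, and sendWithCause are hypothetical names made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

// Hypothetical illustration only; none of these names exist in ActiveMQ.
final class DlqLoggingSketch {

    /** Collects log records in memory so the example can inspect them. */
    static final class CapturingHandler extends Handler {
        final List<LogRecord> records = new ArrayList<>();

        @Override
        public void publish(LogRecord record) {
            records.add(record);
        }

        @Override
        public void flush() {
        }

        @Override
        public void close() {
        }
    }

    /** Stand-in for the resend step: fails with an NPE when context is null. */
    static void resend(Object context) {
        context.hashCode();
    }

    /** Message-only warning, like the catch block in the quoted commit. */
    static void sendWithoutCause(Logger log, Object context) {
        try {
            resend(context);
        } catch (Exception e) {
            // The exception is dropped here, so the log cannot say what failed.
            log.warning("Failed to pass expired message to dead letter queue");
        }
    }

    /** Same warning, but the exception travels with the log record. */
    static void sendWithCause(Logger log, Object context) {
        try {
            resend(context);
        } catch (Exception e) {
            log.log(Level.WARNING,
                    "Failed to pass expired message to dead letter queue", e);
        }
    }

    public static void main(String[] args) {
        Logger log = Logger.getLogger("dlq.sketch");
        log.setUseParentHandlers(false); // keep console output to our own lines
        CapturingHandler handler = new CapturingHandler();
        log.addHandler(handler);

        sendWithoutCause(log, null);
        sendWithCause(log, null);

        for (LogRecord record : handler.records) {
            System.out.println(record.getMessage()
                    + " | cause: " + record.getThrown());
        }
    }
}
```

Only the second record carries a throwable, which is what makes a trace like the one above available to whoever reads the log.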