You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2009/10/07 18:41:18 UTC
svn commit: r822799 - in /activemq/branches/activemq-5.3/activemq-core/src:
main/java/org/apache/activemq/broker/region/BaseDestination.java
test/java/org/apache/activemq/broker/policy/NoConsumerDeadLetterTest.java
Author: dejanb
Date: Wed Oct 7 16:41:17 2009
New Revision: 822799
URL: http://svn.apache.org/viewvc?rev=822799&view=rev
Log:
merging 822797 - sendAdvisoryIfNoConsumers
Modified:
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/policy/NoConsumerDeadLetterTest.java
Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=822799&r1=822798&r2=822799&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java Wed Oct 7 16:41:17 2009
@@ -25,6 +25,7 @@
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageDispatchNotification;
@@ -471,13 +472,14 @@
* Provides a hook to allow messages with no consumer to be processed in
* some way - such as to send to a dead letter queue or something..
*/
- protected void onMessageWithNoConsumers(ConnectionContext context, Message message) throws Exception {
- if (!message.isPersistent()) {
+ protected void onMessageWithNoConsumers(ConnectionContext context, Message msg) throws Exception {
+ if (!msg.isPersistent()) {
if (isSendAdvisoryIfNoConsumers()) {
// allow messages with no consumers to be dispatched to a dead
// letter queue
if (destination.isQueue() || !AdvisorySupport.isAdvisoryTopic(destination)) {
+ Message message = msg.copy();
// The original destination and transaction id do not get
// filled when the message is first sent,
// it is only populated if the message is routed to another
Modified: activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/policy/NoConsumerDeadLetterTest.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/policy/NoConsumerDeadLetterTest.java?rev=822799&r1=822798&r2=822799&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/policy/NoConsumerDeadLetterTest.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/policy/NoConsumerDeadLetterTest.java Wed Oct 7 16:41:17 2009
@@ -16,14 +16,23 @@
*/
package org.apache.activemq.broker.policy;
+import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
/**
* @version $Revision$
@@ -46,6 +55,38 @@
assertNotNull("Should be a message for loop: " + i, msg);
}
}
+ /** Sends a non-persistent message while no consumer exists on the destination, expects the no-consumer advisory, then expects a consumer created afterwards to still receive the message. */
+ public void testConsumerReceivesMessages() throws Exception {
+ this.topic = false; // presumably makes getDestination() return a queue -- confirm in the test base class
+ ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory)createConnectionFactory();
+ connection = (ActiveMQConnection)factory.createConnection();
+ connection.start();
+ // Non-persistent delivery is used because the no-consumer handling shown in BaseDestination is guarded by !msg.isPersistent().
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(getDestination());
+ producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ // Subscribe to the no-queue-consumers advisory topic before sending, so the advisory can be observed.
+ Topic advisoryTopic = AdvisorySupport.getNoQueueConsumersAdvisoryTopic(getDestination());
+ MessageConsumer advisoryConsumer = session.createConsumer(advisoryTopic);
+ // This test has not created any consumer on the destination itself at this point.
+ TextMessage msg = session.createTextMessage("Message: x");
+ producer.send(msg);
+ // Wait up to 1000 ms for the advisory; a null result fails the test.
+ Message advisoryMessage = advisoryConsumer.receive(1000);
+ assertNotNull("Advisory message not received", advisoryMessage);
+ // NOTE(review): the purpose of this one-second pause is not evident from the test -- confirm it is needed.
+ Thread.sleep(1000);
+ // Open a second connection. NOTE(review): the field is overwritten without closing the first connection here -- confirm teardown releases it.
+ factory = (ActiveMQConnectionFactory)createConnectionFactory();
+ connection = (ActiveMQConnection)factory.createConnection();
+ connection.start();
+
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ // The message sent above must still be delivered to a consumer that attaches only now (1000 ms receive timeout).
+ MessageConsumer consumer = session.createConsumer(getDestination());
+ Message received = consumer.receive(1000);
+ assertNotNull("Message not received", received);
+ }
protected BrokerService createBroker() throws Exception {
BrokerService broker = super.createBroker();