You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2022/11/16 11:49:48 UTC

[activemq] branch activemq-5.17.x updated: AMQ-9168 - Send message expired advisory and not discard advisory when Topic subscriptions expire a message

This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch activemq-5.17.x
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/activemq-5.17.x by this push:
     new f90a7bf53 AMQ-9168 - Send message expired advisory and not discard advisory when Topic subscriptions expire a message
f90a7bf53 is described below

commit f90a7bf5355e371972b7f4011779019c50d4ee48
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
AuthorDate: Tue Nov 15 13:21:00 2022 -0500

    AMQ-9168 - Send message expired advisory and not discard advisory when
    Topic subscriptions expire a message
    
    This fixes topic subs to send the right advisory type, if enabled, when
    the server discards a message on dispatch to a topic sub. Also add some
    more expiration tests for other subscription types
    
    (cherry picked from commit 757a712890996d71cf380c56fc2cefbd5a82ec88)
---
 .../org/apache/activemq/broker/region/Topic.java   |   9 +-
 .../activemq/broker/region/TopicSubscription.java  |  22 ++-
 .../apache/activemq/advisory/AdvisoryTests.java    | 168 +++++++++++++++++++--
 3 files changed, 178 insertions(+), 21 deletions(-)

diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
index 834cd1425..2cca4c0bd 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
@@ -599,10 +599,13 @@ public class Topic extends BaseDestination implements Task {
     public void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack,
             final MessageReference node) throws IOException {
         if (topicStore != null && node.isPersistent()) {
-            DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
-            SubscriptionKey key = dsub.getSubscriptionKey();
-            topicStore.acknowledge(context, key.getClientId(), key.getSubscriptionName(), node.getMessageId(),
+            if (sub instanceof DurableTopicSubscription) {
+                DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
+                SubscriptionKey key = dsub.getSubscriptionKey();
+                topicStore.acknowledge(context, key.getClientId(), key.getSubscriptionName(),
+                    node.getMessageId(),
                     convertToNonRangedAck(ack, node));
+            }
         }
         messageConsumed(context, node);
     }
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
index 26aeb5bed..5f838afcb 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
@@ -187,7 +187,8 @@ public class TopicSubscription extends AbstractSubscription {
                                 messagesToEvict = oldMessages.length;
                                 for (int i = 0; i < messagesToEvict; i++) {
                                     MessageReference oldMessage = oldMessages[i];
-                                    discard(oldMessage);
+                                    //Expired here is false as we are discarding due to the messageEvictingStrategy
+                                    discard(oldMessage, false);
                                 }
                             }
                             // lets avoid an infinite loop if we are given a bad eviction strategy
@@ -233,8 +234,7 @@ public class TopicSubscription extends AbstractSubscription {
                     matched.remove();
                     node.decrementReferenceCount();
                     if (broker.isExpired(node)) {
-                        ((Destination) node.getRegionDestination()).getDestinationStatistics().getExpired().increment();
-                        broker.messageExpired(getContext(), node, this);
+                        ((Destination) node.getRegionDestination()).messageExpired(getContext(), this, node);
                     }
                     break;
                 }
@@ -654,7 +654,7 @@ public class TopicSubscription extends AbstractSubscription {
                         // Message may have been sitting in the matched list a while
                         // waiting for the consumer to ak the message.
                         if (message.isExpired()) {
-                            discard(message);
+                            discard(message, true);
                             continue; // just drop it.
                         }
                         dispatch(message);
@@ -737,19 +737,25 @@ public class TopicSubscription extends AbstractSubscription {
         }
     }
 
-    private void discard(MessageReference message) {
+    private void discard(MessageReference message, boolean expired) {
         discarding = true;
         try {
             message.decrementReferenceCount();
             matched.remove(message);
-            discarded.incrementAndGet();
             if (destination != null) {
                 destination.getDestinationStatistics().getDequeues().increment();
             }
-            LOG.debug("{}, discarding message {}", this, message);
             Destination dest = (Destination) message.getRegionDestination();
             if (dest != null) {
-                dest.messageDiscarded(getContext(), this, message);
+                //If discard is due to expiration then use the messageExpired() callback
+                if (expired) {
+                    LOG.debug("{}, expiring message {}", this, message);
+                    dest.messageExpired(getContext(), this, message);
+                } else {
+                    LOG.debug("{}, discarding message {}", this, message);
+                    discarded.incrementAndGet();
+                    dest.messageDiscarded(getContext(), this, message);
+                }
             }
             broker.getRoot().sendToDeadLetterQueue(getContext(), message, this, new Throwable("TopicSubDiscard. ID:" + info.getConsumerId()));
         } finally {
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java b/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java
index 201d8a092..497856505 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java
@@ -25,6 +25,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
 
+import java.util.concurrent.atomic.AtomicInteger;
 import javax.jms.BytesMessage;
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
@@ -40,7 +41,11 @@ import javax.jms.Topic;
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.ActiveMQPrefetchPolicy;
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.BrokerFilter;
+import org.apache.activemq.broker.BrokerPlugin;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.NullMessageReference;
 import org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
@@ -48,6 +53,7 @@ import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.MessageDispatch;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -67,7 +73,7 @@ public class AdvisoryTests {
     protected String bindAddress = ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL;
     protected int topicCount;
     protected final boolean includeBodyForAdvisory;
-    protected final int EXPIRE_MESSAGE_PERIOD = 10000;
+    protected final int EXPIRE_MESSAGE_PERIOD = 3000;
 
 
     @Parameters(name = "includeBodyForAdvisory={0}")
@@ -230,24 +236,143 @@ public class AdvisoryTests {
     }
 
     @Test(timeout = 60000)
-    public void testMessageExpiredAdvisory() throws Exception {
-        Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        Queue queue = s.createQueue(getClass().getName());
-        MessageConsumer consumer = s.createConsumer(queue);
-        assertNotNull(consumer);
+    public void testMessageExpiredAdvisoryQueueSubClient() throws Exception {
+        testMessageExpiredAdvisoryQueue(new ActiveMQQueue(getClass().getName() + "client.timeout"),
+            300000, true, 500);
+    }
+
+    @Test(timeout = 60000)
+    public void testMessageExpiredAdvisoryQueueSubServer() throws Exception {
+        testMessageExpiredAdvisoryQueue(new ActiveMQQueue(getClass().getName()), 1,true, 500);
+    }
+
+    @Test(timeout = 60000)
+    public void testMessageExpiredAdvisoryQueueSubServerTask() throws Exception {
+        testMessageExpiredAdvisoryQueue(new ActiveMQQueue(getClass().getName()), 1000,false,
+            EXPIRE_MESSAGE_PERIOD * 2);
+    }
+
+    private void testMessageExpiredAdvisoryQueue(ActiveMQQueue dest, int ttl, boolean createConsumer, int receiveTimeout) throws Exception {
+        Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);;
 
-        Topic advisoryTopic = AdvisorySupport.getExpiredMessageTopic((ActiveMQDestination) queue);
+        Topic advisoryTopic = AdvisorySupport.getExpiredMessageTopic(dest);
         MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic);
         // start throwing messages at the consumer
-        MessageProducer producer = s.createProducer(queue);
-        producer.setTimeToLive(1);
+        MessageProducer producer = s.createProducer(dest);
+        producer.setTimeToLive(ttl);
+
         for (int i = 0; i < MESSAGE_COUNT; i++) {
             BytesMessage m = s.createBytesMessage();
             m.writeBytes(new byte[1024]);
             producer.send(m);
         }
 
-        Message msg = advisoryConsumer.receive(EXPIRE_MESSAGE_PERIOD);
+        MessageConsumer consumer = null;
+        if (createConsumer) {
+            consumer = s.createConsumer(dest);
+            assertNotNull(consumer);
+        }
+
+        Message msg = advisoryConsumer.receive(receiveTimeout);
+        assertNotNull(msg);
+        ActiveMQMessage message = (ActiveMQMessage) msg;
+        ActiveMQMessage payload = (ActiveMQMessage) message.getDataStructure();
+
+        //This should be set
+        assertNotNull(message.getProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL));
+
+        //Add assertion to make sure body is included for advisory topics
+        //when includeBodyForAdvisory is true
+        assertIncludeBodyForAdvisory(payload);
+    }
+
+    @Test(timeout = 60000)
+    public void testMessageExpiredAdvisoryTopicSub() throws Exception {
+        ActiveMQTopic dest = new ActiveMQTopic(getClass().getName());
+        //Set prefetch to 1 so acks will trigger expiration on dispatching more messages
+        broker.getDestinationPolicy().getDefaultEntry().setTopicPrefetch(1);
+        Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);;
+
+        MessageConsumer consumer =  s.createConsumer(dest);
+        MessageConsumer expiredAdvisoryConsumer = s.createConsumer(AdvisorySupport.getExpiredMessageTopic(dest));
+        MessageConsumer discardedAdvisoryConsumer = s.createConsumer(AdvisorySupport.getMessageDiscardedAdvisoryTopic(dest));
+
+        // start throwing messages at the consumer
+        MessageProducer producer = s.createProducer(dest);
+        producer.setTimeToLive(10);
+        for (int i = 0; i < 10; i++) {
+            BytesMessage m = s.createBytesMessage();
+            m.writeBytes(new byte[1024]);
+            producer.send(m);
+        }
+        Thread.sleep(500);
+
+        //Receiving will trigger the server to discard on dispatch when acks are received.
+        //Currently the advisory is only fired on dispatch from the server or when messages are added to a topic,
+        //and not on expired acks from the client side, as the original messages are not tracked in
+        //the dispatch list, so the advisory can't be fired
+        for (int i = 0; i < 10; i++) {
+            assertNull(consumer.receive(10));
+        }
+
+        //Should no longer receive discard advisories for expiration
+        assertNull(discardedAdvisoryConsumer.receive(1000));
+
+        Message msg = expiredAdvisoryConsumer.receive(1000);
+        assertNotNull(msg);
+        ActiveMQMessage message = (ActiveMQMessage) msg;
+        ActiveMQMessage payload = (ActiveMQMessage) message.getDataStructure();
+
+        //This should be set
+        assertNotNull(message.getProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL));
+
+        //Add assertion to make sure body is included for advisory topics
+        //when includeBodyForAdvisory is true
+        assertIncludeBodyForAdvisory(payload);
+    }
+
+    @Test(timeout = 60000)
+    public void testMessageExpiredAdvisoryDurableClient() throws Exception {
+        testMessageExpiredDurableAdvisory(getClass().getName() + "client.timeout",
+            300000, true, 500);
+    }
+
+    @Test(timeout = 60000)
+    public void testMessageExpiredAdvisoryDurableServer() throws Exception {
+        testMessageExpiredDurableAdvisory(getClass().getName(), 1,true, 500);
+    }
+
+    @Test(timeout = 60000)
+    public void testMessageExpiredAdvisoryDurableServerTask() throws Exception {
+        testMessageExpiredDurableAdvisory(getClass().getName(), 2000,false, EXPIRE_MESSAGE_PERIOD * 2);
+    }
+
+    private void testMessageExpiredDurableAdvisory(String topic, int ttl, boolean bringDurableOnline,
+        int receiveTimeout) throws Exception {
+        ActiveMQTopic dest = new ActiveMQTopic(topic);
+        Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);;
+
+        //create durable and send offline messages
+        MessageConsumer consumer = s.createDurableSubscriber(dest, "sub1");
+        consumer.close();
+
+        Topic advisoryTopic = AdvisorySupport.getExpiredMessageTopic(dest);
+        MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic);
+        // start throwing messages at the consumer
+        MessageProducer producer = s.createProducer(dest);
+        producer.setTimeToLive(ttl);
+        for (int i = 0; i < MESSAGE_COUNT; i++) {
+            BytesMessage m = s.createBytesMessage();
+            m.writeBytes(new byte[1024]);
+            producer.send(m);
+        }
+
+        //if flag is true then bring online to trigger expiration on dispatch
+        if (bringDurableOnline) {
+            consumer = s.createDurableSubscriber(dest, "sub1");
+        }
+
+        Message msg = advisoryConsumer.receive(receiveTimeout);
         assertNotNull(msg);
         ActiveMQMessage message = (ActiveMQMessage) msg;
         ActiveMQMessage payload = (ActiveMQMessage) message.getDataStructure();
@@ -420,6 +545,29 @@ public class AdvisoryTests {
         answer.addConnector("nio://localhost:0");
         answer.addConnector("tcp://localhost:0").setName("OpenWire");
         answer.setDeleteAllMessagesOnStartup(true);
+
+        // add a plugin to ensure the expiration happens on the client side rather
+        // than broker side.
+        answer.setPlugins(new BrokerPlugin[] { new BrokerPlugin() {
+
+            @Override
+            public Broker installPlugin(Broker broker) throws Exception {
+                return new BrokerFilter(broker) {
+
+                    @Override
+                    public void preProcessDispatch(MessageDispatch messageDispatch) {
+                        ActiveMQDestination dest = messageDispatch.getDestination();
+                        if (dest != null && !AdvisorySupport.isAdvisoryTopic(dest) && messageDispatch.getDestination()
+                            .getPhysicalName().contains("client.timeout")) {
+                            // Set the expiration to now
+                            messageDispatch.getMessage().setExpiration(System.currentTimeMillis() - 1000);
+                        }
+
+                        super.preProcessDispatch(messageDispatch);
+                    }
+                };
+            }
+        } });
     }
 
     protected void assertIncludeBodyForAdvisory(ActiveMQMessage payload) {