You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2022/11/16 11:49:48 UTC
[activemq] branch activemq-5.17.x updated: AMQ-9168 - Send message expired advisory and not discard advisory when Topic subscriptions expire a message
This is an automated email from the ASF dual-hosted git repository.
cshannon pushed a commit to branch activemq-5.17.x
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/activemq-5.17.x by this push:
new f90a7bf53 AMQ-9168 - Send message expired advisory and not discard advisory when Topic subscriptions expire a message
f90a7bf53 is described below
commit f90a7bf5355e371972b7f4011779019c50d4ee48
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
AuthorDate: Tue Nov 15 13:21:00 2022 -0500
AMQ-9168 - Send message expired advisory and not discard advisory when
Topic subscriptions expire a message
This fixes topic subs to send the right advisory type, if enabled, when
the server discards a message on dispatch to a topic sub. Also add some
more expiration tests for other subscription types
(cherry picked from commit 757a712890996d71cf380c56fc2cefbd5a82ec88)
---
.../org/apache/activemq/broker/region/Topic.java | 9 +-
.../activemq/broker/region/TopicSubscription.java | 22 ++-
.../apache/activemq/advisory/AdvisoryTests.java | 168 +++++++++++++++++++--
3 files changed, 178 insertions(+), 21 deletions(-)
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
index 834cd1425..2cca4c0bd 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
@@ -599,10 +599,13 @@ public class Topic extends BaseDestination implements Task {
public void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack,
final MessageReference node) throws IOException {
if (topicStore != null && node.isPersistent()) {
- DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
- SubscriptionKey key = dsub.getSubscriptionKey();
- topicStore.acknowledge(context, key.getClientId(), key.getSubscriptionName(), node.getMessageId(),
+ if (sub instanceof DurableTopicSubscription) {
+ DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
+ SubscriptionKey key = dsub.getSubscriptionKey();
+ topicStore.acknowledge(context, key.getClientId(), key.getSubscriptionName(),
+ node.getMessageId(),
convertToNonRangedAck(ack, node));
+ }
}
messageConsumed(context, node);
}
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
index 26aeb5bed..5f838afcb 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
@@ -187,7 +187,8 @@ public class TopicSubscription extends AbstractSubscription {
messagesToEvict = oldMessages.length;
for (int i = 0; i < messagesToEvict; i++) {
MessageReference oldMessage = oldMessages[i];
- discard(oldMessage);
+ //Expired here is false as we are discarding due to the messageEvictingStrategy
+ discard(oldMessage, false);
}
}
// lets avoid an infinite loop if we are given a bad eviction strategy
@@ -233,8 +234,7 @@ public class TopicSubscription extends AbstractSubscription {
matched.remove();
node.decrementReferenceCount();
if (broker.isExpired(node)) {
- ((Destination) node.getRegionDestination()).getDestinationStatistics().getExpired().increment();
- broker.messageExpired(getContext(), node, this);
+ ((Destination) node.getRegionDestination()).messageExpired(getContext(), this, node);
}
break;
}
@@ -654,7 +654,7 @@ public class TopicSubscription extends AbstractSubscription {
// Message may have been sitting in the matched list a while
// waiting for the consumer to ak the message.
if (message.isExpired()) {
- discard(message);
+ discard(message, true);
continue; // just drop it.
}
dispatch(message);
@@ -737,19 +737,25 @@ public class TopicSubscription extends AbstractSubscription {
}
}
- private void discard(MessageReference message) {
+ private void discard(MessageReference message, boolean expired) {
discarding = true;
try {
message.decrementReferenceCount();
matched.remove(message);
- discarded.incrementAndGet();
if (destination != null) {
destination.getDestinationStatistics().getDequeues().increment();
}
- LOG.debug("{}, discarding message {}", this, message);
Destination dest = (Destination) message.getRegionDestination();
if (dest != null) {
- dest.messageDiscarded(getContext(), this, message);
+ //If discard is due to expiration then use the messageExpired() callback
+ if (expired) {
+ LOG.debug("{}, expiring message {}", this, message);
+ dest.messageExpired(getContext(), this, message);
+ } else {
+ LOG.debug("{}, discarding message {}", this, message);
+ discarded.incrementAndGet();
+ dest.messageDiscarded(getContext(), this, message);
+ }
}
broker.getRoot().sendToDeadLetterQueue(getContext(), message, this, new Throwable("TopicSubDiscard. ID:" + info.getConsumerId()));
} finally {
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java b/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java
index 201d8a092..497856505 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java
@@ -25,6 +25,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
@@ -40,7 +41,11 @@ import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.BrokerFilter;
+import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.NullMessageReference;
import org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
@@ -48,6 +53,7 @@ import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.MessageDispatch;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -67,7 +73,7 @@ public class AdvisoryTests {
protected String bindAddress = ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL;
protected int topicCount;
protected final boolean includeBodyForAdvisory;
- protected final int EXPIRE_MESSAGE_PERIOD = 10000;
+ protected final int EXPIRE_MESSAGE_PERIOD = 3000;
@Parameters(name = "includeBodyForAdvisory={0}")
@@ -230,24 +236,143 @@ public class AdvisoryTests {
}
@Test(timeout = 60000)
- public void testMessageExpiredAdvisory() throws Exception {
- Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Queue queue = s.createQueue(getClass().getName());
- MessageConsumer consumer = s.createConsumer(queue);
- assertNotNull(consumer);
+ public void testMessageExpiredAdvisoryQueueSubClient() throws Exception {
+ testMessageExpiredAdvisoryQueue(new ActiveMQQueue(getClass().getName() + "client.timeout"),
+ 300000, true, 500);
+ }
+
+ @Test(timeout = 60000)
+ public void testMessageExpiredAdvisoryQueueSubServer() throws Exception {
+ testMessageExpiredAdvisoryQueue(new ActiveMQQueue(getClass().getName()), 1,true, 500);
+ }
+
+ @Test(timeout = 60000)
+ public void testMessageExpiredAdvisoryQueueSubServerTask() throws Exception {
+ testMessageExpiredAdvisoryQueue(new ActiveMQQueue(getClass().getName()), 1000,false,
+ EXPIRE_MESSAGE_PERIOD * 2);
+ }
+
+ private void testMessageExpiredAdvisoryQueue(ActiveMQQueue dest, int ttl, boolean createConsumer, int receiveTimeout) throws Exception {
+ Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);;
- Topic advisoryTopic = AdvisorySupport.getExpiredMessageTopic((ActiveMQDestination) queue);
+ Topic advisoryTopic = AdvisorySupport.getExpiredMessageTopic(dest);
MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic);
// start throwing messages at the consumer
- MessageProducer producer = s.createProducer(queue);
- producer.setTimeToLive(1);
+ MessageProducer producer = s.createProducer(dest);
+ producer.setTimeToLive(ttl);
+
for (int i = 0; i < MESSAGE_COUNT; i++) {
BytesMessage m = s.createBytesMessage();
m.writeBytes(new byte[1024]);
producer.send(m);
}
- Message msg = advisoryConsumer.receive(EXPIRE_MESSAGE_PERIOD);
+ MessageConsumer consumer = null;
+ if (createConsumer) {
+ consumer = s.createConsumer(dest);
+ assertNotNull(consumer);
+ }
+
+ Message msg = advisoryConsumer.receive(receiveTimeout);
+ assertNotNull(msg);
+ ActiveMQMessage message = (ActiveMQMessage) msg;
+ ActiveMQMessage payload = (ActiveMQMessage) message.getDataStructure();
+
+ //This should be set
+ assertNotNull(message.getProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL));
+
+ //Add assertion to make sure body is included for advisory topics
+ //when includeBodyForAdvisory is true
+ assertIncludeBodyForAdvisory(payload);
+ }
+
+ @Test(timeout = 60000)
+ public void testMessageExpiredAdvisoryTopicSub() throws Exception {
+ ActiveMQTopic dest = new ActiveMQTopic(getClass().getName());
+ //Set prefetch to 1 so acks will trigger expiration on dispatching more messages
+ broker.getDestinationPolicy().getDefaultEntry().setTopicPrefetch(1);
+ Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);;
+
+ MessageConsumer consumer = s.createConsumer(dest);
+ MessageConsumer expiredAdvisoryConsumer = s.createConsumer(AdvisorySupport.getExpiredMessageTopic(dest));
+ MessageConsumer discardedAdvisoryConsumer = s.createConsumer(AdvisorySupport.getMessageDiscardedAdvisoryTopic(dest));
+
+ // start throwing messages at the consumer
+ MessageProducer producer = s.createProducer(dest);
+ producer.setTimeToLive(10);
+ for (int i = 0; i < 10; i++) {
+ BytesMessage m = s.createBytesMessage();
+ m.writeBytes(new byte[1024]);
+ producer.send(m);
+ }
+ Thread.sleep(500);
+
+ //Receiving will trigger the server to discard on dispatch when acks are received
+ //Currently the advisory is only fired on dispatch from server or messages added to ta topic
+ //and not on expired acks from the client side as the original messages are not tracked in
+ //dispatch list so the advisory can't be fired
+ for (int i = 0; i < 10; i++) {
+ assertNull(consumer.receive(10));
+ }
+
+ //Should no longer receive discard advisories for expiration
+ assertNull(discardedAdvisoryConsumer.receive(1000));
+
+ Message msg = expiredAdvisoryConsumer.receive(1000);
+ assertNotNull(msg);
+ ActiveMQMessage message = (ActiveMQMessage) msg;
+ ActiveMQMessage payload = (ActiveMQMessage) message.getDataStructure();
+
+ //This should be set
+ assertNotNull(message.getProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL));
+
+ //Add assertion to make sure body is included for advisory topics
+ //when includeBodyForAdvisory is true
+ assertIncludeBodyForAdvisory(payload);
+ }
+
+ @Test(timeout = 60000)
+ public void testMessageExpiredAdvisoryDurableClient() throws Exception {
+ testMessageExpiredDurableAdvisory(getClass().getName() + "client.timeout",
+ 300000, true, 500);
+ }
+
+ @Test(timeout = 60000)
+ public void testMessageExpiredAdvisoryDurableServer() throws Exception {
+ testMessageExpiredDurableAdvisory(getClass().getName(), 1,true, 500);
+ }
+
+ @Test(timeout = 60000)
+ public void testMessageExpiredAdvisoryDurableServerTask() throws Exception {
+ testMessageExpiredDurableAdvisory(getClass().getName(), 2000,false, EXPIRE_MESSAGE_PERIOD * 2);
+ }
+
+ private void testMessageExpiredDurableAdvisory(String topic, int ttl, boolean bringDurableOnline,
+ int receiveTimeout) throws Exception {
+ ActiveMQTopic dest = new ActiveMQTopic(topic);
+ Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);;
+
+ //create durable and send offline messages
+ MessageConsumer consumer = s.createDurableSubscriber(dest, "sub1");
+ consumer.close();
+
+ Topic advisoryTopic = AdvisorySupport.getExpiredMessageTopic(dest);
+ MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic);
+ // start throwing messages at the consumer
+ MessageProducer producer = s.createProducer(dest);
+ producer.setTimeToLive(ttl);
+ for (int i = 0; i < MESSAGE_COUNT; i++) {
+ BytesMessage m = s.createBytesMessage();
+ m.writeBytes(new byte[1024]);
+ producer.send(m);
+ }
+
+ //if flag is true then bring online to trigger expiration on dispatch
+ if (bringDurableOnline) {
+ consumer = s.createDurableSubscriber(dest, "sub1");
+ }
+
+ Message msg = advisoryConsumer.receive(receiveTimeout);
assertNotNull(msg);
ActiveMQMessage message = (ActiveMQMessage) msg;
ActiveMQMessage payload = (ActiveMQMessage) message.getDataStructure();
@@ -420,6 +545,29 @@ public class AdvisoryTests {
answer.addConnector("nio://localhost:0");
answer.addConnector("tcp://localhost:0").setName("OpenWire");
answer.setDeleteAllMessagesOnStartup(true);
+
+ // add a plugin to ensure the expiration happens on the client side rather
+ // than broker side.
+ answer.setPlugins(new BrokerPlugin[] { new BrokerPlugin() {
+
+ @Override
+ public Broker installPlugin(Broker broker) throws Exception {
+ return new BrokerFilter(broker) {
+
+ @Override
+ public void preProcessDispatch(MessageDispatch messageDispatch) {
+ ActiveMQDestination dest = messageDispatch.getDestination();
+ if (dest != null && !AdvisorySupport.isAdvisoryTopic(dest) && messageDispatch.getDestination()
+ .getPhysicalName().contains("client.timeout")) {
+ // Set the expiration to now
+ messageDispatch.getMessage().setExpiration(System.currentTimeMillis() - 1000);
+ }
+
+ super.preProcessDispatch(messageDispatch);
+ }
+ };
+ }
+ } });
}
protected void assertIncludeBodyForAdvisory(ActiveMQMessage payload) {