Posted to users@activemq.apache.org by Endre Stølsvik <En...@stolsvik.com> on 2022/10/06 09:41:52 UTC

DLQ for Topics?

Hi!
Is DLQ supported for plain Topics? I can't seem to get that to work.

Also, I wonder what would happen if a topic was subscribed to by 100
consumers, and then 50 of them rolled back ("Nack'ed") the delivery?

Thanks,
Kind regards,
Endre.

Re: DLQ for Topics?

Posted by Clebert Suconic <cl...@gmail.com>.
It's supposed to work in both Artemis and ActiveMQ 5.

Would need more detail to figure out what you’re doing wrong.

-- 
Clebert Suconic

Re: DLQ for Topics?

Posted by Endre Stølsvik <En...@stolsvik.com>.
Thanks for answering!

The question was about ActiveMQ *Classic*; sorry for not mentioning that.

Clebert: Test added at end.

1. AFAIU, useQueueForTopicMessages=true by default.

2. The question about rollback/"nacking": Say 100 consumers subscribe to
the topic. One message is sent, and all of them receive it. If 50 of them
eventually nack the delivery, will there be 50 DLQ entries for this message?

Retrying: As far as I know, redelivery in ActiveMQ happens on the
client. That is, the broker delivers the message to the client only
once, and the client then performs the redeliveries itself. Once the
client has exhausted its redelivery policy, it sends a "nack" to the
broker, which promptly executes the dead letter action. There is a
"plugin" to enable broker-side redelivery, but I found it far less than
ideal: if you keep the client-side redelivery policy in place, you get
the 6 (default) client-side redeliveries multiplied by however many
attempts you configure on the broker. It felt very much like a
tacked-on solution, and that client-side redelivery is how it was
really designed.
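
To put rough numbers on that multiplication, here is a small arithmetic
sketch. It is a toy model and calls no ActiveMQ API. It assumes that each
broker dispatch results in one initial delivery plus the client-side
redeliveries, and that the broker-side plugin re-dispatches the message a
configured number of extra times before dead-lettering it. The class name,
method names and the broker-side value of 4 are hypothetical.

```java
// Toy arithmetic only: models how client-side and broker-side redelivery
// counts multiply. It does not use or reproduce any ActiveMQ API.
final class RedeliveryMath {

    /** Deliveries the listener sees per broker dispatch: first try + client redeliveries. */
    static int deliveriesPerDispatch(int clientMaxRedeliveries) {
        return 1 + clientMaxRedeliveries;
    }

    /** Total deliveries if the broker re-dispatches brokerMaxRedeliveries extra times (assumption). */
    static int totalDeliveries(int clientMaxRedeliveries, int brokerMaxRedeliveries) {
        return deliveriesPerDispatch(clientMaxRedeliveries) * (1 + brokerMaxRedeliveries);
    }

    public static void main(String[] args) {
        int clientMax = 6; // the client-side default mentioned above
        int brokerMax = 4; // hypothetical broker-side setting
        System.out.println(totalDeliveries(clientMax, brokerMax)); // prints 35
    }
}
```

Under these assumptions a single poison message is handed to the
listener 7 x 5 = 35 times before it is dead-lettered, which is why
combining the two mechanisms is easy to get wrong.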

An aside: It took me some time to understand this client-side logic,
but I finally understood that it works like this to preserve message
ordering. There is a way to disable this, but it doesn't work with
exponential backoff combined with a string of poison messages. I've
filed a bug, with a PR, for this:
https://issues.apache.org/jira/browse/AMQ-8617

AFAIU, the following code should receive the message from the DLQ and
exit? It does not. Where is my mistake?

import java.util.concurrent.CountDownLatch;

import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQTopic;
import org.junit.Test; // assumes JUnit 4; the original post does not say which version
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Test_TopicDLQ {
    private static final Logger log = LoggerFactory.getLogger(Test_TopicDLQ.class);

    private static final String DLQ_PREFIX = "DLQ.";

    @Test
    public void topicDlq() throws Exception {
        BrokerService broker = createInVmActiveMqBroker();
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                "vm://" + broker.getBrokerName() + "?create=false");
        Connection connection = connectionFactory.createConnection();
        connection.start();

        String topicName = "Test";

        CountDownLatch latch = new CountDownLatch(1);

        // :: Consumer thread: receives from the topic and always rolls back.
        Thread thread = new Thread(() -> {
            try {
                Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
                Topic topic = session.createTopic(topicName);
                MessageConsumer consumer = session.createConsumer(topic);
                latch.countDown();
                while (true) {
                    log.info("Going into consume");
                    Message receive = consumer.receive();
                    log.info("Got message!\n ###" + receive);
                    log.info("Rolling back!");
                    session.rollback();
                }
            }
            catch (Throwable t) {
                log.error("Consumer threw!", t);
            }
        }, "Consumer");
        thread.start();

        latch.await();
        log.info("Receiver reached consumer.");

        // :: Send message to topic
        log.info("Sending message to topic [" + topicName + "]");
        Session sessionProduce = connection.createSession(true, Session.SESSION_TRANSACTED);
        Topic topic = sessionProduce.createTopic(topicName);
        MessageProducer producer = sessionProduce.createProducer(topic);
        TextMessage textMessage = sessionProduce.createTextMessage("Text!");
        producer.send(textMessage);
        sessionProduce.commit();
        sessionProduce.close();

        // :: Wait for DLQ
        String dlqName = DLQ_PREFIX + topicName;
        log.info("Receiving DLQ from [" + dlqName + "]");
        Session sessionDlqReceive = connection.createSession(true, Session.SESSION_TRANSACTED);
        Queue queue = sessionDlqReceive.createQueue(dlqName);
        MessageConsumer consumer = sessionDlqReceive.createConsumer(queue);
        Message receive = consumer.receive();
        log.info("Got DLQ message!\n" + receive);

        connection.stop();
        broker.stop();
    }

    protected static BrokerService createInVmActiveMqBroker() {
        BrokerService broker = new BrokerService();
        broker.setBrokerName("testbroker");
        // :: Disable a bit of stuff for testing:
        // No need for persistence across reboots, and don't want KahaDB dirs and files.
        broker.setPersistent(false);

        // :: Set Individual DLQ - which you most definitely should do in production.
        // Hear, hear: https://users.activemq.apache.narkive.com/H7400Mn1/policymap-api-is-really-bad
        IndividualDeadLetterStrategy individualDeadLetterStrategy = new IndividualDeadLetterStrategy();
        individualDeadLetterStrategy.setQueuePrefix(DLQ_PREFIX);
        individualDeadLetterStrategy.setTopicPrefix(DLQ_PREFIX);
        individualDeadLetterStrategy.setProcessExpired(true);
        individualDeadLetterStrategy.setProcessNonPersistent(true);
        individualDeadLetterStrategy.setUseQueueForTopicMessages(true); // true is also default
        individualDeadLetterStrategy.setUseQueueForQueueMessages(true); // true is also default.

        // :: Create policy entry for TOPICS:
        PolicyEntry allTopicsPolicy = new PolicyEntry();
        allTopicsPolicy.setDestination(new ActiveMQTopic(">")); // all topics
        allTopicsPolicy.setDeadLetterStrategy(individualDeadLetterStrategy);

        // .. create the PolicyMap containing the topic destination policy
        PolicyMap policyMap = new PolicyMap();
        policyMap.put(allTopicsPolicy.getDestination(), allTopicsPolicy);
        broker.setDestinationPolicy(policyMap);

        // :: Start the broker.
        try {
            broker.start();
        }
        catch (Exception e) {
            throw new AssertionError("Could not start ActiveMQ BrokerService '"
                    + broker.getBrokerName() + "'.", e);
        }
        return broker;
    }
}



On Thu, Oct 6, 2022 at 4:51 PM Matt Pavlovich <ma...@gmail.com> wrote:

> Hi Endre-
>
> 1. Yes, topics can support being DLQ’d to a queue.
>
> Set useQueueForTopicMessages=“true” in the deadLetterStrategy for the
> destinationPolicy you are using.
>
> ref: https://activemq.apache.org/message-redelivery-and-dlq-handling
>
> 2. When a consumer is subscribed to a topic there is an implied
> ’subscription’ for each consumer. This subscription tracks the flow of
> messages for each consumer. For a non-durable (default) topic subscription,
> when a consumer rolls back messages the broker will attempt a redelivery
> _back_ to the same consumer for _n_ number of attempts based on the
> redelivery strategy. Once the max redelivery attempts is exhausted, the
> _broker_ moves the messages to the DLQ and processes the next available.
>
> Since every consumer on a topic has their own subscription messages are
> not redelivered to other consumers.
>
> ref: same link as above ;-)
>
> Pro-tip — consuming from topics can get.. weird.  Especially, if you plan
> on trying to cluster brokers and are managing expiration, DLQ, eviction,
> slow consumers, etc. If this is a new message flow, consider using Virtual
> Topics which allow pub-sub but the consumer apps read from queues instead
> of topics. Queues are much easier for developer teams to rationalize all
> the flows (error handling, etc) and Virtual Topics are easier for admins to
> be explicit with the flow of data across multiple brokers— clustering
> consumers vs replicate to another zone, etc.
>
> ref:  https://activemq.apache.org/virtual-destinations
>
> Thanks,
> Matt Pavlovich

Re: DLQ for Topics?

Posted by Matt Pavlovich <ma...@gmail.com>.
Hi Endre-

1. Yes, topics can support being DLQ’d to a queue. 

Set useQueueForTopicMessages="true" in the deadLetterStrategy for the destinationPolicy you are using.

ref: https://activemq.apache.org/message-redelivery-and-dlq-handling

2. When a consumer subscribes to a topic, there is an implied 'subscription' for each consumer. This subscription tracks the flow of messages for that consumer. For a non-durable (default) topic subscription, when a consumer rolls back messages the broker will attempt redelivery _back_ to the same consumer for _n_ attempts, based on the redelivery strategy. Once the max redelivery attempts are exhausted, the _broker_ moves the messages to the DLQ and processes the next available message.

Since every consumer on a topic has its own subscription, messages are not redelivered to other consumers.

ref: same link as above ;-)

Pro-tip: consuming from topics can get... weird, especially if you plan on clustering brokers and are managing expiration, DLQ, eviction, slow consumers, etc. If this is a new message flow, consider using Virtual Topics, which allow pub-sub while the consumer apps read from queues instead of topics. Queues make it much easier for developer teams to reason about all the flows (error handling, etc.), and Virtual Topics make it easier for admins to be explicit about the flow of data across multiple brokers: clustering consumers vs. replicating to another zone, etc.

ref:  https://activemq.apache.org/virtual-destinations
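
As a rough illustration of the Virtual Topics suggestion: with the default naming convention, producers publish to a topic named VirtualTopic.<name>, and each consuming application reads from its own queue named Consumer.<app>.VirtualTopic.<name>. The sketch below only builds those destination names with plain string handling. The class, the method names, and the "Orders" and "Billing" values are made up for the example, and a broker configured with custom virtual destination interceptors may use different prefixes.

```java
// Builds destination names following the default Virtual Topic naming
// convention. Plain string handling only; no broker or JMS API is involved.
final class VirtualTopicNames {

    /** Topic that producers publish to, e.g. "VirtualTopic.Orders". */
    static String topicName(String name) {
        return "VirtualTopic." + name;
    }

    /** Queue that one consuming application reads from, e.g. "Consumer.Billing.VirtualTopic.Orders". */
    static String consumerQueue(String consumerApp, String virtualTopic) {
        return "Consumer." + consumerApp + "." + virtualTopic;
    }

    public static void main(String[] args) {
        String topic = topicName("Orders");             // hypothetical topic
        String queue = consumerQueue("Billing", topic); // hypothetical consumer app
        System.out.println(topic);
        System.out.println(queue);
    }
}
```

Because each application consumes from an ordinary queue, redelivery and DLQ handling follow the usual queue rules rather than the topic subscription behaviour described above.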

Thanks,
Matt Pavlovich
