Posted to users@activemq.apache.org by tpavelka <to...@ca.com> on 2017/11/01 11:58:37 UTC

NPE when using a file based cursor with durable topic subscriptions

I am using a file-based cursor with durable topic subscriptions because, in
my tests, the broker would run out of memory when handling large numbers of
messages without an active consumer.
I have run into a NullPointerException when messages destined for a topic
with an active durable subscription expire. Here is part of the stack trace:

java.lang.NullPointerException: null
	at org.apache.activemq.broker.region.Topic.acknowledge(Topic.java:586) [activemq-broker-5.15.2.jar:5.15.2]
	at org.apache.activemq.broker.region.Topic.messageExpired(Topic.java:810) [activemq-broker-5.15.2.jar:5.15.2]
	at org.apache.activemq.broker.region.cursors.FilePendingMessageCursor.discardExpiredMessage(FilePendingMessageCursor.java:489) [activemq-broker-5.15.2.jar:5.15.2]
	at org.apache.activemq.broker.region.cursors.FilePendingMessageCursor.tryAddMessageLast(FilePendingMessageCursor.java:247) [activemq-broker-5.15.2.jar:5.15.2]
	at org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor.addMessageLast(AbstractPendingMessageCursor.java:93) [activemq-broker-5.15.2.jar:5.15.2]
	at org.apache.activemq.broker.region.PrefetchSubscription.add(PrefetchSubscription.java:157) [activemq-broker-5.15.2.jar:5.15.2]
	at org.apache.activemq.broker.region.DurableTopicSubscription.add(DurableTopicSubscription.java:279) [activemq-broker-5.15.2.jar:5.15.2]
	at org.apache.activemq.broker.region.Topic$2.recoverMessage(Topic.java:314) [activemq-broker-5.15.2.jar:5.15.2]
	at org.apache.activemq.store.kahadb.KahaDBStore$KahaDBTopicMessageStore$6.execute(KahaDBStore.java:1012) [activemq-kahadb-store-5.15.2.jar:5.15.2]
	at org.apache.activemq.store.kahadb.disk.page.Transaction.execute(Transaction.java:779) [activemq-kahadb-store-5.15.2.jar:5.15.2]
	at org.apache.activemq.store.kahadb.KahaDBStore$KahaDBTopicMessageStore.recoverSubscription(KahaDBStore.java:999) [activemq-kahadb-store-5.15.2.jar:5.15.2]
	at org.apache.activemq.store.ProxyTopicMessageStore.recoverSubscription(ProxyTopicMessageStore.java:108) [activemq-broker-5.15.2.jar:5.15.2]
	at org.apache.activemq.broker.region.Topic.activate(Topic.java:307) [activemq-broker-5.15.2.jar:5.15.2]
	at org.apache.activemq.broker.region.DurableTopicSubscription.add(DurableTopicSubscription.java:123) [activemq-broker-5.15.2.jar:5.15.2]
	at org.apache.activemq.broker.region.Topic.addSubscription(Topic.java:164) [activemq-broker-5.15.2.jar:5.15.2]
	at org.apache.activemq.broker.region.TopicRegion.addSubscriptionsForDestination(TopicRegion.java:287) [activemq-broker-5.15.2.jar:5.15.2]
	at org.apache.activemq.broker.region.AbstractRegion.addDestination(AbstractRegion.java:162) [activemq-broker-5.15.2.jar:5.15.2]
	at org.apache.activemq.broker.region.RegionBroker.addDestination(RegionBroker.java:339) [activemq-broker-5.15.2.jar:5.15.2]
	at org.apache.activemq.broker.BrokerFilter.addDestination(BrokerFilter.java:174) [activemq-broker-5.15.2.jar:5.15.2]
	at org.apache.activemq.advisory.AdvisoryBroker.addDestination(AdvisoryBroker.java:239) [activemq-broker-5.15.2.jar:5.15.2]
	at org.apache.activemq.broker.BrokerFilter.addDestination(BrokerFilter.java:174) [activemq-broker-5.15.2.jar:5.15.2]
	at org.apache.activemq.broker.BrokerFilter.addDestination(BrokerFilter.java:174) [activemq-broker-5.15.2.jar:5.15.2]
	at org.apache.activemq.broker.BrokerFilter.addDestination(BrokerFilter.java:174) [activemq-broker-5.15.2.jar:5.15.2]
	at org.apache.activemq.broker.region.AbstractRegion.start(AbstractRegion.java:104) [activemq-broker-5.15.2.jar:5.15.2]
	at org.apache.activemq.broker.region.RegionBroker.start(RegionBroker.java:200) [activemq-broker-5.15.2.jar:5.15.2]
	at org.apache.activemq.broker.BrokerFilter.start(BrokerFilter.java:189) [activemq-broker-5.15.2.jar:5.15.2]
	at org.apache.activemq.broker.BrokerFilter.start(BrokerFilter.java:189) [activemq-broker-5.15.2.jar:5.15.2]
	at org.apache.activemq.broker.TransactionBroker.start(TransactionBroker.java:119) [activemq-broker-5.15.2.jar:5.15.2]
	at org.apache.activemq.broker.BrokerService$6.start(BrokerService.java:2370) [activemq-broker-5.15.2.jar:5.15.2]
	at org.apache.activemq.broker.BrokerService.doStartBroker(BrokerService.java:747) [activemq-broker-5.15.2.jar:5.15.2]
	at org.apache.activemq.broker.BrokerService.startBroker(BrokerService.java:733) [activemq-broker-5.15.2.jar:5.15.2]
	at org.apache.activemq.broker.BrokerService.start(BrokerService.java:636) [activemq-broker-5.15.2.jar:5.15.2]

I looked at the code and it seems to me that this is caused by the method
org.apache.activemq.broker.region.cursors.FilePendingMessageCursor#discardExpiredMessage:

    private void discardExpiredMessage(MessageReference reference) {
        LOG.debug("Discarding expired message {}", reference);
        if (reference.isExpired() && broker.isExpired(reference)) {
            ConnectionContext context = new ConnectionContext(new NonCachedMessageEvaluationContext());
            context.setBroker(broker);
            ((Destination)reference.getRegionDestination()).messageExpired(context, null,
                    new IndirectMessageReference(reference.getMessage()));
        }
    }

There the subscription passed to Destination#messageExpired is set to null.
If the destination is a topic, then later in
org.apache.activemq.broker.region.Topic#acknowledge:

    @Override
    public void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack,
            final MessageReference node) throws IOException {
        if (topicStore != null && node.isPersistent()) {
            DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
            SubscriptionKey key = dsub.getSubscriptionKey();
            topicStore.acknowledge(context, key.getClientId(), key.getSubscriptionName(), node.getMessageId(),
                    convertToNonRangedAck(ack, node));
        }
        messageConsumed(context, node);
    }

Because sub is null, the cast produces a null dsub, and the call
dsub.getSubscriptionKey() throws the NPE.
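
To make the failure mode concrete, here is a minimal, self-contained sketch
of the same pattern outside ActiveMQ. All names in it (NullSubscriptionSketch,
DurableSub, unguarded, guarded) are hypothetical stand-ins, not ActiveMQ
classes, and the guarded variant only illustrates one possible null check. It
is not the fix applied in ActiveMQ, and simply skipping the store
acknowledgement may not be the right behaviour for the broker.

```java
public class NullSubscriptionSketch {

    // Hypothetical stand-in for the broker's Subscription interface.
    interface Subscription {}

    // Hypothetical stand-in for DurableTopicSubscription.
    static final class DurableSub implements Subscription {
        private final String key;

        DurableSub(String key) {
            this.key = key;
        }

        String getSubscriptionKey() {
            return key;
        }
    }

    // Same shape as the quoted acknowledge(): cast first, then dereference.
    static String unguarded(Subscription sub) {
        DurableSub dsub = (DurableSub) sub; // casting null is legal and yields null
        return dsub.getSubscriptionKey();   // throws NullPointerException if sub was null
    }

    // Illustrative guard (an assumption, not the real fix): only dereference
    // when a durable subscription was actually supplied.
    static String guarded(Subscription sub) {
        if (sub instanceof DurableSub) {    // instanceof is false for null
            return ((DurableSub) sub).getSubscriptionKey();
        }
        return null;
    }

    public static void main(String[] args) {
        try {
            unguarded(null);
        } catch (NullPointerException e) {
            System.out.println("unguarded(null) threw NullPointerException");
        }
        System.out.println("guarded(null) returned " + guarded(null));
    }
}
```

The cast itself never fails for null, which is why the exception surfaces one
line later, at the first method call on the result.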

Is this a bug or am I using the wrong type of cursor?

Thanks,
Tomas



--
Sent from: http://activemq.2283324.n4.nabble.com/ActiveMQ-User-f2341805.html

Re: NPE when using a file based cursor with durable topic subscriptions

Posted by tpavelka <to...@ca.com>.
We need this fixed badly because it breaks message expiration on durable
topics entirely. I would provide a fix, but I lack sufficient understanding
of the code flow around cursors and the message store ;-)


Re: NPE when using a file based cursor with durable topic subscriptions

Posted by Tim Bain <tb...@alumni.duke.edu>.
That's great, thank you.

Although we welcome all bug reports, we especially appreciate it when the
submitter provides a detailed analysis and/or a test that reproduces the
problem, so thank you for that.

Tim

On Nov 15, 2017 8:54 AM, "tpavelka" <to...@ca.com> wrote:

> Thanks for the confirmation, I have opened
> https://issues.apache.org/jira/browse/AMQ-6863 and added a Java class that
> reproduces the issue.
>
> Tomas

Re: NPE when using a file based cursor with durable topic subscriptions

Posted by tpavelka <to...@ca.com>.
Thanks for the confirmation, I have opened
https://issues.apache.org/jira/browse/AMQ-6863 and added a Java class that
reproduces the issue.

Tomas


Re: NPE when using a file based cursor with durable topic subscriptions

Posted by Tim Bain <tb...@alumni.duke.edu>.
Tomas,

This does appear to be a bug, and I concur with your assessment of how the
null reference is getting passed in. Can you please submit a bug in JIRA?

Thanks,
Tim
