You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@activemq.apache.org by swift99 <dj...@maxistechnology.com> on 2016/03/09 21:11:27 UTC

ActiveMQ / JMS “losing” messages - what am I missing?

What am I missing?

This is cross-posted from
http://stackoverflow.com/questions/35896002/activemq-jms-losing-messages-what-am-i-missing.

AMQ version 5.13.2 
Java 1.8.0_74 
Windows 10
transportConnector uri="vm://localhost"

Given a simple test case, two Object messages are transmitted, one with data
and the other being an end-of-data marker. Only the end-of-data marker is
being received.

The queue is created at the beginning of the job, and destroyed after the
job completes.

If I run a larger number of transactions, I see about a 50% receive rate.

The logs clearly show that the receiver was started before the first message
was put on the queue, both messages are being put on the queue, but only the
second message is actually being received.

Both the sender and receiver are on the same JVM. Each has its own Session
and Connection.

Transmit code:

    private void doSomeStuffInTransaction (final Object model) {
        transactionTemplate.execute(new TransactionCallbackWithoutResult() {
            @Override
            protected void doInTransactionWithoutResult(TransactionStatus
status) {
                try {
                    doSomeStuff ( model );

                    ObjectMessage message = session.createObjectMessage(
                            (model.getRoot() == null)
                            ? null
                            : model.getRoot().getContents().getId());
                    messageProducer.send(message);
                    logger.debug("Sent: {}", message.toString());
                }catch (Exception e) {
                      //use this to rollback exception in case of exception
                    status.setRollbackOnly();
                    throw new RuntimeException(e.getmessage(), e);
                }   

            }});
    }   
Receiver code:

public Object read() throws Exception,
        UnexpectedInputException, ParseException,
        NonTransientResourceException {

    Object result = null;

    logger.debug("Attempting to receive message on connection: ",
connection.toString());

    ObjectMessage msg = (ObjectMessage) messageConsumer.receive();
    logger.debug("Received: {}", msg.toString());
    result = msg.getObject();

    return result;
}


Log snip:

DEBUG com.mylib.SelectedDataJmsReader - Attempting to receive message on
connection: 
... snip ...
DEBUG com.mylib.SelectedDataJmsWriter - Sent: ActiveMQObjectMessage
{commandId = 0, responseRequired = false, messageId =
ID:zip-51438-1457536143607-4:1:5:1:1, originalDestination = null,
originalTransactionId = null, producerId = null, destination =
queue://Workorders via SQL.383, transactionId = null, expiration = 0,
timestamp = 1457536412608, arrival = 0, brokerInTime = 0, brokerOutTime = 0,
correlationId = null, replyTo = null, persistent = true, type = null,
priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null,
compressed = false, userID = null, content =
org.apache.activemq.util.ByteSequence@3c21d3e0, marshalledProperties = null,
dataStructure = null, redeliveryCounter = 0, size = 0, properties = null,
readOnlyProperties = false, readOnlyBody = false, droppable = false,
jmsXGroupFirstForConsumer = false}
INFO  com.mylib.SelectedDataJmsWriter - Committed 1 hierarchies to redo log
and JMS queue
INFO  com.mylib.SourceSelectionReaderImpl - Returning empty treemodel and
end-of-stream placeholder.
DEBUG com.mylib.SelectedDataJmsWriter - Sent: ActiveMQObjectMessage
{commandId = 0, responseRequired = false, messageId =
ID:zip-51438-1457536143607-4:1:5:1:2, originalDestination = null,
originalTransactionId = null, producerId = null, destination =
queue://Workorders via SQL.383, transactionId = null, expiration = 0,
timestamp = 1457536412696, arrival = 0, brokerInTime = 0, brokerOutTime = 0,
correlationId = null, replyTo = null, persistent = true, type = null,
priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null,
compressed = false, userID = null, content = null, marshalledProperties =
null, dataStructure = null, redeliveryCounter = 0, size = 0, properties =
null, readOnlyProperties = false, readOnlyBody = false, droppable = false,
jmsXGroupFirstForConsumer = false}
INFO  com.mylib.SelectedDataJmsWriter - Committed 1 hierarchies to redo log
and JMS queue
DEBUG com.mylib.SelectedDataJmsReader - Received: ActiveMQObjectMessage
{commandId = 19, responseRequired = true, messageId =
ID:zip-51438-1457536143607-4:1:5:1:2, originalDestination = null,
originalTransactionId = null, producerId =
ID:zip-51438-1457536143607-4:1:5:1, destination = queue://Workorders via
SQL.383, transactionId = null, expiration = 0, timestamp = 1457536412696,
arrival = 0, brokerInTime = 1457536412696, brokerOutTime = 1457536412697,
correlationId = null, replyTo = null, persistent = true, type = null,
priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null,
compressed = false, userID = null, content = null, marshalledProperties =
null, dataStructure = null, redeliveryCounter = 0, size = 1024, properties =
null, readOnlyProperties = true, readOnlyBody = true, droppable = false,
jmsXGroupFirstForConsumer = false}
INFO  com.mylib.SelectedDataJmsReader - executed read, found end-of-stream
marker, returning null



--
View this message in context: http://activemq.2283324.n4.nabble.com/ActiveMQ-JMS-losing-messages-what-am-I-missing-tp4709127.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Re: ActiveMQ / JMS “losing” messages - what am I missing?

Posted by swift99 <dj...@maxistechnology.com>.
With one consumer, all messages are consumed.  As near as I can tell, amq and
jms worked correctly, .I got bitten again by Spring.

Thanks for your help!



--
View this message in context: http://activemq.2283324.n4.nabble.com/ActiveMQ-JMS-losing-messages-what-am-I-missing-tp4709127p4709227.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Re: ActiveMQ / JMS “losing” messages - what am I missing?

Posted by Tim Bain <tb...@alumni.duke.edu>.
With only one consumer, do all messages now get consumed?

Is there anything we can explain to help with using JMX in the future?
On Mar 11, 2016 8:46 AM, "swift99" <dj...@maxistechnology.com> wrote:

> Thank you Tim,
>
> In the Receiver Setup code, note that the beforeStep() method is annotated
> with @BeforeStep. I think this means that the receiver is being set up
> twice, and probably with a prefetch optimization.  This is validated
> because
> the log shows TWO subscriptions.  Not being a heavy JMS user, I was under
> the mistaken impression that one was for the receiver but the other was for
> the sender.
>
>     @Override
>     @BeforeStep
>     public void beforeStep(StepExecution stepExecution) {
>         this.stepExecution = stepExecution;
>         this.setJobExecution(stepExecution.getJobExecution());
>
> After removing the @BeforeStep annotation, the log shows only one
> subscription
>
>     DEBUG org.apache.activemq.broker.region.Queue - queue://Workorders via
> SQL.408, subscriptions=1, memory=0%, size=1, pending=0 toPageIn: 0,
> Inflight: 1, pagedInMessages.size 1, pagedInPendingDispatch.size 0,
> enqueueCount: 1370, dequeueCount: 1369, memUsage:1024
>
>
>
>
>
>
>
>
> --
> View this message in context:
> http://activemq.2283324.n4.nabble.com/ActiveMQ-JMS-losing-messages-what-am-I-missing-tp4709127p4709200.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>

Re: ActiveMQ / JMS “losing” messages - what am I missing?

Posted by swift99 <dj...@maxistechnology.com>.
Thank you Tim,

In the Receiver Setup code, note that the beforeStep() method is annotated
with @BeforeStep. I think this means that the receiver is being set up
twice, and probably with a prefetch optimization.  This is validated because
the log shows TWO subscriptions.  Not being a heavy JMS user, I was under
the mistaken impression that one was for the receiver but the other was for
the sender.

    @Override
    @BeforeStep
    public void beforeStep(StepExecution stepExecution) {
        this.stepExecution = stepExecution;
        this.setJobExecution(stepExecution.getJobExecution());

After removing the @BeforeStep annotation, the log shows only one
subscription

    DEBUG org.apache.activemq.broker.region.Queue - queue://Workorders via
SQL.408, subscriptions=1, memory=0%, size=1, pending=0 toPageIn: 0,
Inflight: 1, pagedInMessages.size 1, pagedInPendingDispatch.size 0,
enqueueCount: 1370, dequeueCount: 1369, memUsage:1024








--
View this message in context: http://activemq.2283324.n4.nabble.com/ActiveMQ-JMS-losing-messages-what-am-I-missing-tp4709127p4709200.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Re: ActiveMQ / JMS “losing” messages - what am I missing?

Posted by Tim Bain <tb...@alumni.duke.edu>.
You got some, not all, of what JMX would have told you.  But it's enough.

DEBUG org.apache.activemq.broker.region.Queue - queue://Stuff via SQL.401,
subscriptions=2, memory=0%, size=2, pending=0 toPageIn: 1, Inflight: 1,
pagedInMessages.size 1, pagedInPendingDispatch.size 0, enqueueCount: 2,
dequeueCount: 0, memUsage:3155

Why is subscriptions=2?  What's your second consumer, and why is it there
when it shouldn't be?

With a JMX viewer, you'd have seen the second subscription, seen that each
consumer gets one of the messages, and not had to ask your follow-up
question.  So, what does "not having much luck with the JMX viewer" mean,
and how can we help you learn whatever you need to so you can use one
successfully in the future?

Tim
On Mar 10, 2016 12:58 PM, "swift99" <dj...@maxistechnology.com> wrote:

> I'm not having much luck with the JMX viewer, but I turned on debug logging
> to get (I think) the same information.
>
> It is very clear that there was two enqueues and one dequeue, with the
> dequeue of the "end-of-data marker" happening before the dequeue of the
> intended data.
>
> Log entries follow:
>
> INFO  com.myapp.SourceSelectionReaderImpl - 1 rows were selected as roots
> in
> 602 milliseconds.
> DEBUG org.apache.activemq.broker.region.Queue - localhost Message
> ID:zip-54580-1457638742697-4:2:2:1:1 sent to queue://Stuff via SQL.401
> DEBUG org.apache.activemq.broker.region.Queue - queue://Stuff via SQL.401,
> subscriptions=2, memory=0%, size=1, pending=0 toPageIn: 1, Inflight: 0,
> pagedInMessages.size 0, pagedInPendingDispatch.size 0, enqueueCount: 1,
> dequeueCount: 0, memUsage:2214
> INFO  com.myapp.SelectedDataJmsWriter - Committed 1 thing to redo log and
> JMS queue
>
> *** Emphasis added - Note that the end of stream marker is posted second
> ***
> INFO  com.myapp.SourceSelectionReaderImpl - Returning empty thing and
> end-of-stream placeholder.
> DEBUG org.apache.activemq.broker.region.Queue - localhost Message
> ID:zip-54580-1457638742697-4:2:2:1:2 sent to queue://Stuff via SQL.401
> DEBUG org.apache.activemq.broker.region.Queue - queue://Stuff via SQL.401,
> subscriptions=2, memory=0%, size=2, pending=0 toPageIn: 1, Inflight: 1,
> pagedInMessages.size 1, pagedInPendingDispatch.size 0, enqueueCount: 2,
> dequeueCount: 0, memUsage:3155
> INFO  com.myapp.SelectedDataJmsWriter - Committed 1 thing to redo log and
> JMS queue
>
> *** Emphasis added - note that the end of stream marker is retrieved first,
> the first one is still marked as inflight ***
> INFO  com.myapp.SelectedDataJmsReader - executed read, found end-of-stream
> marker, returning null
> DEBUG org.apache.activemq.broker.region.Queue - queue://Stuff via SQL.401,
> subscriptions=2, memory=0%, size=1, pending=0 toPageIn: 0, Inflight: 1,
> pagedInMessages.size 1, pagedInPendingDispatch.size 0, enqueueCount: 2,
> dequeueCount: 1, memUsage:1107
>
>
> DEBUG org.apache.activemq.broker.region.Queue - queue://Stuff via SQL.401,
> subscriptions=2, memory=0%, size=1, pending=0 toPageIn: 0, Inflight: 1,
> pagedInMessages.size 1, pagedInPendingDispatch.size 0, enqueueCount: 2,
> dequeueCount: 1, memUsage:1107
> DEBUG org.apache.activemq.broker.region.AbstractRegion - localhost removing
> consumer: ID:zip-54580-1457638742697-4:1:2:1 for destination: queue://Stuff
> via SQL.401
> DEBUG org.apache.activemq.broker.region.Queue - queue://Stuff via SQL.401
> remove sub: QueueSubscription: consumer=ID:zip-54580-1457638742697-4:1:2:1,
> destinations=1, dispatched=0, delivered=0, pending=0, lastDeliveredSeqId:
> 9,
> dequeues: 1, dispatched: 2, inflight: 1, groups: 0
> DEBUG org.apache.activemq.broker.region.Queue - queue://Stuff via SQL.401,
> subscriptions=1, memory=0%, size=1, pending=0 toPageIn: 0, Inflight: 1,
> pagedInMessages.size 1, pagedInPendingDispatch.size 0, enqueueCount: 2,
> dequeueCount: 1, memUsage:1107
> INFO  org.springframework.batch.core.launch.support.SimpleJobLauncher -
> Job:
> [FlowJob: [name=archive-purge]] completed with the following parameters:
> [{jobName=Stuff via SQL}] and the following status: [COMPLETED]
>
>
>
>
> --
> View this message in context:
> http://activemq.2283324.n4.nabble.com/ActiveMQ-JMS-losing-messages-what-am-I-missing-tp4709127p4709178.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>

Re: ActiveMQ / JMS “losing” messages - what am I missing?

Posted by swift99 <dj...@maxistechnology.com>.
I'm not having much luck with the JMX viewer, but I turned on debug logging
to get (I think) the same information.

It is very clear that there was two enqueues and one dequeue, with the
dequeue of the "end-of-data marker" happening before the dequeue of the
intended data.

Log entries follow:

INFO  com.myapp.SourceSelectionReaderImpl - 1 rows were selected as roots in
602 milliseconds.
DEBUG org.apache.activemq.broker.region.Queue - localhost Message
ID:zip-54580-1457638742697-4:2:2:1:1 sent to queue://Stuff via SQL.401
DEBUG org.apache.activemq.broker.region.Queue - queue://Stuff via SQL.401,
subscriptions=2, memory=0%, size=1, pending=0 toPageIn: 1, Inflight: 0,
pagedInMessages.size 0, pagedInPendingDispatch.size 0, enqueueCount: 1,
dequeueCount: 0, memUsage:2214
INFO  com.myapp.SelectedDataJmsWriter - Committed 1 thing to redo log and
JMS queue

*** Emphasis added - Note that the end of stream marker is posted second ***
INFO  com.myapp.SourceSelectionReaderImpl - Returning empty thing and
end-of-stream placeholder.
DEBUG org.apache.activemq.broker.region.Queue - localhost Message
ID:zip-54580-1457638742697-4:2:2:1:2 sent to queue://Stuff via SQL.401
DEBUG org.apache.activemq.broker.region.Queue - queue://Stuff via SQL.401,
subscriptions=2, memory=0%, size=2, pending=0 toPageIn: 1, Inflight: 1,
pagedInMessages.size 1, pagedInPendingDispatch.size 0, enqueueCount: 2,
dequeueCount: 0, memUsage:3155
INFO  com.myapp.SelectedDataJmsWriter - Committed 1 thing to redo log and
JMS queue

*** Emphasis added - note that the end of stream marker is retrieved first,
the first one is still marked as inflight ***
INFO  com.myapp.SelectedDataJmsReader - executed read, found end-of-stream
marker, returning null
DEBUG org.apache.activemq.broker.region.Queue - queue://Stuff via SQL.401,
subscriptions=2, memory=0%, size=1, pending=0 toPageIn: 0, Inflight: 1,
pagedInMessages.size 1, pagedInPendingDispatch.size 0, enqueueCount: 2,
dequeueCount: 1, memUsage:1107


DEBUG org.apache.activemq.broker.region.Queue - queue://Stuff via SQL.401,
subscriptions=2, memory=0%, size=1, pending=0 toPageIn: 0, Inflight: 1,
pagedInMessages.size 1, pagedInPendingDispatch.size 0, enqueueCount: 2,
dequeueCount: 1, memUsage:1107
DEBUG org.apache.activemq.broker.region.AbstractRegion - localhost removing
consumer: ID:zip-54580-1457638742697-4:1:2:1 for destination: queue://Stuff
via SQL.401
DEBUG org.apache.activemq.broker.region.Queue - queue://Stuff via SQL.401
remove sub: QueueSubscription: consumer=ID:zip-54580-1457638742697-4:1:2:1,
destinations=1, dispatched=0, delivered=0, pending=0, lastDeliveredSeqId: 9,
dequeues: 1, dispatched: 2, inflight: 1, groups: 0
DEBUG org.apache.activemq.broker.region.Queue - queue://Stuff via SQL.401,
subscriptions=1, memory=0%, size=1, pending=0 toPageIn: 0, Inflight: 1,
pagedInMessages.size 1, pagedInPendingDispatch.size 0, enqueueCount: 2,
dequeueCount: 1, memUsage:1107
INFO  org.springframework.batch.core.launch.support.SimpleJobLauncher - Job:
[FlowJob: [name=archive-purge]] completed with the following parameters:
[{jobName=Stuff via SQL}] and the following status: [COMPLETED]




--
View this message in context: http://activemq.2283324.n4.nabble.com/ActiveMQ-JMS-losing-messages-what-am-I-missing-tp4709127p4709178.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Re: ActiveMQ / JMS “losing” messages - what am I missing?

Posted by Tim Bain <tb...@alumni.duke.edu>.
Use a JMX viewer to look at the stats for the queue; how many enqueues and
dequeues do you see?  How many consumers are there?

On Wed, Mar 9, 2016 at 1:11 PM, swift99 <dj...@maxistechnology.com>
wrote:

> What am I missing?
>
> This is cross-posted from
>
> http://stackoverflow.com/questions/35896002/activemq-jms-losing-messages-what-am-i-missing
> .
>
> AMQ version 5.13.2
> Java 1.8.0_74
> Windows 10
> transportConnector uri="vm://localhost"
>
> Given a simple test case, two Object messages are transmitted, one with
> data
> and the other being an end-of-data marker. Only the end-of-data marker is
> being received.
>
> The queue is created at the beginning of the job, and destroyed after the
> job completes.
>
> If I run a larger number of transactions, I see about a 50% receive rate.
>
> The logs clearly show that the receiver was started before the first
> message
> was put on the queue, both messages are being put on the queue, but only
> the
> second message is actually being received.
>
> Both the sender and receiver are on the same JVM. Each has its own Session
> and Connection.
>
> Transmit code:
>
>     private void doSomeStuffInTransaction (final Object model) {
>         transactionTemplate.execute(new TransactionCallbackWithoutResult()
> {
>             @Override
>             protected void doInTransactionWithoutResult(TransactionStatus
> status) {
>                 try {
>                     doSomeStuff ( model );
>
>                     ObjectMessage message = session.createObjectMessage(
>                             (model.getRoot() == null)
>                             ? null
>                             : model.getRoot().getContents().getId());
>                     messageProducer.send(message);
>                     logger.debug("Sent: {}", message.toString());
>                 }catch (Exception e) {
>                       //use this to rollback exception in case of exception
>                     status.setRollbackOnly();
>                     throw new RuntimeException(e.getmessage(), e);
>                 }
>
>             }});
>     }
> Receiver code:
>
> public Object read() throws Exception,
>         UnexpectedInputException, ParseException,
>         NonTransientResourceException {
>
>     Object result = null;
>
>     logger.debug("Attempting to receive message on connection: ",
> connection.toString());
>
>     ObjectMessage msg = (ObjectMessage) messageConsumer.receive();
>     logger.debug("Received: {}", msg.toString());
>     result = msg.getObject();
>
>     return result;
> }
>
>
> Log snip:
>
> DEBUG com.mylib.SelectedDataJmsReader - Attempting to receive message on
> connection:
> ... snip ...
> DEBUG com.mylib.SelectedDataJmsWriter - Sent: ActiveMQObjectMessage
> {commandId = 0, responseRequired = false, messageId =
> ID:zip-51438-1457536143607-4:1:5:1:1, originalDestination = null,
> originalTransactionId = null, producerId = null, destination =
> queue://Workorders via SQL.383, transactionId = null, expiration = 0,
> timestamp = 1457536412608, arrival = 0, brokerInTime = 0, brokerOutTime =
> 0,
> correlationId = null, replyTo = null, persistent = true, type = null,
> priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null,
> compressed = false, userID = null, content =
> org.apache.activemq.util.ByteSequence@3c21d3e0, marshalledProperties =
> null,
> dataStructure = null, redeliveryCounter = 0, size = 0, properties = null,
> readOnlyProperties = false, readOnlyBody = false, droppable = false,
> jmsXGroupFirstForConsumer = false}
> INFO  com.mylib.SelectedDataJmsWriter - Committed 1 hierarchies to redo log
> and JMS queue
> INFO  com.mylib.SourceSelectionReaderImpl - Returning empty treemodel and
> end-of-stream placeholder.
> DEBUG com.mylib.SelectedDataJmsWriter - Sent: ActiveMQObjectMessage
> {commandId = 0, responseRequired = false, messageId =
> ID:zip-51438-1457536143607-4:1:5:1:2, originalDestination = null,
> originalTransactionId = null, producerId = null, destination =
> queue://Workorders via SQL.383, transactionId = null, expiration = 0,
> timestamp = 1457536412696, arrival = 0, brokerInTime = 0, brokerOutTime =
> 0,
> correlationId = null, replyTo = null, persistent = true, type = null,
> priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null,
> compressed = false, userID = null, content = null, marshalledProperties =
> null, dataStructure = null, redeliveryCounter = 0, size = 0, properties =
> null, readOnlyProperties = false, readOnlyBody = false, droppable = false,
> jmsXGroupFirstForConsumer = false}
> INFO  com.mylib.SelectedDataJmsWriter - Committed 1 hierarchies to redo log
> and JMS queue
> DEBUG com.mylib.SelectedDataJmsReader - Received: ActiveMQObjectMessage
> {commandId = 19, responseRequired = true, messageId =
> ID:zip-51438-1457536143607-4:1:5:1:2, originalDestination = null,
> originalTransactionId = null, producerId =
> ID:zip-51438-1457536143607-4:1:5:1, destination = queue://Workorders via
> SQL.383, transactionId = null, expiration = 0, timestamp = 1457536412696,
> arrival = 0, brokerInTime = 1457536412696, brokerOutTime = 1457536412697,
> correlationId = null, replyTo = null, persistent = true, type = null,
> priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null,
> compressed = false, userID = null, content = null, marshalledProperties =
> null, dataStructure = null, redeliveryCounter = 0, size = 1024, properties
> =
> null, readOnlyProperties = true, readOnlyBody = true, droppable = false,
> jmsXGroupFirstForConsumer = false}
> INFO  com.mylib.SelectedDataJmsReader - executed read, found end-of-stream
> marker, returning null
>
>
>
> --
> View this message in context:
> http://activemq.2283324.n4.nabble.com/ActiveMQ-JMS-losing-messages-what-am-I-missing-tp4709127.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>