Posted to users@activemq.apache.org by Matt Pavlovich <ma...@gmail.com> on 2014/09/30 22:24:54 UTC

BrokerService.acknowledge() behavior

I have a class, loaded as a broker plugin, that implements BrokerService.acknowledge(), but I’m getting “Standard” acknowledgements for almost all consumption scenarios, even rollback. If I roll back a message, the ack type on the MessageAck is still “Standard”, and the message isn’t consumed (as expected). The Javadocs indicate that a “Standard” ack tells the broker to remove the message.

This doesn’t seem to be consistent. Am I missing something?

Is this the correct place to plug in to the broker to detect when a message is acknowledged vs. rolled back, etc.?


BrokerService.acknowledge() impl:
  @Override
  public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
    ….
    // Log which kind of ack the broker received.
    // "logger" is assumed to be a logger field like the one used in the client code.
    if (ack.isDeliveredAck())
      logger.info("Delivered");
    else if (ack.isExpiredAck())
      logger.info("Expired");
    else if (ack.isIndividualAck())
      logger.info("Individual");
    else if (ack.isPoisonAck())
      logger.info("Poison");
    else if (ack.isRedeliveredAck())
      logger.info("Redelivered");
    else if (ack.isUnmatchedAck())
      logger.info("Unmatched");
    else if (ack.isStandardAck())
      logger.info("Standard");
    else
      logger.info("Unknown AckType: " + ack.getAckType());
    …

Client:
    Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
    MessageConsumer consumer = session.createConsumer(session.createQueue("QUEUE.IN"));
    TextMessage message = (TextMessage) consumer.receive(2000);

    // Drain the queue inside the transaction, then roll everything back
    while (message != null) {
      logger.info("--- [testTransactedReceiveRollback] Receiving test message : " + message.getText() + " JMS Message ID: " + message.getJMSMessageID());
      message = (TextMessage) consumer.receive(2000);
    }
    session.rollback();
    closeResources(connection, session, null, consumer);
    ….

Thanks,
Matt Pavlovich | Founding Partner | Media Driver
512.284.4330 | matt.pavlovich@mediadriver.com
810 Hesters Crossing, Suite 165, Round Rock, TX 78681


Re: BrokerService.acknowledge() behavior

Posted by Gary Tully <ga...@gmail.com>.
The ack does reach the broker, but it stays pending until the transaction
completes. If the transaction rolls back, the ack is discarded, so seeing a
“Standard” ack type before a rollback is expected. The operations associated
with the ack are tied to the broker-side transaction via synchronizations in
the transaction broker and the message stores.
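
To make the "pending until the transaction completes" idea concrete, here is
a minimal, self-contained sketch. It is a toy model only and not ActiveMQ
code: the class PendingAckSketch and all of its methods are hypothetical names
invented for illustration. It assumes acks are simply buffered per transaction
id and applied on commit or dropped on rollback, which is a simplification of
what the transaction broker and message stores actually do.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Toy model (hypothetical, not ActiveMQ): acks wait on transaction outcome. */
public class PendingAckSketch {
    // Messages currently held by the toy queue.
    private final Set<String> queue = new LinkedHashSet<>();
    // Acks received but not yet applied, keyed by transaction id.
    private final Map<String, List<String>> pendingAcks = new HashMap<>();

    public void enqueue(String messageId) {
        queue.add(messageId);
    }

    /** A standard ack arrives inside a transaction: nothing is removed yet. */
    public void acknowledge(String txId, String messageId) {
        pendingAcks.computeIfAbsent(txId, k -> new ArrayList<>()).add(messageId);
    }

    /** Commit applies the buffered acks, removing the messages. */
    public void commit(String txId) {
        List<String> acks = pendingAcks.remove(txId);
        if (acks != null) {
            queue.removeAll(acks);
        }
    }

    /** Rollback discards the buffered acks; the messages stay on the queue. */
    public void rollback(String txId) {
        pendingAcks.remove(txId);
    }

    public int depth() {
        return queue.size();
    }

    public static void main(String[] args) {
        PendingAckSketch broker = new PendingAckSketch();
        broker.enqueue("m1");
        broker.enqueue("m2");

        broker.acknowledge("tx1", "m1");
        broker.acknowledge("tx1", "m2");
        broker.rollback("tx1");
        System.out.println("depth after rollback: " + broker.depth());

        broker.acknowledge("tx2", "m1");
        broker.acknowledge("tx2", "m2");
        broker.commit("tx2");
        System.out.println("depth after commit: " + broker.depth());
    }
}
```

In this model the ack itself looks identical in both cases; only the later
commit or rollback decides its fate. By analogy, a plugin that needs to tell
the two apart would probably have to watch the transaction outcome as well as
the ack, for example by also overriding the commitTransaction and
rollbackTransaction callbacks if it extends BrokerFilter (treat that as a
suggestion to verify against the Javadocs for your broker version).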


-- 
http://redhat.com
http://blog.garytully.com