You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@activemq.apache.org by Xander Uiterlinden <ui...@gmail.com> on 2014/10/16 19:45:13 UTC

redelivery counter not increased higher than one causing indefinite redelivery

Hi,

I'm trying to implement a JMS consumer that uses XA transactions, so that
receiving a message and doing the work for that message happen in a single
transaction.
The code for the consuming worker is as follows:

 /**
  * Consumes messages from a durable topic subscription, using one XA
  * transaction per loop iteration.
  *
  * <p>Each iteration begins a transaction, obtains a pooled connection, creates a
  * session and durable subscriber, and waits for a single message. In this
  * reproduction every received message triggers an intentional exception so
  * that the transaction is rolled back and redelivery can be observed.
  *
  * <p>Relies on members declared outside this snippet: {@code transactionManager},
  * {@code pooledConnectionFactory}, {@code topic}, {@code agent} and
  * {@code keepProcessing}.
  *
  * <p>NOTE(review): a new consumer is created and closed on every iteration. If
  * redelivery counting is tracked per consumer on the client side, that state
  * would be discarded each time -- confirm against the ActiveMQ redelivery
  * documentation.
  */
 class Worker implements Runnable {

   /**
    * Configures the XA connection factory and its redelivery policy, then polls
    * for messages until {@link #keepProcessing()} returns {@code false}.
    */
   public void run() {
     ActiveMQXAConnectionFactory fact =
         new ActiveMQXAConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL);

     RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
     redeliveryPolicy.setInitialRedeliveryDelay(500);
     redeliveryPolicy.setBackOffMultiplier(2);
     redeliveryPolicy.setUseExponentialBackOff(true);
     redeliveryPolicy.setMaximumRedeliveries(2);
     fact.setRedeliveryPolicy(redeliveryPolicy);

     pooledConnectionFactory = new XaPooledConnectionFactory();
     pooledConnectionFactory.setConnectionFactory(fact);
     pooledConnectionFactory.setTransactionManager(transactionManager);

     while (keepProcessing()) {
       TopicSession session = null;
       TopicSubscriber consumer = null;
       TopicConnection connection = null;

       try {
         transactionManager.begin();

         connection = pooledConnectionFactory.createTopicConnection();
         connection.setClientID(this.getClass().getName());
         System.out.println("Client ID: " + connection.getClientID());

         session = connection.createTopicSession(false, Session.SESSION_TRANSACTED);
         Topic destination = session.createTopic(topic);
         consumer = session.createDurableSubscriber(destination, agent.getClass().getName());

         connection.start();

         Message message = consumer.receive(5000);
         if (message != null) {
           System.out.println("Received message: " + message);
           // Deliberately fail so the transaction is rolled back in the catch block.
           throw new RuntimeException("Intentional exception.");
         }

         // Nothing was received within the timeout: commit the empty transaction.
         // The JMS resources are released exactly once, in the finally block.
         transactionManager.commit();
       } catch (Exception e) {
         System.out.println("Rolling back transaction.");
         try {
           transactionManager.rollback();
         } catch (Exception ex) {
           ex.printStackTrace();
         }
         e.printStackTrace();
       } finally {
         // Close in reverse order of creation. A handle is still null when the
         // failure happened before it was assigned, so guard explicitly instead
         // of relying on a swallowed NullPointerException. Close failures are
         // reported rather than silently dropped.
         if (consumer != null) {
           try {
             consumer.close();
           } catch (Exception ex) {
             System.out.println("Failed to close consumer: " + ex);
           }
         }
         if (session != null) {
           try {
             session.close();
           } catch (Exception ex) {
             System.out.println("Failed to close session: " + ex);
           }
         }
         if (connection != null) {
           try {
             connection.close();
           } catch (Exception ex) {
             System.out.println("Failed to close connection: " + ex);
           }
         }
       }
     }

     System.out.println("processing stopped.");
   }

   /**
    * Reports whether the polling loop in {@link #run()} should continue.
    *
    * @return the current value of the {@code keepProcessing} flag
    */
   private boolean keepProcessing() {
     return keepProcessing.get();
   }
 }



Regarding the transactions it's all working fine, however with respect to
message redelivery there's an issue. It gets redelivered infinitely.
Inspecting the code shows that the redelivery counter of the incoming
message is never higher than one. What could be the issue here?

Thanks,

Xander

Re: redelivery counter not increased higher than one causing indefinite redelivery

Posted by Xander Uiterlinden <ui...@gmail.com>.
The counter is managed on the server I presume, so I don't expect it to be
part of the transaction.

What I'm wondering, though, is why the consumer stays connected to the
server even after consumer.close(), session.close() and connection.close()
have been called?

On Thu, Oct 16, 2014 at 8:26 PM, David Jencks <
david_jencks@yahoo.com.invalid> wrote:

> Maybe the redelivery counter is transactional so if the xa tx rolls back
> and redelivery is needed the counter gets reset?  this is speculation but
> might explain your result.
>
> david jencks
>
> On Oct 16, 2014, at 10:45 AM, Xander Uiterlinden <ui...@gmail.com>
> wrote:
>
> > Hi,
> >
> > I'm trying to implement a JMS consumer that uses XA transactions in order
> > to create a consumer that consumes the message and does work with the
> > message in a single transaction.
> > The code for the consuming worker is as follows:
> >
> > class Worker implements Runnable {
> >
> >  public void run() {
> >
> > ActiveMQXAConnectionFactory fact = new
> > ActiveMQXAConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL);
> >
> > RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
> >
> > redeliveryPolicy.setInitialRedeliveryDelay(500);
> >
> > redeliveryPolicy.setBackOffMultiplier(2);
> >
> > redeliveryPolicy.setUseExponentialBackOff(true);
> >
> > redeliveryPolicy.setMaximumRedeliveries(2);
> >
> > fact.setRedeliveryPolicy(redeliveryPolicy);
> >
> > pooledConnectionFactory = new XaPooledConnectionFactory();
> >
> > pooledConnectionFactory.setConnectionFactory(fact);
> >
> > pooledConnectionFactory.setTransactionManager(transactionManager);
> >
> >   while (keepProcessing()) {
> >
> >  TopicSession session = null;
> >
> >  TopicSubscriber consumer = null;
> >
> >  TopicConnection connection = null;
> >
> >  try {
> >
> >  transactionManager.begin();
> >
> >  connection = pooledConnectionFactory.createTopicConnection();
> >
> >  connection.setClientID(this.getClass().getName());
> >
> >  System.out.println("Client ID: " + connection.getClientID());
> >
> >  session = connection.createTopicSession(false,
> Session.SESSION_TRANSACTED
> > );
> >
> >  Topic destination = session.createTopic(topic);
> >
> >  consumer = session.createDurableSubscriber(destination, agent
> > .getClass().getName());
> >
> >         connection.start();
> >
> >         Message message = consumer.receive(5000);
> >
> >         if (message != null) {
> >
> >         System.out.println("Received message: " + message);
> >
> >         throw new RuntimeException("Intentional exception.");
> >
> >         }
> >
> >         transactionManager.commit();
> >
> >         consumer.close();
> >
> >         session.close();
> >
> >         connection.close();
> >
> >  } catch (Exception e) {
> >
> >  System.out.println("Rolling back transaction.");
> >
> >  try {
> >
> >   transactionManager.rollback();
> >
> >  } catch (Exception ex) {
> >
> >   ex.printStackTrace();
> >
> >  }
> >
> >  e.printStackTrace();
> >
> >  } finally {
> >
> >  try { consumer.close(); } catch (Exception ex) {};
> >
> >  try { session.close(); } catch (Exception ex) {};
> >
> >  try { connection.close(); } catch (Exception ex) {};
> >
> >  }
> >
> >
> >
> > }
> >
> > System.out.println("processing stopped.");
> >
> > }
> >
> >
> > private boolean keepProcessing() {
> >
> > return keepProcessing.get();
> >
> > }
> >
> >   }
> >
> >
> >
> > Regarding the transactions it's all working fine, however with respect to
> > message redelivery there's an issue. It gets redelivered infinitely.
> > Inspecting the code shows that the redelivery counter of the incoming
> > message is never higher than one. What could be the issue here?
> >
> > Thanks,
> >
> > Xander
>
>

Re: redelivery counter not increased higher than one causing indefinite redelivery

Posted by David Jencks <da...@yahoo.com.INVALID>.
Maybe the redelivery counter is transactional, so if the XA tx rolls back and redelivery is needed, the counter gets reset? This is speculation, but it might explain your result.

david jencks

On Oct 16, 2014, at 10:45 AM, Xander Uiterlinden <ui...@gmail.com> wrote:

> Hi,
> 
> I'm trying to implement a JMS consumer that uses XA transactions in order
> to create a consumer that consumes the message and does work with the
> message in a single transaction.
> The code for the consuming worker is as follows:
> 
> class Worker implements Runnable {
> 
>  public void run() {
> 
> ActiveMQXAConnectionFactory fact = new
> ActiveMQXAConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL);
> 
> RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
> 
> redeliveryPolicy.setInitialRedeliveryDelay(500);
> 
> redeliveryPolicy.setBackOffMultiplier(2);
> 
> redeliveryPolicy.setUseExponentialBackOff(true);
> 
> redeliveryPolicy.setMaximumRedeliveries(2);
> 
> fact.setRedeliveryPolicy(redeliveryPolicy);
> 
> pooledConnectionFactory = new XaPooledConnectionFactory();
> 
> pooledConnectionFactory.setConnectionFactory(fact);
> 
> pooledConnectionFactory.setTransactionManager(transactionManager);
> 
>   while (keepProcessing()) {
> 
>  TopicSession session = null;
> 
>  TopicSubscriber consumer = null;
> 
>  TopicConnection connection = null;
> 
>  try {
> 
>  transactionManager.begin();
> 
>  connection = pooledConnectionFactory.createTopicConnection();
> 
>  connection.setClientID(this.getClass().getName());
> 
>  System.out.println("Client ID: " + connection.getClientID());
> 
>  session = connection.createTopicSession(false, Session.SESSION_TRANSACTED
> );
> 
>  Topic destination = session.createTopic(topic);
> 
>  consumer = session.createDurableSubscriber(destination, agent
> .getClass().getName());
> 
>         connection.start();
> 
>         Message message = consumer.receive(5000);
> 
>         if (message != null) {
> 
>         System.out.println("Received message: " + message);
> 
>         throw new RuntimeException("Intentional exception.");
> 
>         }
> 
>         transactionManager.commit();
> 
>         consumer.close();
> 
>         session.close();
> 
>         connection.close();
> 
>  } catch (Exception e) {
> 
>  System.out.println("Rolling back transaction.");
> 
>  try {
> 
>   transactionManager.rollback();
> 
>  } catch (Exception ex) {
> 
>   ex.printStackTrace();
> 
>  }
> 
>  e.printStackTrace();
> 
>  } finally {
> 
>  try { consumer.close(); } catch (Exception ex) {};
> 
>  try { session.close(); } catch (Exception ex) {};
> 
>  try { connection.close(); } catch (Exception ex) {};
> 
>  }
> 
> 
> 
> }
> 
> System.out.println("processing stopped.");
> 
> }
> 
> 
> private boolean keepProcessing() {
> 
> return keepProcessing.get();
> 
> }
> 
>   }
> 
> 
> 
> Regarding the transactions it's all working fine, however with respect to
> message redelivery there's an issue. It gets redelivered infinitely.
> Inspecting the code shows that the redelivery counter of the incoming
> message is never higher than one. What could be the issue here?
> 
> Thanks,
> 
> Xander