You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@camel.apache.org by Tarun Kumar <ag...@gmail.com> on 2013/12/20 05:25:25 UTC

polling consumer with acknowledgement

I am using polling consumer.

from("timer://foo?period=5000").bean(cool, "someBusinessLogic");

public static class MyCoolBean {

  private ConsumerTemplate consumer;

   public void setConsumer(ConsumerTemplate consumer) {
        this.consumer = consumer;
    }

   public void someBusinessLogic() {

      Exchange exchange
= consumer.receive("catalogJms:queue:queueName?mapJmsMessage=false&acknowledgementModeName=CLIENT_ACKNOWLEDGE");

}


Even though i am not sending acknowledgement for received message, each
time it is polling different message. Any idea why that's happening?

Re: polling consumer with acknowledgement

Posted by Tarun Kumar <ag...@gmail.com>.
Thanks Kraythe! It worked.


On Tue, Dec 24, 2013 at 3:20 AM, kraythe . <kr...@gmail.com> wrote:

> Ewww... I hate spring xml DSL (fluent Java or Scala is much better) ..
> anyway after my eyes are done bleeding ... it seems like you are wrapping
> the "possible failure" calls in a doTry/catch. Since you are handling the
> transactions, camel assumes all is well and that you meant to do that, so
> an exception wont be propagated and the transaction wont be rolled back. If
> you want to have a transaction be rolled back you have to let the exception
> propagate while you have a transaction error handler in scope or
> setRollbackOnly() on the transaction when you catch the exception: Such as:
>
> from(endpointAMQ(QUEUE_DB_MONITOR_EVENT)) // Monitor the event queue
>         .routeId(ROUTE_ID_AUTOMATION_CASES_DB_PROCESSOR) // give the route
> an ID
>
> .onException(Exception.class).useOriginalMessage().handled(true).maximumRedeliveries(0)
> // catch exceptions
>         .setHeader(Exchange.FAILURE_ROUTE_ID, property(Exchange.
> FAILURE_ROUTE_ID)) // set route that errored
>         .setHeader(Exchange.EXCEPTION_CAUGHT, simple(
> "${exception.stacktrace}")) // Store reason for error
>         .to(ExchangePattern.InOnly,
> endpointAMQ(QUEUE_CASE_AUTOMATION_DEAD)).end()
> // to DLQ
>         .transacted(KEY_TXPOLICY_REQUIRED) // make the route transacted
>         // rest of route
>
> As you can see I store the exception stack trace and other information in
> the headers of the failed exchange, send the exchange to the dead letter
> queue and then so on. So if there is an exception in this route, NOTHING
> will get rolled back which is what I want since I cant reprocess the
> message. So why transacted? If someone pulls the plug on the machine or
> uses kill -9 on the process, I want AMQ to retain the message. Now if I
> wanted a rollback I could add the call .rollback() right after my
> onException()
> call and before the .end call. That would tell the exception error handler
> (which is a transaction handler) to rollback AMQ.
>
> Now if you want to rollback both a database and AMQ transaction you will
> need a JTA transaction manager.
>
>
>
> *Robert Simmons Jr. MSc. - Lead Java Architect @ EA*
> *Author of: Hardcore Java (2003) and Maintainable Java (2012)*
> *LinkedIn: **http://www.linkedin.com/pub/robert-simmons/40/852/a39
> <http://www.linkedin.com/pub/robert-simmons/40/852/a39>*
>
>
> On Mon, Dec 23, 2013 at 2:11 AM, Tarun Kumar <agrawal.tarun23@gmail.com
> >wrote:
>
> > Hi Kraythe,
> >
> > Thanks for replying.
> >
> > Here is how my route looks like:
> >
> > <camel:route id="createMigrationRoute">
> >
> >  <camel:from uri="timer://foo?period=1000"></camel:from>
> >
> >  <camel:transacted>
> >
> >  <camel:doTry>
> >
> >   <camel:bean ref="abc" method="fetchMessage"></camel:bean>
> >
> > <camel:bean ref="pqr" method="processMessage"></camel:bean>
> >
> > <camel:doCatch>
> >
> >   <camel:exception>java.lang.IOException</camel:exception>
> >
> >   <camel:bean ref="def" method="handleException" />
> >
> >   </camel:doCatch>
> >
> >   <camel:doFinally>
> >
> >   <camel:bean ref="dsd" method="me2"></camel:bean>
> >
> >   </camel:doFinally>
> >
> >  </camel:doTry>
> >
> >  </camel:transacted>
> >
> >  </camel:route>
> >
> >
> > And beans looks like this:
> >
> >
> >  <bean id="catalogJms"
> class="org.apache.camel.component.jms.JmsComponent">
> >
> >  <property name="connectionFactory">
> >
> >  <bean class="org.apache.activemq.ActiveMQConnectionFactory">
> >
> >   <property name="brokerURL">
> >
> >   <value>tcp://localhost:61616</value>
> >
> >   </property>
> >
> >  </bean>
> >
> >  </property>
> >
> >  <property name="transacted" value="true"></property>
> >
> >  <property name="transactionManager" ref="jmsTransactionManager" />
> >
> >  <property name="acknowledgementModeName">
> >
> >  <value>CLIENT_ACKNOWLEDGE</value>
> >
> >  </property>
> >
> > </bean>
> >
> >
> > <bean id="poolConnectionFactory" class=
> > "org.apache.activemq.pool.PooledConnectionFactory">
> >
> >  <property name="maxConnections" value="8" />
> >
> >  <property name="connectionFactory" ref="jmsConnectionFactory" />
> >
> > </bean>
> >
> >
> >  <bean id="jmsConnectionFactory" class=
> > "org.apache.activemq.ActiveMQConnectionFactory">
> >
> >  <property name="brokerURL"
> >
> >  value=
> > "vm://localhost:61616?broker.persistent=false&amp;broker.useJmx=false" />
> >
> > </bean>
> >
> >
> >  <!-- setup spring jms TX manager -->
> >
> > <bean id="jmsTransactionManager"
> >
> >  class="org.springframework.jms.connection.JmsTransactionManager">
> >
> >  <property name="connectionFactory" ref="poolConnectionFactory" />
> >
> > </bean>
> >
> >
> > And fetchMessage in abc bean class looks like:
> >
> >
> > public String fetchMessage() {
> >
> >  Exchange exchange = consumer
> >
> >   .receive(
> >
> >
> "catalogJms:queue:queue1?mapJmsMessage=false&acknowledgementModeName=CLIENT_ACKNOWLEDGE&transacted=true",
> > 5000);
> >
> > return exchange.getIn().getBody(TextMessage.class).toString();
> >
> > }
> >
> >
> > It still consumes message (QueueSize decreases by one) even when it fails
> > in pqr bean with any exception apart from IOException.
> >
> > Please let me know what am i missing here?
> >
> >
> >
> >
> > On Sat, Dec 21, 2013 at 10:01 PM, kraythe . <kr...@gmail.com> wrote:
> >
> > > In that case you are going about it wrong. Camel has this support built
> > in
> > > for you. If the route is declared to be transacted with
> > > .transacted(myRequirdPolicy) then camel will handle that all for you
> with
> > > that one line. Put the transacted line right after your exception
> > handleing
> > > and before the main route and then go about your business. If you want
> a
> > > composite transaction that rolls back both the DB and JMS work then you
> > > will need to configure a jta transaction manager. Just google 'camel
> > > jta transacted route' and you will get code examples and so on.
> > >
> > > On Saturday, December 21, 2013, Tarun Kumar wrote:
> > >
> > > > Thanks for replying. Here is my usecase:
> > > >
> > > > consume a message from the queue. Do some transformation on the
> messge.
> > > > Then, persist the transformed message to datastore.
> > > > Once, transformed message is written to datastore, send ack. Reason i
> > > want
> > > > to send ack later is, in case my application goes down post message
> > > > consumption and before writing to datastore, i should be able to
> fetch
> > > same
> > > > message again from JMS queue. Hope that clarifies.
> > > >
> > > > Thanks!
> > > >
> > > >
> > > >
> > > > On Sat, Dec 21, 2013 at 9:14 PM, kraythe . <kraythe@gmail.com
> > > <javascript:;>>
> > > > wrote:
> > > >
> > > > > I don't think the question is quite clear. In JMS you can only
> > consume
> > > a
> > > > > message off a queue once. You can't consume it and leave it on the
> > > queue.
> > > > > Why do want to not ack the message? Queues are like throwing candy
> > > into a
> > > > > room of kindergarten children. All will scramble for the candy but
> > each
> > > > > piece will get consumed by only one child. The only time JMS
> messages
> > > > > aren't caked is when they are failed in delivery. And even then,
> the
> > > only
> > > > > use case for leaving them on the queue is because of a failed
> > > transaction
> > > > > in a transactional route,
> > > > >
> > > > > Tell me what is the use case you are trying to implement and I
> might
> > be
> > > > > able to help you down another path.
> > > > >
> > > > > On Saturday, December 21, 2013, Tarun Kumar wrote:
> > > > >
> > > > > > Hi,
> > > > > >
> > > > > > Any help here will be highly appreciated.
> > > > > >
> > > > > >
> > > > > > On Fri, Dec 20, 2013 at 9:55 AM, Tarun Kumar <
> > > > agrawal.tarun23@gmail.com <javascript:;>
> > > > > <javascript:;>
> > > > > > >wrote:
> > > > > >
> > > > > > > I am using polling consumer.
> > > > > > >
> > > > > > > from("timer://foo?period=5000").bean(cool,
> "someBusinessLogic");
> > > > > > >
> > > > > > > public static class MyCoolBean {
> > > > > > >
> > > > > > >   private ConsumerTemplate consumer;
> > > > > > >
> > > > > > >    public void setConsumer(ConsumerTemplate consumer) {
> > > > > > >         this.consumer = consumer;
> > > > > > >     }
> > > > > > >
> > > > > > >    public void someBusinessLogic() {
> > > > > > >
> > > > > > >       Exchange exchange
> > > > > > > =
> > > > > >
> > > > >
> > > >
> > >
> >
> consumer.receive("catalogJms:queue:queueName?mapJmsMessage=false&acknowledgementModeName=CLIENT_ACKNOWLEDGE");
> > > > > > >
> > > > > > > }
> > > > > > >
> > > > > > >
> > > > > > > Even though i am not sending acknowledgement for received
> > message,
> > > > each
> > > > > > > time it is polling different message. Any idea why that's
> > > happening?
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > *Robert Simmons Jr. MSc. - Lead Java Architect @ EA*
> > > > > *Author of: Hardcore Java (2003) and Maintainable Java (2012)*
> > > > > *LinkedIn: **http://www.linkedin.com/pub/robert-simmons/40/852/a39
> > > > > <http://www.linkedin.com/pub/robert-simmons/40/852/a39>*
> > > > >
> > > >
> > >
> > >
> > > --
> > > *Robert Simmons Jr. MSc. - Lead Java Architect @ EA*
> > > *Author of: Hardcore Java (2003) and Maintainable Java (2012)*
> > > *LinkedIn: **http://www.linkedin.com/pub/robert-simmons/40/852/a39
> > > <http://www.linkedin.com/pub/robert-simmons/40/852/a39>*
> > >
> >
>

Re: polling consumer with acknowledgement

Posted by "kraythe ." <kr...@gmail.com>.
Ewww... I hate spring xml DSL (fluent Java or Scala is much better) ..
anyway after my eyes are done bleeding ... it seems like you are wrapping
the "possible failure" calls in a doTry/catch. Since you are handling the
transactions, camel assumes all is well and that you meant to do that, so
an exception wont be propagated and the transaction wont be rolled back. If
you want to have a transaction be rolled back you have to let the exception
propagate while you have a transaction error handler in scope or
setRollbackOnly() on the transaction when you catch the exception: Such as:

from(endpointAMQ(QUEUE_DB_MONITOR_EVENT)) // Monitor the event queue
        .routeId(ROUTE_ID_AUTOMATION_CASES_DB_PROCESSOR) // give the route
an ID
        .onException(Exception.class).useOriginalMessage().handled(true).maximumRedeliveries(0)
// catch exceptions
        .setHeader(Exchange.FAILURE_ROUTE_ID, property(Exchange.
FAILURE_ROUTE_ID)) // set route that errored
        .setHeader(Exchange.EXCEPTION_CAUGHT, simple(
"${exception.stacktrace}")) // Store reason for error
        .to(ExchangePattern.InOnly,
endpointAMQ(QUEUE_CASE_AUTOMATION_DEAD)).end()
// to DLQ
        .transacted(KEY_TXPOLICY_REQUIRED) // make the route transacted
        // rest of route

As you can see I store the exception stack trace and other information in
the headers of the failed exchange, send the exchange to the dead letter
queue and then so on. So if there is an exception in this route, NOTHING
will get rolled back which is what I want since I cant reprocess the
message. So why transacted? If someone pulls the plug on the machine or
uses kill -9 on the process, I want AMQ to retain the message. Now if I
wanted a rollback I could add the call .rollback() right after my onException()
call and before the .end call. That would tell the exception error handler
(which is a transaction handler) to rollback AMQ.

Now if you want to rollback both a database and AMQ transaction you will
need a JTA transaction manager.



*Robert Simmons Jr. MSc. - Lead Java Architect @ EA*
*Author of: Hardcore Java (2003) and Maintainable Java (2012)*
*LinkedIn: **http://www.linkedin.com/pub/robert-simmons/40/852/a39
<http://www.linkedin.com/pub/robert-simmons/40/852/a39>*


On Mon, Dec 23, 2013 at 2:11 AM, Tarun Kumar <ag...@gmail.com>wrote:

> Hi Kraythe,
>
> Thanks for replying.
>
> Here is how my route looks like:
>
> <camel:route id="createMigrationRoute">
>
>  <camel:from uri="timer://foo?period=1000"></camel:from>
>
>  <camel:transacted>
>
>  <camel:doTry>
>
>   <camel:bean ref="abc" method="fetchMessage"></camel:bean>
>
> <camel:bean ref="pqr" method="processMessage"></camel:bean>
>
> <camel:doCatch>
>
>   <camel:exception>java.lang.IOException</camel:exception>
>
>   <camel:bean ref="def" method="handleException" />
>
>   </camel:doCatch>
>
>   <camel:doFinally>
>
>   <camel:bean ref="dsd" method="me2"></camel:bean>
>
>   </camel:doFinally>
>
>  </camel:doTry>
>
>  </camel:transacted>
>
>  </camel:route>
>
>
> And beans looks like this:
>
>
>  <bean id="catalogJms" class="org.apache.camel.component.jms.JmsComponent">
>
>  <property name="connectionFactory">
>
>  <bean class="org.apache.activemq.ActiveMQConnectionFactory">
>
>   <property name="brokerURL">
>
>   <value>tcp://localhost:61616</value>
>
>   </property>
>
>  </bean>
>
>  </property>
>
>  <property name="transacted" value="true"></property>
>
>  <property name="transactionManager" ref="jmsTransactionManager" />
>
>  <property name="acknowledgementModeName">
>
>  <value>CLIENT_ACKNOWLEDGE</value>
>
>  </property>
>
> </bean>
>
>
> <bean id="poolConnectionFactory" class=
> "org.apache.activemq.pool.PooledConnectionFactory">
>
>  <property name="maxConnections" value="8" />
>
>  <property name="connectionFactory" ref="jmsConnectionFactory" />
>
> </bean>
>
>
>  <bean id="jmsConnectionFactory" class=
> "org.apache.activemq.ActiveMQConnectionFactory">
>
>  <property name="brokerURL"
>
>  value=
> "vm://localhost:61616?broker.persistent=false&amp;broker.useJmx=false" />
>
> </bean>
>
>
>  <!-- setup spring jms TX manager -->
>
> <bean id="jmsTransactionManager"
>
>  class="org.springframework.jms.connection.JmsTransactionManager">
>
>  <property name="connectionFactory" ref="poolConnectionFactory" />
>
> </bean>
>
>
> And fetchMessage in abc bean class looks like:
>
>
> public String fetchMessage() {
>
>  Exchange exchange = consumer
>
>   .receive(
>
> "catalogJms:queue:queue1?mapJmsMessage=false&acknowledgementModeName=CLIENT_ACKNOWLEDGE&transacted=true",
> 5000);
>
> return exchange.getIn().getBody(TextMessage.class).toString();
>
> }
>
>
> It still consumes message (QueueSize decreases by one) even when it fails
> in pqr bean with any exception apart from IOException.
>
> Please let me know what am i missing here?
>
>
>
>
> On Sat, Dec 21, 2013 at 10:01 PM, kraythe . <kr...@gmail.com> wrote:
>
> > In that case you are going about it wrong. Camel has this support built
> in
> > for you. If the route is declared to be transacted with
> > .transacted(myRequirdPolicy) then camel will handle that all for you with
> > that one line. Put the transacted line right after your exception
> handleing
> > and before the main route and then go about your business. If you want a
> > composite transaction that rolls back both the DB and JMS work then you
> > will need to configure a jta transaction manager. Just google 'camel
> > jta transacted route' and you will get code examples and so on.
> >
> > On Saturday, December 21, 2013, Tarun Kumar wrote:
> >
> > > Thanks for replying. Here is my usecase:
> > >
> > > consume a message from the queue. Do some transformation on the messge.
> > > Then, persist the transformed message to datastore.
> > > Once, transformed message is written to datastore, send ack. Reason i
> > want
> > > to send ack later is, in case my application goes down post message
> > > consumption and before writing to datastore, i should be able to fetch
> > same
> > > message again from JMS queue. Hope that clarifies.
> > >
> > > Thanks!
> > >
> > >
> > >
> > > On Sat, Dec 21, 2013 at 9:14 PM, kraythe . <kraythe@gmail.com
> > <javascript:;>>
> > > wrote:
> > >
> > > > I don't think the question is quite clear. In JMS you can only
> consume
> > a
> > > > message off a queue once. You can't consume it and leave it on the
> > queue.
> > > > Why do want to not ack the message? Queues are like throwing candy
> > into a
> > > > room of kindergarten children. All will scramble for the candy but
> each
> > > > piece will get consumed by only one child. The only time JMS messages
> > > > aren't caked is when they are failed in delivery. And even then, the
> > only
> > > > use case for leaving them on the queue is because of a failed
> > transaction
> > > > in a transactional route,
> > > >
> > > > Tell me what is the use case you are trying to implement and I might
> be
> > > > able to help you down another path.
> > > >
> > > > On Saturday, December 21, 2013, Tarun Kumar wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > Any help here will be highly appreciated.
> > > > >
> > > > >
> > > > > On Fri, Dec 20, 2013 at 9:55 AM, Tarun Kumar <
> > > agrawal.tarun23@gmail.com <javascript:;>
> > > > <javascript:;>
> > > > > >wrote:
> > > > >
> > > > > > I am using polling consumer.
> > > > > >
> > > > > > from("timer://foo?period=5000").bean(cool, "someBusinessLogic");
> > > > > >
> > > > > > public static class MyCoolBean {
> > > > > >
> > > > > >   private ConsumerTemplate consumer;
> > > > > >
> > > > > >    public void setConsumer(ConsumerTemplate consumer) {
> > > > > >         this.consumer = consumer;
> > > > > >     }
> > > > > >
> > > > > >    public void someBusinessLogic() {
> > > > > >
> > > > > >       Exchange exchange
> > > > > > =
> > > > >
> > > >
> > >
> >
> consumer.receive("catalogJms:queue:queueName?mapJmsMessage=false&acknowledgementModeName=CLIENT_ACKNOWLEDGE");
> > > > > >
> > > > > > }
> > > > > >
> > > > > >
> > > > > > Even though i am not sending acknowledgement for received
> message,
> > > each
> > > > > > time it is polling different message. Any idea why that's
> > happening?
> > > > > >
> > > > >
> > > >
> > > >
> > > > --
> > > > *Robert Simmons Jr. MSc. - Lead Java Architect @ EA*
> > > > *Author of: Hardcore Java (2003) and Maintainable Java (2012)*
> > > > *LinkedIn: **http://www.linkedin.com/pub/robert-simmons/40/852/a39
> > > > <http://www.linkedin.com/pub/robert-simmons/40/852/a39>*
> > > >
> > >
> >
> >
> > --
> > *Robert Simmons Jr. MSc. - Lead Java Architect @ EA*
> > *Author of: Hardcore Java (2003) and Maintainable Java (2012)*
> > *LinkedIn: **http://www.linkedin.com/pub/robert-simmons/40/852/a39
> > <http://www.linkedin.com/pub/robert-simmons/40/852/a39>*
> >
>

Re: polling consumer with acknowledgement

Posted by Tarun Kumar <ag...@gmail.com>.
Hi Kraythe,

Thanks for replying.

Here is how my route looks like:

<camel:route id="createMigrationRoute">

 <camel:from uri="timer://foo?period=1000"></camel:from>

 <camel:transacted>

 <camel:doTry>

  <camel:bean ref="abc" method="fetchMessage"></camel:bean>

<camel:bean ref="pqr" method="processMessage"></camel:bean>

<camel:doCatch>

  <camel:exception>java.lang.IOException</camel:exception>

  <camel:bean ref="def" method="handleException" />

  </camel:doCatch>

  <camel:doFinally>

  <camel:bean ref="dsd" method="me2"></camel:bean>

  </camel:doFinally>

 </camel:doTry>

 </camel:transacted>

 </camel:route>


And beans looks like this:


 <bean id="catalogJms" class="org.apache.camel.component.jms.JmsComponent">

 <property name="connectionFactory">

 <bean class="org.apache.activemq.ActiveMQConnectionFactory">

  <property name="brokerURL">

  <value>tcp://localhost:61616</value>

  </property>

 </bean>

 </property>

 <property name="transacted" value="true"></property>

 <property name="transactionManager" ref="jmsTransactionManager" />

 <property name="acknowledgementModeName">

 <value>CLIENT_ACKNOWLEDGE</value>

 </property>

</bean>


<bean id="poolConnectionFactory" class=
"org.apache.activemq.pool.PooledConnectionFactory">

 <property name="maxConnections" value="8" />

 <property name="connectionFactory" ref="jmsConnectionFactory" />

</bean>


 <bean id="jmsConnectionFactory" class=
"org.apache.activemq.ActiveMQConnectionFactory">

 <property name="brokerURL"

 value=
"vm://localhost:61616?broker.persistent=false&amp;broker.useJmx=false" />

</bean>


 <!-- setup spring jms TX manager -->

<bean id="jmsTransactionManager"

 class="org.springframework.jms.connection.JmsTransactionManager">

 <property name="connectionFactory" ref="poolConnectionFactory" />

</bean>


And fetchMessage in abc bean class looks like:


public String fetchMessage() {

 Exchange exchange = consumer

  .receive(
"catalogJms:queue:queue1?mapJmsMessage=false&acknowledgementModeName=CLIENT_ACKNOWLEDGE&transacted=true",
5000);

return exchange.getIn().getBody(TextMessage.class).toString();

}


It still consumes message (QueueSize decreases by one) even when it fails
in pqr bean with any exception apart from IOException.

Please let me know what am i missing here?




On Sat, Dec 21, 2013 at 10:01 PM, kraythe . <kr...@gmail.com> wrote:

> In that case you are going about it wrong. Camel has this support built in
> for you. If the route is declared to be transacted with
> .transacted(myRequirdPolicy) then camel will handle that all for you with
> that one line. Put the transacted line right after your exception handleing
> and before the main route and then go about your business. If you want a
> composite transaction that rolls back both the DB and JMS work then you
> will need to configure a jta transaction manager. Just google 'camel
> jta transacted route' and you will get code examples and so on.
>
> On Saturday, December 21, 2013, Tarun Kumar wrote:
>
> > Thanks for replying. Here is my usecase:
> >
> > consume a message from the queue. Do some transformation on the messge.
> > Then, persist the transformed message to datastore.
> > Once, transformed message is written to datastore, send ack. Reason i
> want
> > to send ack later is, in case my application goes down post message
> > consumption and before writing to datastore, i should be able to fetch
> same
> > message again from JMS queue. Hope that clarifies.
> >
> > Thanks!
> >
> >
> >
> > On Sat, Dec 21, 2013 at 9:14 PM, kraythe . <kraythe@gmail.com
> <javascript:;>>
> > wrote:
> >
> > > I don't think the question is quite clear. In JMS you can only consume
> a
> > > message off a queue once. You can't consume it and leave it on the
> queue.
> > > Why do want to not ack the message? Queues are like throwing candy
> into a
> > > room of kindergarten children. All will scramble for the candy but each
> > > piece will get consumed by only one child. The only time JMS messages
> > > aren't caked is when they are failed in delivery. And even then, the
> only
> > > use case for leaving them on the queue is because of a failed
> transaction
> > > in a transactional route,
> > >
> > > Tell me what is the use case you are trying to implement and I might be
> > > able to help you down another path.
> > >
> > > On Saturday, December 21, 2013, Tarun Kumar wrote:
> > >
> > > > Hi,
> > > >
> > > > Any help here will be highly appreciated.
> > > >
> > > >
> > > > On Fri, Dec 20, 2013 at 9:55 AM, Tarun Kumar <
> > agrawal.tarun23@gmail.com <javascript:;>
> > > <javascript:;>
> > > > >wrote:
> > > >
> > > > > I am using polling consumer.
> > > > >
> > > > > from("timer://foo?period=5000").bean(cool, "someBusinessLogic");
> > > > >
> > > > > public static class MyCoolBean {
> > > > >
> > > > >   private ConsumerTemplate consumer;
> > > > >
> > > > >    public void setConsumer(ConsumerTemplate consumer) {
> > > > >         this.consumer = consumer;
> > > > >     }
> > > > >
> > > > >    public void someBusinessLogic() {
> > > > >
> > > > >       Exchange exchange
> > > > > =
> > > >
> > >
> >
> consumer.receive("catalogJms:queue:queueName?mapJmsMessage=false&acknowledgementModeName=CLIENT_ACKNOWLEDGE");
> > > > >
> > > > > }
> > > > >
> > > > >
> > > > > Even though i am not sending acknowledgement for received message,
> > each
> > > > > time it is polling different message. Any idea why that's
> happening?
> > > > >
> > > >
> > >
> > >
> > > --
> > > *Robert Simmons Jr. MSc. - Lead Java Architect @ EA*
> > > *Author of: Hardcore Java (2003) and Maintainable Java (2012)*
> > > *LinkedIn: **http://www.linkedin.com/pub/robert-simmons/40/852/a39
> > > <http://www.linkedin.com/pub/robert-simmons/40/852/a39>*
> > >
> >
>
>
> --
> *Robert Simmons Jr. MSc. - Lead Java Architect @ EA*
> *Author of: Hardcore Java (2003) and Maintainable Java (2012)*
> *LinkedIn: **http://www.linkedin.com/pub/robert-simmons/40/852/a39
> <http://www.linkedin.com/pub/robert-simmons/40/852/a39>*
>

Re: polling consumer with acknowledgement

Posted by "kraythe ." <kr...@gmail.com>.
In that case you are going about it wrong. Camel has this support built in
for you. If the route is declared to be transacted with
.transacted(myRequirdPolicy) then camel will handle that all for you with
that one line. Put the transacted line right after your exception handleing
and before the main route and then go about your business. If you want a
composite transaction that rolls back both the DB and JMS work then you
will need to configure a jta transaction manager. Just google 'camel
jta transacted route' and you will get code examples and so on.

On Saturday, December 21, 2013, Tarun Kumar wrote:

> Thanks for replying. Here is my usecase:
>
> consume a message from the queue. Do some transformation on the messge.
> Then, persist the transformed message to datastore.
> Once, transformed message is written to datastore, send ack. Reason i want
> to send ack later is, in case my application goes down post message
> consumption and before writing to datastore, i should be able to fetch same
> message again from JMS queue. Hope that clarifies.
>
> Thanks!
>
>
>
> On Sat, Dec 21, 2013 at 9:14 PM, kraythe . <kraythe@gmail.com<javascript:;>>
> wrote:
>
> > I don't think the question is quite clear. In JMS you can only consume a
> > message off a queue once. You can't consume it and leave it on the queue.
> > Why do want to not ack the message? Queues are like throwing candy into a
> > room of kindergarten children. All will scramble for the candy but each
> > piece will get consumed by only one child. The only time JMS messages
> > aren't caked is when they are failed in delivery. And even then, the only
> > use case for leaving them on the queue is because of a failed transaction
> > in a transactional route,
> >
> > Tell me what is the use case you are trying to implement and I might be
> > able to help you down another path.
> >
> > On Saturday, December 21, 2013, Tarun Kumar wrote:
> >
> > > Hi,
> > >
> > > Any help here will be highly appreciated.
> > >
> > >
> > > On Fri, Dec 20, 2013 at 9:55 AM, Tarun Kumar <
> agrawal.tarun23@gmail.com <javascript:;>
> > <javascript:;>
> > > >wrote:
> > >
> > > > I am using polling consumer.
> > > >
> > > > from("timer://foo?period=5000").bean(cool, "someBusinessLogic");
> > > >
> > > > public static class MyCoolBean {
> > > >
> > > >   private ConsumerTemplate consumer;
> > > >
> > > >    public void setConsumer(ConsumerTemplate consumer) {
> > > >         this.consumer = consumer;
> > > >     }
> > > >
> > > >    public void someBusinessLogic() {
> > > >
> > > >       Exchange exchange
> > > > =
> > >
> >
> consumer.receive("catalogJms:queue:queueName?mapJmsMessage=false&acknowledgementModeName=CLIENT_ACKNOWLEDGE");
> > > >
> > > > }
> > > >
> > > >
> > > > Even though i am not sending acknowledgement for received message,
> each
> > > > time it is polling different message. Any idea why that's happening?
> > > >
> > >
> >
> >
> > --
> > *Robert Simmons Jr. MSc. - Lead Java Architect @ EA*
> > *Author of: Hardcore Java (2003) and Maintainable Java (2012)*
> > *LinkedIn: **http://www.linkedin.com/pub/robert-simmons/40/852/a39
> > <http://www.linkedin.com/pub/robert-simmons/40/852/a39>*
> >
>


-- 
*Robert Simmons Jr. MSc. - Lead Java Architect @ EA*
*Author of: Hardcore Java (2003) and Maintainable Java (2012)*
*LinkedIn: **http://www.linkedin.com/pub/robert-simmons/40/852/a39
<http://www.linkedin.com/pub/robert-simmons/40/852/a39>*

Re: polling consumer with acknowledgement

Posted by Tarun Kumar <ag...@gmail.com>.
Thanks for replying. Here is my usecase:

consume a message from the queue. Do some transformation on the messge.
Then, persist the transformed message to datastore.
Once, transformed message is written to datastore, send ack. Reason i want
to send ack later is, in case my application goes down post message
consumption and before writing to datastore, i should be able to fetch same
message again from JMS queue. Hope that clarifies.

Thanks!



On Sat, Dec 21, 2013 at 9:14 PM, kraythe . <kr...@gmail.com> wrote:

> I don't think the question is quite clear. In JMS you can only consume a
> message off a queue once. You can't consume it and leave it on the queue.
> Why do want to not ack the message? Queues are like throwing candy into a
> room of kindergarten children. All will scramble for the candy but each
> piece will get consumed by only one child. The only time JMS messages
> aren't caked is when they are failed in delivery. And even then, the only
> use case for leaving them on the queue is because of a failed transaction
> in a transactional route,
>
> Tell me what is the use case you are trying to implement and I might be
> able to help you down another path.
>
> On Saturday, December 21, 2013, Tarun Kumar wrote:
>
> > Hi,
> >
> > Any help here will be highly appreciated.
> >
> >
> > On Fri, Dec 20, 2013 at 9:55 AM, Tarun Kumar <agrawal.tarun23@gmail.com
> <javascript:;>
> > >wrote:
> >
> > > I am using polling consumer.
> > >
> > > from("timer://foo?period=5000").bean(cool, "someBusinessLogic");
> > >
> > > public static class MyCoolBean {
> > >
> > >   private ConsumerTemplate consumer;
> > >
> > >    public void setConsumer(ConsumerTemplate consumer) {
> > >         this.consumer = consumer;
> > >     }
> > >
> > >    public void someBusinessLogic() {
> > >
> > >       Exchange exchange
> > > =
> >
> consumer.receive("catalogJms:queue:queueName?mapJmsMessage=false&acknowledgementModeName=CLIENT_ACKNOWLEDGE");
> > >
> > > }
> > >
> > >
> > > Even though i am not sending acknowledgement for received message, each
> > > time it is polling different message. Any idea why that's happening?
> > >
> >
>
>
> --
> *Robert Simmons Jr. MSc. - Lead Java Architect @ EA*
> *Author of: Hardcore Java (2003) and Maintainable Java (2012)*
> *LinkedIn: **http://www.linkedin.com/pub/robert-simmons/40/852/a39
> <http://www.linkedin.com/pub/robert-simmons/40/852/a39>*
>

Re: polling consumer with acknowledgement

Posted by "kraythe ." <kr...@gmail.com>.
I don't think the question is quite clear. In JMS you can only consume a
message off a queue once. You can't consume it and leave it on the queue.
Why do want to not ack the message? Queues are like throwing candy into a
room of kindergarten children. All will scramble for the candy but each
piece will get consumed by only one child. The only time JMS messages
aren't caked is when they are failed in delivery. And even then, the only
use case for leaving them on the queue is because of a failed transaction
in a transactional route,

Tell me what is the use case you are trying to implement and I might be
able to help you down another path.

On Saturday, December 21, 2013, Tarun Kumar wrote:

> Hi,
>
> Any help here will be highly appreciated.
>
>
> On Fri, Dec 20, 2013 at 9:55 AM, Tarun Kumar <agrawal.tarun23@gmail.com<javascript:;>
> >wrote:
>
> > I am using polling consumer.
> >
> > from("timer://foo?period=5000").bean(cool, "someBusinessLogic");
> >
> > public static class MyCoolBean {
> >
> >   private ConsumerTemplate consumer;
> >
> >    public void setConsumer(ConsumerTemplate consumer) {
> >         this.consumer = consumer;
> >     }
> >
> >    public void someBusinessLogic() {
> >
> >       Exchange exchange
> > =
> consumer.receive("catalogJms:queue:queueName?mapJmsMessage=false&acknowledgementModeName=CLIENT_ACKNOWLEDGE");
> >
> > }
> >
> >
> > Even though i am not sending acknowledgement for received message, each
> > time it is polling different message. Any idea why that's happening?
> >
>


-- 
*Robert Simmons Jr. MSc. - Lead Java Architect @ EA*
*Author of: Hardcore Java (2003) and Maintainable Java (2012)*
*LinkedIn: **http://www.linkedin.com/pub/robert-simmons/40/852/a39
<http://www.linkedin.com/pub/robert-simmons/40/852/a39>*

Re: polling consumer with acknowledgement

Posted by Tarun Kumar <ag...@gmail.com>.
Hi,

Any help here will be highly appreciated.


On Fri, Dec 20, 2013 at 9:55 AM, Tarun Kumar <ag...@gmail.com>wrote:

> I am using polling consumer.
>
> from("timer://foo?period=5000").bean(cool, "someBusinessLogic");
>
> public static class MyCoolBean {
>
>   private ConsumerTemplate consumer;
>
>    public void setConsumer(ConsumerTemplate consumer) {
>         this.consumer = consumer;
>     }
>
>    public void someBusinessLogic() {
>
>       Exchange exchange
> = consumer.receive("catalogJms:queue:queueName?mapJmsMessage=false&acknowledgementModeName=CLIENT_ACKNOWLEDGE");
>
> }
>
>
> Even though i am not sending acknowledgement for received message, each
> time it is polling different message. Any idea why that's happening?
>