You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@camel.apache.org by "kraythe ." <kr...@gmail.com> on 2014/04/10 18:42:48 UTC

JTA Transactions Rollback of Routes using Directs?

Greetings, I have an interesting use case I was wondering if anyone had
ideas on.

Essentially there is a notifications table in a database that has
essentially the following structure:

CREATE TABLE "etl_case_notification" (
  "process_id" int(11) NOT NULL DEFAULT '0',
  "table_name" varchar(100) NOT NULL,
  "last_updated" timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ,
  "last_processed" timestamp NOT NULL DEFAULT '0000-00-00 00:00:00',
  PRIMARY KEY ("process_id")
);

The idea is that we check the notifications table periodically (quartz) and
then when there is a record that has a last updated date greater than last
processed date, we read the data in table_name and process the records,
routing them to the appropriate queue and when we have all the records
routed, we update the last_processed date.

The use case has the following goals:
1) If a single record in table_name fails to process, it gets sent to dead
letter queue but no rollback.
2) If there is an error in the update of the last processed time, all
processed records are removed from the queues they are routed to.
3) If any record in table_name fails to get to a queue to be routed, all
records in that event should be aborted and last_processed not updated.

To try to implement this I have the following route (note I have
abbreviated items not relevant to the question.

from("quartz:" + quartzConfig).routeId("timedEvent") // timer trigger
    .onException(Exception.class).handled(true).to("direct:dead") //
failures to DLQ
    .to("activemq:queue:event"); // send to event queue

from("activemq:queue:event").routeId("processEvent") // from event queue
    .onException(Exception.class).handled(false).to("direct:dead") //
Failures to DLQ + rollback
    .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED) // make the route
transacted
    .setHeader(SqlConstants.SQL_QUERY, simple(sqlFetchCases)) // set the
sql to run.
    .to("sql:?dataSource=" + this.config.analyticsDatasourceJNDIName()) //
find the cases from the store
    .split(body()).to("direct:routing").end() // split each case to routing
    .setHeader(SqlConstants.SQL_QUERY, simple(sqlUpdateNotifications)) //
cases routed, update last processed.
    .to("sql:?dataSource=" + this.config.analyticsDatasourceJNDIName()); //
execute update

from("direct:routing").routeId("routeRecord") // routing of records
    .onException(Exception.class).handled(true).to("direct:dead") //
failures to DLQ, No rollback
    .convertBodyTo(Record.class) // could potentially except
    .to("activemq;queue:routing") // to routing tasks


The problem is that the rollbacks are not working. A rollback in a single
record process doesn't cause the whole event to fail but a rollback invoked
for failing to update the database or prior to sending the event for
routing doesn't rollback either. Does anyone have any idea of how this
could be solved? Note that I am sure my transaction management is working
because it works for other routes.

Cross opted to stack overflow:
http://stackoverflow.com/questions/22994012/jta-transactions-rollback-of-routes-using-directs

*Robert Simmons Jr. MSc. - Lead Java Architect @ EA*
*Author of: Hardcore Java (2003) and Maintainable Java (2012)*
*LinkedIn: **http://www.linkedin.com/pub/robert-simmons/40/852/a39
<http://www.linkedin.com/pub/robert-simmons/40/852/a39>*

Re: JTA Transactions Rollback of Routes using Directs?

Posted by "kraythe ." <kr...@gmail.com>.
Lovely. I found the problem. Posted the solution on stack overflow for the
consumption of others.

http://stackoverflow.com/questions/22994012/jta-transactions-rollback-of-routes-using-directs/23066951#23066951

*Robert Simmons Jr. MSc. - Lead Java Architect @ EA*
*Author of: Hardcore Java (2003) and Maintainable Java (2012)*
*LinkedIn: **http://www.linkedin.com/pub/robert-simmons/40/852/a39
<http://www.linkedin.com/pub/robert-simmons/40/852/a39>*


On Mon, Apr 14, 2014 at 9:10 AM, kraythe . <kr...@gmail.com> wrote:

> Yeah I know that the end() is missing. That was a copy paste error, the
> code wasn't the actual route, it had to be trimmed because of proprietary
> information and I trimmed a bit too much. Sorry. However I have found that
> the real problem is that the AMQ connection factory is not being enrolled
> as a participant in the JTA transaction. I have some simple test cases now,
> one with just the Database, one with just AMQ and finally a composite test
> case and in the test with only AMQ the transaction still doesn't roll back
> properly.
>
> I have updated the stack overflow thread with a better sample code snippet
> and my transaction manager setup. Would probably be more efficient if I
> referred you to that:
>
>
> http://stackoverflow.com/questions/22994012/jta-transactions-rollback-of-routes-using-directs
>
> Thanks a bunch.
>
> *Robert Simmons Jr. MSc. - Lead Java Architect @ EA*
> *Author of: Hardcore Java (2003) and Maintainable Java (2012)*
> *LinkedIn: **http://www.linkedin.com/pub/robert-simmons/40/852/a39
> <http://www.linkedin.com/pub/robert-simmons/40/852/a39>*
>
>
> On Fri, Apr 11, 2014 at 4:58 AM, Grzegorz Grzybek <gr...@gmail.com>wrote:
>
>> Hello
>>
>> Your onException clauses are not complete without "end()" and actually you
>> have no real exception handlers (tried this on Camel-2.13.0).
>>
>> regards
>> Grzegorz Grzybek
>>
>>
>> 2014-04-10 18:42 GMT+02:00 kraythe . <kr...@gmail.com>:
>>
>> > Greetings, I have an interesting use case I was wondering if anyone had
>> > ideas on.
>> >
>> > Essentially there is a notifications table in a database that has
>> > essentially the following structure:
>> >
>> > CREATE TABLE "etl_case_notification" (
>> >   "process_id" int(11) NOT NULL DEFAULT '0',
>> >   "table_name" varchar(100) NOT NULL,
>> >   "last_updated" timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ,
>> >   "last_processed" timestamp NOT NULL DEFAULT '0000-00-00 00:00:00',
>> >   PRIMARY KEY ("process_id")
>> > );
>> >
>> > The idea is that we check the notifications table periodically (quartz)
>> and
>> > then when there is a record that has a last updated date greater than
>> last
>> > processed date, we read the data in table_name and process the records,
>> > routing them to the appropriate queue and when we have all the records
>> > routed, we update the last_processed date.
>> >
>> > The use case has the following goals:
>> > 1) If a single record in table_name fails to process, it gets sent to
>> dead
>> > letter queue but no rollback.
>> > 2) If there is an error in the update of the last processed time, all
>> > processed records are removed from the queues they are routed to.
>> > 3) If any record in table_name fails to get to a queue to be routed, all
>> > records in that event should be aborted and last_processed not updated.
>> >
>> > To try to implement this I have the following route (note I have
>> > abbreviated items not relevant to the question.
>> >
>> > from("quartz:" + quartzConfig).routeId("timedEvent") // timer trigger
>> >     .onException(Exception.class).handled(true).to("direct:dead") //
>> > failures to DLQ
>> >     .to("activemq:queue:event"); // send to event queue
>> >
>> > from("activemq:queue:event").routeId("processEvent") // from event queue
>> >     .onException(Exception.class).handled(false).to("direct:dead") //
>> > Failures to DLQ + rollback
>> >     .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED) // make the route
>> > transacted
>> >     .setHeader(SqlConstants.SQL_QUERY, simple(sqlFetchCases)) // set the
>> > sql to run.
>> >     .to("sql:?dataSource=" + this.config.analyticsDatasourceJNDIName())
>> //
>> > find the cases from the store
>> >     .split(body()).to("direct:routing").end() // split each case to
>> routing
>> >     .setHeader(SqlConstants.SQL_QUERY, simple(sqlUpdateNotifications))
>> //
>> > cases routed, update last processed.
>> >     .to("sql:?dataSource=" +
>> this.config.analyticsDatasourceJNDIName()); //
>> > execute update
>> >
>> > from("direct:routing").routeId("routeRecord") // routing of records
>> >     .onException(Exception.class).handled(true).to("direct:dead") //
>> > failures to DLQ, No rollback
>> >     .convertBodyTo(Record.class) // could potentially except
>> >     .to("activemq;queue:routing") // to routing tasks
>> >
>> >
>> > The problem is that the rollbacks are not working. A rollback in a
>> single
>> > record process doesn't cause the whole event to fail but a rollback
>> invoked
>> > for failing to update the database or prior to sending the event for
>> > routing doesn't rollback either. Does anyone have any idea of how this
>> > could be solved? Note that I am sure my transaction management is
>> working
>> > because it works for other routes.
>> >
>> > Cross opted to stack overflow:
>> >
>> >
>> http://stackoverflow.com/questions/22994012/jta-transactions-rollback-of-routes-using-directs
>> >
>> > *Robert Simmons Jr. MSc. - Lead Java Architect @ EA*
>> > *Author of: Hardcore Java (2003) and Maintainable Java (2012)*
>> > *LinkedIn: **http://www.linkedin.com/pub/robert-simmons/40/852/a39
>> > <http://www.linkedin.com/pub/robert-simmons/40/852/a39>*
>> >
>>
>
>

Re: JTA Transactions Rollback of Routes using Directs?

Posted by "kraythe ." <kr...@gmail.com>.
Yeah I know that the end() is missing. That was a copy paste error, the
code wasn't the actual route, it had to be trimmed because of proprietary
information and I trimmed a bit too much. Sorry. However I have found that
the real problem is that the AMQ connection factory is not being enrolled
as a participant in the JTA transaction. I have some simple test cases now,
one with just the Database, one with just AMQ and finally a composite test
case and in the test with only AMQ the transaction still doesn't roll back
properly.

I have updated the stack overflow thread with a better sample code snippet
and my transaction manager setup. Would probably be more efficient if I
referred you to that:

http://stackoverflow.com/questions/22994012/jta-transactions-rollback-of-routes-using-directs

Thanks a bunch.

*Robert Simmons Jr. MSc. - Lead Java Architect @ EA*
*Author of: Hardcore Java (2003) and Maintainable Java (2012)*
*LinkedIn: **http://www.linkedin.com/pub/robert-simmons/40/852/a39
<http://www.linkedin.com/pub/robert-simmons/40/852/a39>*


On Fri, Apr 11, 2014 at 4:58 AM, Grzegorz Grzybek <gr...@gmail.com>wrote:

> Hello
>
> Your onException clauses are not complete without "end()" and actually you
> have no real exception handlers (tried this on Camel-2.13.0).
>
> regards
> Grzegorz Grzybek
>
>
> 2014-04-10 18:42 GMT+02:00 kraythe . <kr...@gmail.com>:
>
> > Greetings, I have an interesting use case I was wondering if anyone had
> > ideas on.
> >
> > Essentially there is a notifications table in a database that has
> > essentially the following structure:
> >
> > CREATE TABLE "etl_case_notification" (
> >   "process_id" int(11) NOT NULL DEFAULT '0',
> >   "table_name" varchar(100) NOT NULL,
> >   "last_updated" timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ,
> >   "last_processed" timestamp NOT NULL DEFAULT '0000-00-00 00:00:00',
> >   PRIMARY KEY ("process_id")
> > );
> >
> > The idea is that we check the notifications table periodically (quartz)
> and
> > then when there is a record that has a last updated date greater than
> last
> > processed date, we read the data in table_name and process the records,
> > routing them to the appropriate queue and when we have all the records
> > routed, we update the last_processed date.
> >
> > The use case has the following goals:
> > 1) If a single record in table_name fails to process, it gets sent to
> dead
> > letter queue but no rollback.
> > 2) If there is an error in the update of the last processed time, all
> > processed records are removed from the queues they are routed to.
> > 3) If any record in table_name fails to get to a queue to be routed, all
> > records in that event should be aborted and last_processed not updated.
> >
> > To try to implement this I have the following route (note I have
> > abbreviated items not relevant to the question.
> >
> > from("quartz:" + quartzConfig).routeId("timedEvent") // timer trigger
> >     .onException(Exception.class).handled(true).to("direct:dead") //
> > failures to DLQ
> >     .to("activemq:queue:event"); // send to event queue
> >
> > from("activemq:queue:event").routeId("processEvent") // from event queue
> >     .onException(Exception.class).handled(false).to("direct:dead") //
> > Failures to DLQ + rollback
> >     .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED) // make the route
> > transacted
> >     .setHeader(SqlConstants.SQL_QUERY, simple(sqlFetchCases)) // set the
> > sql to run.
> >     .to("sql:?dataSource=" + this.config.analyticsDatasourceJNDIName())
> //
> > find the cases from the store
> >     .split(body()).to("direct:routing").end() // split each case to
> routing
> >     .setHeader(SqlConstants.SQL_QUERY, simple(sqlUpdateNotifications)) //
> > cases routed, update last processed.
> >     .to("sql:?dataSource=" + this.config.analyticsDatasourceJNDIName());
> //
> > execute update
> >
> > from("direct:routing").routeId("routeRecord") // routing of records
> >     .onException(Exception.class).handled(true).to("direct:dead") //
> > failures to DLQ, No rollback
> >     .convertBodyTo(Record.class) // could potentially except
> >     .to("activemq;queue:routing") // to routing tasks
> >
> >
> > The problem is that the rollbacks are not working. A rollback in a single
> > record process doesn't cause the whole event to fail but a rollback
> invoked
> > for failing to update the database or prior to sending the event for
> > routing doesn't rollback either. Does anyone have any idea of how this
> > could be solved? Note that I am sure my transaction management is working
> > because it works for other routes.
> >
> > Cross opted to stack overflow:
> >
> >
> http://stackoverflow.com/questions/22994012/jta-transactions-rollback-of-routes-using-directs
> >
> > *Robert Simmons Jr. MSc. - Lead Java Architect @ EA*
> > *Author of: Hardcore Java (2003) and Maintainable Java (2012)*
> > *LinkedIn: **http://www.linkedin.com/pub/robert-simmons/40/852/a39
> > <http://www.linkedin.com/pub/robert-simmons/40/852/a39>*
> >
>

Re: JTA Transactions Rollback of Routes using Directs?

Posted by Grzegorz Grzybek <gr...@gmail.com>.
Hello

Your onException clauses are not complete without "end()" and actually you
have no real exception handlers (tried this on Camel-2.13.0).

regards
Grzegorz Grzybek


2014-04-10 18:42 GMT+02:00 kraythe . <kr...@gmail.com>:

> Greetings, I have an interesting use case I was wondering if anyone had
> ideas on.
>
> Essentially there is a notifications table in a database that has
> essentially the following structure:
>
> CREATE TABLE "etl_case_notification" (
>   "process_id" int(11) NOT NULL DEFAULT '0',
>   "table_name" varchar(100) NOT NULL,
>   "last_updated" timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ,
>   "last_processed" timestamp NOT NULL DEFAULT '0000-00-00 00:00:00',
>   PRIMARY KEY ("process_id")
> );
>
> The idea is that we check the notifications table periodically (quartz) and
> then when there is a record that has a last updated date greater than last
> processed date, we read the data in table_name and process the records,
> routing them to the appropriate queue and when we have all the records
> routed, we update the last_processed date.
>
> The use case has the following goals:
> 1) If a single record in table_name fails to process, it gets sent to dead
> letter queue but no rollback.
> 2) If there is an error in the update of the last processed time, all
> processed records are removed from the queues they are routed to.
> 3) If any record in table_name fails to get to a queue to be routed, all
> records in that event should be aborted and last_processed not updated.
>
> To try to implement this I have the following route (note I have
> abbreviated items not relevant to the question.
>
> from("quartz:" + quartzConfig).routeId("timedEvent") // timer trigger
>     .onException(Exception.class).handled(true).to("direct:dead") //
> failures to DLQ
>     .to("activemq:queue:event"); // send to event queue
>
> from("activemq:queue:event").routeId("processEvent") // from event queue
>     .onException(Exception.class).handled(false).to("direct:dead") //
> Failures to DLQ + rollback
>     .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED) // make the route
> transacted
>     .setHeader(SqlConstants.SQL_QUERY, simple(sqlFetchCases)) // set the
> sql to run.
>     .to("sql:?dataSource=" + this.config.analyticsDatasourceJNDIName()) //
> find the cases from the store
>     .split(body()).to("direct:routing").end() // split each case to routing
>     .setHeader(SqlConstants.SQL_QUERY, simple(sqlUpdateNotifications)) //
> cases routed, update last processed.
>     .to("sql:?dataSource=" + this.config.analyticsDatasourceJNDIName()); //
> execute update
>
> from("direct:routing").routeId("routeRecord") // routing of records
>     .onException(Exception.class).handled(true).to("direct:dead") //
> failures to DLQ, No rollback
>     .convertBodyTo(Record.class) // could potentially except
>     .to("activemq;queue:routing") // to routing tasks
>
>
> The problem is that the rollbacks are not working. A rollback in a single
> record process doesn't cause the whole event to fail but a rollback invoked
> for failing to update the database or prior to sending the event for
> routing doesn't rollback either. Does anyone have any idea of how this
> could be solved? Note that I am sure my transaction management is working
> because it works for other routes.
>
> Cross opted to stack overflow:
>
> http://stackoverflow.com/questions/22994012/jta-transactions-rollback-of-routes-using-directs
>
> *Robert Simmons Jr. MSc. - Lead Java Architect @ EA*
> *Author of: Hardcore Java (2003) and Maintainable Java (2012)*
> *LinkedIn: **http://www.linkedin.com/pub/robert-simmons/40/852/a39
> <http://www.linkedin.com/pub/robert-simmons/40/852/a39>*
>

RE: JTA Transactions Rollback of Routes using Directs?

Posted by "Siano, Stephan" <st...@sap.com>.
Hi,

What kind of transaction manager are you using? Rolling back over a JMS queue and a database would require XA resources that are participating in the same transaction. I am also not entirely sure how the transacted interacts with the onException handler.

You also don't write what the rollback does in your case, only that it doesn't work like you expect.

Best regards
Stephan

-----Original Message-----
From: kraythe . [mailto:kraythe@gmail.com] 
Sent: Donnerstag, 10. April 2014 18:43
To: Camel Users List
Subject: JTA Transactions Rollback of Routes using Directs?

Greetings, I have an interesting use case I was wondering if anyone had
ideas on.

Essentially there is a notifications table in a database that has
essentially the following structure:

CREATE TABLE "etl_case_notification" (
  "process_id" int(11) NOT NULL DEFAULT '0',
  "table_name" varchar(100) NOT NULL,
  "last_updated" timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ,
  "last_processed" timestamp NOT NULL DEFAULT '0000-00-00 00:00:00',
  PRIMARY KEY ("process_id")
);

The idea is that we check the notifications table periodically (quartz) and
then when there is a record that has a last updated date greater than last
processed date, we read the data in table_name and process the records,
routing them to the appropriate queue and when we have all the records
routed, we update the last_processed date.

The use case has the following goals:
1) If a single record in table_name fails to process, it gets sent to dead
letter queue but no rollback.
2) If there is an error in the update of the last processed time, all
processed records are removed from the queues they are routed to.
3) If any record in table_name fails to get to a queue to be routed, all
records in that event should be aborted and last_processed not updated.

To try to implement this I have the following route (note I have
abbreviated items not relevant to the question.

from("quartz:" + quartzConfig).routeId("timedEvent") // timer trigger
    .onException(Exception.class).handled(true).to("direct:dead") //
failures to DLQ
    .to("activemq:queue:event"); // send to event queue

from("activemq:queue:event").routeId("processEvent") // from event queue
    .onException(Exception.class).handled(false).to("direct:dead") //
Failures to DLQ + rollback
    .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED) // make the route
transacted
    .setHeader(SqlConstants.SQL_QUERY, simple(sqlFetchCases)) // set the
sql to run.
    .to("sql:?dataSource=" + this.config.analyticsDatasourceJNDIName()) //
find the cases from the store
    .split(body()).to("direct:routing").end() // split each case to routing
    .setHeader(SqlConvstants.SQL_QUERY, simple(sqlUpdateNotifications)) //
cases routed, update last processed.
    .to("sql:?dataSource=" + this.config.analyticsDatasourceJNDIName()); //
execute update

from("direct:routing").routeId("routeRecord") // routing of records
    .onException(Exception.class).handled(true).to("direct:dead") //
failures to DLQ, No rollback
    .convertBodyTo(Record.class) // could potentially except
    .to("activemq;queue:routing") // to routing tasks


The problem is that the rollbacks are not working. A rollback in a single
record process doesn't cause the whole event to fail but a rollback invoked
for failing to update the database or prior to sending the event for
routing doesn't rollback either. Does anyone have any idea of how this
could be solved? Note that I am sure my transaction management is working
because it works for other routes.

Cross opted to stack overflow:
http://stackoverflow.com/questions/22994012/jta-transactions-rollback-of-routes-using-directs

*Robert Simmons Jr. MSc. - Lead Java Architect @ EA*
*Author of: Hardcore Java (2003) and Maintainable Java (2012)*
*LinkedIn: **http://www.linkedin.com/pub/robert-simmons/40/852/a39
<http://www.linkedin.com/pub/robert-simmons/40/852/a39>*