You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@camel.apache.org by "kraythe ." <kr...@gmail.com> on 2014/04/14 20:10:22 UTC

Transactions: Rollback Destination but Not Dead Letter Queue or Source

So, in the ongoing perfect transaction configuration we have an interesting
use case: Consider the following route in a test:

  @Override
  protected RouteBuilder createRouteBuilder() {
    System.out.println("createRouteBuilder");
    return new RouteBuilder(this.context) {
      @Override
      public void configure() {
        getContext().setTracing(true);
        from(DIRECT_FEED_INBOX).routeId(ROUTE_ID_FEED)
            .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)

.split(body()).marshal(dfCaseRecord).to(endpointAMQ(QUEUE_INBOX));
        from(endpointAMQ(QUEUE_OUTBOX)).routeId(ROUTE_ID_RESULTS)
            .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
            .unmarshal(dfCaseRecord).to(MOCK_END);
        from(endpointAMQ(QUEUE_DEAD)).routeId(ROUTE_ID_DEAD)
            .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
            .unmarshal(dfCaseRecord).to(MOCK_DEAD);
        from(endpointAMQ(QUEUE_INBOX)).routeId(ROUTE_ID_TEST_ROUTE)

.onException(RuntimeException.class).handled(true).useOriginalMessage().to(endpointAMQ(QUEUE_DEAD)).end()
            .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
            .unmarshal(dfCaseRecord)
            .to(MOCK_BEFORE_TO_QUEUE)
            .marshal(dfCaseRecord)
            .to(endpointAMQ(QUEUE_OUTBOX))
            .unmarshal(dfCaseRecord)
            .to(MOCK_AFTER_TO_QUEUE);
      }
    };
  }

Note that the transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED) simply looks up
the transaction policy by name as it is just a string key for our JNDI
registry. Our test case looks like the following.

  @Test
  public void testRollbackAfterEnqueue() throws Exception {
    adviceRoute();
    startCamelContext();
    initMocks();
    final RecordList cases = casesA();

    mockEnd.expectedMessageCount(2);
    mockEnd.expectedBodiesReceived(cases.get(1), cases.get(2));
    mockDead.expectedMessageCount(1);
    mockDead.expectedBodiesReceived(cases.get(0));
    mockBeforeToQueue.expectedMessageCount(3);
    mockBeforeToQueue.expectedBodiesReceivedInAnyOrder(cases);
    mockOutbox.expectedMessageCount(3);
    mockOutbox.expectedBodiesReceivedInAnyOrder(toListOfJsonStrings(mapper,
cases));
    mockAfterToQueue.expectedMessageCount(3);
    mockAfterToQueue.expectedBodiesReceivedInAnyOrder(cases);
    mockAfterToQueue.whenExchangeReceived(1, EXCEPTION_PROCESSOR);

    template.sendBody(DIRECT_FEED_INBOX, cases);

    mockBeforeToQueue.assertIsSatisfied();
    mockAfterToQueue.assertIsSatisfied();
    mockEnd.assertIsSatisfied();
    mockDead.assertIsSatisfied();
    mockOutbox.assertIsSatisfied();
  }

In this route the goal is that if any exceptions are thrown even after the
message is enqueued, it should be rolled back, the message that excepted
should go to the DLQ and the messages in the outbox should be rolled back
but the message from the inbox should not be put back on the queue. This is
proving to be a bit of a juggling act.

I tried putting markRollBackOnly() in the exception handler after the
to(dead) but that rolled back the dead letter queue and outbox and then
redelivered the inbox message. Removing the markRollbackOnly() means that
the message arrives at dead and is off the inbox but it doesn't get removed
from the outbox.

So I am looking for the right combination of calls to accomplish what I
want to do. Any suggestions?

Thanks in advance.

*Robert Simmons Jr. MSc. - Lead Java Architect @ EA*
*Author of: Hardcore Java (2003) and Maintainable Java (2012)*
*LinkedIn: **http://www.linkedin.com/pub/robert-simmons/40/852/a39
<http://www.linkedin.com/pub/robert-simmons/40/852/a39>*

Re: Transactions: Rollback Destination but Not Dead Letter Queue or Source

Posted by "kraythe ." <kr...@gmail.com>.
No the problem is I want it to rollback and it isnt. :( Clearly something
in my config or roure is borked. Wish I knew what. Thanks for the info
Claus. That will help at least contain what could be wrong.

*Robert Simmons Jr. MSc. - Lead Java Architect @ EA*
*Author of: Hardcore Java (2003) and Maintainable Java (2012)*
*LinkedIn: **http://www.linkedin.com/pub/robert-simmons/40/852/a39
<http://www.linkedin.com/pub/robert-simmons/40/852/a39>*


On Tue, Apr 29, 2014 at 10:44 AM, Claus Ibsen <cl...@gmail.com> wrote:

> On Tue, Apr 29, 2014 at 5:37 PM, kraythe . <kr...@gmail.com> wrote:
> > Thats what I thought. That makes my problem perplexing. Well I will
> update
> > the thread when I figure it out. If anyone has any ideas it would be
> > appreciated.
> >
>
> If you want the message to stay, then define a 2nd jms component and
> send the message using that.
>
> > *Robert Simmons Jr. MSc. - Lead Java Architect @ EA*
> > *Author of: Hardcore Java (2003) and Maintainable Java (2012)*
> > *LinkedIn: **http://www.linkedin.com/pub/robert-simmons/40/852/a39
> > <http://www.linkedin.com/pub/robert-simmons/40/852/a39>*
> >
> >
> > On Tue, Apr 29, 2014 at 4:11 AM, Claus Ibsen <cl...@gmail.com>
> wrote:
> >
> >> On Tue, Apr 29, 2014 at 10:59 AM, kraythe . <kr...@gmail.com> wrote:
> >> > Yeah no problem. I was just hoping someone would have the answer. I
> keep
> >> > plugging away at it. Probably some transaction issue, I dont know. You
> >> can
> >> > answer this perhaps :) , when I send a message to an activemq endpoint
> >> with
> >> > InOnly exchange pattern, will that message be subject to the
> transation?
> >> > I.e. if the transaction fails will it get popped off AMQ?
> >> >
> >>
> >> Yes if you do that from a camel route using the same jms component /
> >> endpoint that started the transaction (eg same jms session).
> >>
> >> Then only when the TX commit, the message on the queue will be commit
> >> and "visible" for consumers.
> >> Its like figure 9.6 in Camel in Action book.
> >>
> >>
> >> > *Robert Simmons Jr. MSc. - Lead Java Architect @ EA*
> >> > *Author of: Hardcore Java (2003) and Maintainable Java (2012)*
> >> > *LinkedIn: **http://www.linkedin.com/pub/robert-simmons/40/852/a39
> >> > <http://www.linkedin.com/pub/robert-simmons/40/852/a39>*
> >> >
> >> >
> >> > On Tue, Apr 29, 2014 at 2:55 AM, Claus Ibsen <cl...@gmail.com>
> >> wrote:
> >> >
> >> >> On Mon, Apr 28, 2014 at 8:48 PM, kraythe . <kr...@gmail.com>
> wrote:
> >> >> > No one has any idea on this? It would be really great if I could
> find
> >> the
> >> >> > formula to get this to work.
> >> >> >
> >> >>
> >> >> I dont think people always have the time to help, and especially when
> >> >> its more complicated with transactions and a lot of Camel route code.
> >> >>
> >> >> If you want to get priority help then there is some companies that
> offer
> >> >> that
> >> >> http://camel.apache.org/commercial-camel-offerings.html
> >> >>
> >> >> > *Robert Simmons Jr. MSc. - Lead Java Architect @ EA*
> >> >> > *Author of: Hardcore Java (2003) and Maintainable Java (2012)*
> >> >> > *LinkedIn: **http://www.linkedin.com/pub/robert-simmons/40/852/a39
> >> >> > <http://www.linkedin.com/pub/robert-simmons/40/852/a39>*
> >> >> >
> >> >> >
> >> >> > On Tue, Apr 15, 2014 at 5:15 PM, kraythe . <kr...@gmail.com>
> wrote:
> >> >> >
> >> >> >> So I think there is a problem with the way rollback is
> implemented in
> >> >> >> relation to Camel. As far as I can tell there is no way to get the
> >> >> >> following working.
> >> >> >>
> >> >> >> package com.ea.wwce.camel.test.utilities;
> >> >> >>
> >> >> >> import com.ea.wwce.camel.utilities.data.RecordList;
> >> >> >> import com.ea.wwce.camel.utilities.transactions.TxnHelper;
> >> >> >> import org.apache.camel.ExchangePattern;
> >> >> >> import org.apache.camel.builder.AdviceWithRouteBuilder;
> >> >> >> import org.apache.camel.builder.RouteBuilder;
> >> >> >> import org.apache.camel.component.mock.MockEndpoint;
> >> >> >> import org.testng.annotations.Test;
> >> >> >> import static
> >> com.ea.wwce.camel.test.utilities.TransactionTestTools.*;
> >> >> >> import static
> >> >> >> com.ea.wwce.camel.utilities.activemq.ActiveMQHelper.endpointAMQ;
> >> >> >> import static
> >> >> >>
> >> >>
> >>
> com.ea.wwce.camel.utilities.jackson.RecordSerialization.toListOfJsonStrings;
> >> >> >> import static org.apache.camel.ExchangePattern.InOnly;
> >> >> >>
> >> >> >> /** This test suite validates the transaction configuration in the
> >> test
> >> >> >> suite. */
> >> >> >> public class JMSOnlyTransactionTest extends AMQRouteTestSupport {
> >> >> >>   private static final String QUEUE_DEAD = "dead";
> >> >> >>   private static final String QUEUE_INBOX = "inbox";
> >> >> >>   private static final String QUEUE_OUTBOX = "outbox";
> >> >> >>   private static final String ROUTE_ID_FEED = "Feed";
> >> >> >>   private static final String ROUTE_ID_TEST_ROUTE = "TestRoute";
> >> >> >>   private static final String ROUTE_ID_RESULTS = "ResultsRoute";
> >> >> >>   private static final String ROUTE_ID_DEAD = "DeadRoute";
> >> >> >>   private static final String DIRECT_FEED_INBOX =
> >> "direct:feed_inbox";
> >> >> >>
> >> >> >>   private static final String MOCK_END = "mock:end";
> >> >> >>   private static final String MOCK_BEFORE_TO_QUEUE =
> >> >> >> "mock:before_to_queue";
> >> >> >>   private static final String MOCK_AFTER_TO_QUEUE =
> >> >> "mock:after_to_queue";
> >> >> >>
> >> >> >>   /** Mock endpoints. */
> >> >> >>   private MockEndpoint mockEnd, mockDead, mockOutbox,
> >> mockBeforeToQueue,
> >> >> >> mockAfterToQueue;
> >> >> >>
> >> >> >>   /** Helper to initialize mocks in the test. */
> >> >> >>   private void initMocks() {
> >> >> >>     mockEnd = assertAndGetMockEndpoint(MOCK_END);
> >> >> >>     mockDead = assertAndGetMockEndpoint(MOCK_DEAD);
> >> >> >>     mockBeforeToQueue =
> >> assertAndGetMockEndpoint(MOCK_BEFORE_TO_QUEUE);
> >> >> >>     mockAfterToQueue =
> assertAndGetMockEndpoint(MOCK_AFTER_TO_QUEUE);
> >> >> >>     mockOutbox =
> >> >> assertAndGetMockEndpoint(mockEndpointAMQ(QUEUE_OUTBOX));
> >> >> >>   }
> >> >> >>
> >> >> >>   @Override
> >> >> >>   protected RouteBuilder createRouteBuilder() {
> >> >> >>     System.out.println("createRouteBuilder");
> >> >> >>     return new RouteBuilder(this.context) {
> >> >> >>       @Override
> >> >> >>       public void configure() {
> >> >> >>         getContext().setTracing(true);
> >> >> >>         from(DIRECT_FEED_INBOX).routeId(ROUTE_ID_FEED)
> >> >> >>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
> >> >> >>
> >> >> >> .split(body()).marshal(dfCaseRecord).to(endpointAMQ(QUEUE_INBOX));
> >> >> >>         from(endpointAMQ(QUEUE_OUTBOX)).routeId(ROUTE_ID_RESULTS)
> >> >> >>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
> >> >> >>             .unmarshal(dfCaseRecord).to(MOCK_END);
> >> >> >>         from(endpointAMQ(QUEUE_DEAD)).routeId(ROUTE_ID_DEAD)
> >> >> >>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
> >> >> >>             .unmarshal(dfCaseRecord).to(MOCK_DEAD);
> >> >> >>
> from(endpointAMQ(QUEUE_INBOX)).routeId(ROUTE_ID_TEST_ROUTE)
> >> >> >>
> >> >> >>
> >> .onException(RuntimeException.class).handled(true).useOriginalMessage()
> >> >> >>               .to(InOnly, endpointAMQ(QUEUE_DEAD)).end()
> >> >> >>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRES_NEW)
> >> >> >>             .unmarshal(dfCaseRecord)
> >> >> >>             .to(MOCK_BEFORE_TO_QUEUE)
> >> >> >>             .marshal(dfCaseRecord)
> >> >> >>             .to(endpointAMQ(QUEUE_OUTBOX))
> >> >> >>             .unmarshal(dfCaseRecord)
> >> >> >>             .to(MOCK_AFTER_TO_QUEUE);
> >> >> >>       }
> >> >> >>     };
> >> >> >>   }
> >> >> >>
> >> >> >>   /** Advice the route, mocking ActiveMQ endpoints. */
> >> >> >>   protected void adviceRoute() throws Exception {
> >> >> >>
> this.context.getRouteDefinition(ROUTE_ID_TEST_ROUTE).adviceWith(
> >> >> >>         this.context, new AdviceWithRouteBuilder() {
> >> >> >>           @Override
> >> >> >>           public void configure() throws Exception {
> >> >> >>             mockEndpoints("activemq:queue:*");
> >> >> >>           }
> >> >> >>         }
> >> >> >>     );
> >> >> >>   }
> >> >> >>
> >> >> >>   @Test
> >> >> >>   public void testNormalRouting() throws Exception {
> >> >> >>     adviceRoute();
> >> >> >>     startCamelContext();
> >> >> >>     initMocks();
> >> >> >>     final RecordList cases = casesA();
> >> >> >>
> >> >> >>     mockEnd.expectedBodiesReceived(cases);
> >> >> >>     mockBeforeToQueue.expectedBodiesReceivedInAnyOrder(cases);
> >> >> >>
> >> >> >>
> >> mockOutbox.expectedBodiesReceivedInAnyOrder(toListOfJsonStrings(mapper,
> >> >> >> cases));
> >> >> >>     mockAfterToQueue.expectedBodiesReceivedInAnyOrder(cases);
> >> >> >>
> >> >> >>     template.sendBody(DIRECT_FEED_INBOX, cases);
> >> >> >>
> >> >> >>     mockBeforeToQueue.assertIsSatisfied();
> >> >> >>     mockAfterToQueue.assertIsSatisfied();
> >> >> >>     mockEnd.assertIsSatisfied();
> >> >> >>     mockDead.assertIsSatisfied();
> >> >> >>     mockOutbox.assertIsSatisfied();
> >> >> >>   }
> >> >> >>
> >> >> >>   @Test
> >> >> >>   public void testRollbackBeforeEnqueue() throws Exception {
> >> >> >>     adviceRoute();
> >> >> >>     startCamelContext();
> >> >> >>     initMocks();
> >> >> >>     final RecordList cases = casesA();
> >> >> >>
> >> >> >>     mockEnd.expectedBodiesReceivedInAnyOrder(cases.get(1),
> >> >> cases.get(2));
> >> >> >>     mockDead.expectedBodiesReceived(cases.get(0));
> >> >> >>     mockBeforeToQueue.expectedBodiesReceivedInAnyOrder(cases);
> >> >> >>     mockBeforeToQueue.whenExchangeReceived(1,
> EXCEPTION_PROCESSOR);
> >> >> >>
> >> >> >>
> >> mockOutbox.expectedBodiesReceivedInAnyOrder(toListOfJsonStrings(mapper,
> >> >> >> cases.get(1), cases.get(2)));
> >> >> >>
> mockAfterToQueue.expectedBodiesReceivedInAnyOrder(cases.get(1),
> >> >> >> cases.get(2));
> >> >> >>
> >> >> >>     template.sendBody(DIRECT_FEED_INBOX, cases);
> >> >> >>
> >> >> >>     mockBeforeToQueue.assertIsSatisfied();
> >> >> >>     mockAfterToQueue.assertIsSatisfied();
> >> >> >>     mockEnd.assertIsSatisfied();
> >> >> >>     mockDead.assertIsSatisfied();
> >> >> >>     mockOutbox.assertIsSatisfied();
> >> >> >>   }
> >> >> >>
> >> >> >>   @Test
> >> >> >>   public void testRollbackAfterEnqueue() throws Exception {
> >> >> >>     adviceRoute();
> >> >> >>     startCamelContext();
> >> >> >>     initMocks();
> >> >> >>     final RecordList cases = casesA();
> >> >> >>
> >> >> >>     mockEnd.expectedBodiesReceivedInAnyOrder(cases.get(1),
> >> >> cases.get(2));
> >> >> >>     mockDead.expectedBodiesReceivedInAnyOrder(cases.get(0));
> >> >> >>     mockBeforeToQueue.expectedBodiesReceivedInAnyOrder(cases);
> >> >> >>
> >> >> >>
> >> mockOutbox.expectedBodiesReceivedInAnyOrder(toListOfJsonStrings(mapper,
> >> >> >> cases));
> >> >> >>     mockAfterToQueue.expectedBodiesReceivedInAnyOrder(cases);
> >> >> >>     mockAfterToQueue.whenExchangeReceived(1, EXCEPTION_PROCESSOR);
> >> >> >>
> >> >> >>     template.sendBody(DIRECT_FEED_INBOX, cases);
> >> >> >>
> >> >> >>     mockBeforeToQueue.assertIsSatisfied();
> >> >> >>     mockAfterToQueue.assertIsSatisfied();
> >> >> >>     mockDead.assertIsSatisfied();
> >> >> >>     mockOutbox.assertIsSatisfied();
> >> >> >>     mockEnd.assertIsSatisfied();
> >> >> >>   }
> >> >> >> }
> >> >> >>
> >> >> >> I have tried dozens of combinations in the onException clause and
> >> >> nothing
> >> >> >> works. Adding markRollbackOnly(), or rollback() only succeeds in
> >> rolling
> >> >> >> back the dead letter channel as well. Making the handled(false)
> cause
> >> >> AMQ
> >> >> >> to resubmit the transaction and rolls back the dead letter
> channel. I
> >> >> have
> >> >> >> tried a dozen combinations so if anyone has one that works I
> would be
> >> >> >> grateful.
> >> >> >>
> >> >> >> *Robert Simmons Jr. MSc. - Lead Java Architect @ EA*
> >> >> >> *Author of: Hardcore Java (2003) and Maintainable Java (2012)*
> >> >> >> *LinkedIn: **
> http://www.linkedin.com/pub/robert-simmons/40/852/a39
> >> >> >> <http://www.linkedin.com/pub/robert-simmons/40/852/a39>*
> >> >> >>
> >> >> >>
> >> >> >> On Mon, Apr 14, 2014 at 1:10 PM, kraythe . <kr...@gmail.com>
> >> wrote:
> >> >> >>
> >> >> >>> So, in the ongoing perfect transaction configuration we have an
> >> >> >>> interesting use case: Consider the following route in a test:
> >> >> >>>
> >> >> >>>   @Override
> >> >> >>>   protected RouteBuilder createRouteBuilder() {
> >> >> >>>     System.out.println("createRouteBuilder");
> >> >> >>>     return new RouteBuilder(this.context) {
> >> >> >>>       @Override
> >> >> >>>       public void configure() {
> >> >> >>>         getContext().setTracing(true);
> >> >> >>>         from(DIRECT_FEED_INBOX).routeId(ROUTE_ID_FEED)
> >> >> >>>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
> >> >> >>>
> >> >> >>>
> .split(body()).marshal(dfCaseRecord).to(endpointAMQ(QUEUE_INBOX));
> >> >> >>>         from(endpointAMQ(QUEUE_OUTBOX)).routeId(ROUTE_ID_RESULTS)
> >> >> >>>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
> >> >> >>>             .unmarshal(dfCaseRecord).to(MOCK_END);
> >> >> >>>         from(endpointAMQ(QUEUE_DEAD)).routeId(ROUTE_ID_DEAD)
> >> >> >>>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
> >> >> >>>             .unmarshal(dfCaseRecord).to(MOCK_DEAD);
> >> >> >>>
> from(endpointAMQ(QUEUE_INBOX)).routeId(ROUTE_ID_TEST_ROUTE)
> >> >> >>>
> >> >> >>>
> >> >>
> >>
> .onException(RuntimeException.class).handled(true).useOriginalMessage().to(endpointAMQ(QUEUE_DEAD)).end()
> >> >> >>>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
> >> >> >>>             .unmarshal(dfCaseRecord)
> >> >> >>>             .to(MOCK_BEFORE_TO_QUEUE)
> >> >> >>>             .marshal(dfCaseRecord)
> >> >> >>>             .to(endpointAMQ(QUEUE_OUTBOX))
> >> >> >>>             .unmarshal(dfCaseRecord)
> >> >> >>>             .to(MOCK_AFTER_TO_QUEUE);
> >> >> >>>       }
> >> >> >>>     };
> >> >> >>>   }
> >> >> >>>
> >> >> >>> Note that the transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED) simply
> >> looks
> >> >> >>> up the transaction policy by name as it is just a string key for
> our
> >> >> JNDI
> >> >> >>> registry. Our test case looks like the following.
> >> >> >>>
> >> >> >>>   @Test
> >> >> >>>   public void testRollbackAfterEnqueue() throws Exception {
> >> >> >>>     adviceRoute();
> >> >> >>>     startCamelContext();
> >> >> >>>     initMocks();
> >> >> >>>     final RecordList cases = casesA();
> >> >> >>>
> >> >> >>>     mockEnd.expectedMessageCount(2);
> >> >> >>>     mockEnd.expectedBodiesReceived(cases.get(1), cases.get(2));
> >> >> >>>     mockDead.expectedMessageCount(1);
> >> >> >>>     mockDead.expectedBodiesReceived(cases.get(0));
> >> >> >>>     mockBeforeToQueue.expectedMessageCount(3);
> >> >> >>>     mockBeforeToQueue.expectedBodiesReceivedInAnyOrder(cases);
> >> >> >>>     mockOutbox.expectedMessageCount(3);
> >> >> >>>
> >> >> >>>
> >> mockOutbox.expectedBodiesReceivedInAnyOrder(toListOfJsonStrings(mapper,
> >> >> >>> cases));
> >> >> >>>     mockAfterToQueue.expectedMessageCount(3);
> >> >> >>>     mockAfterToQueue.expectedBodiesReceivedInAnyOrder(cases);
> >> >> >>>     mockAfterToQueue.whenExchangeReceived(1,
> EXCEPTION_PROCESSOR);
> >> >> >>>
> >> >> >>>     template.sendBody(DIRECT_FEED_INBOX, cases);
> >> >> >>>
> >> >> >>>     mockBeforeToQueue.assertIsSatisfied();
> >> >> >>>     mockAfterToQueue.assertIsSatisfied();
> >> >> >>>     mockEnd.assertIsSatisfied();
> >> >> >>>     mockDead.assertIsSatisfied();
> >> >> >>>     mockOutbox.assertIsSatisfied();
> >> >> >>>   }
> >> >> >>>
> >> >> >>> In this route the goal is that if any exceptions are thrown even
> >> after
> >> >> >>> the message is enqueued, it should be rolled back, the message
> that
> >> >> >>> excepted should go to the DLQ and the messages in the outbox
> should
> >> be
> >> >> >>> rolled back but the message from the inbox should not be put
> back on
> >> >> the
> >> >> >>> queue. This is proving to be a bit of a juggling act.
> >> >> >>>
> >> >> >>> I tried putting markRollBackOnly() in the exception handler after
> >> the
> >> >> >>> to(dead) but that rolled back the dead letter queue and outbox
> and
> >> then
> >> >> >>> redelivered the inbox message. Removing the markRollbackOnly()
> means
> >> >> >>> that the message arrives at dead and is off the inbox but it
> doesn't
> >> >> get
> >> >> >>> removed from the outbox.
> >> >> >>>
> >> >> >>> So I am looking for the right combination of calls to accomplish
> >> what I
> >> >> >>> want to do. Any suggestions?
> >> >> >>>
> >> >> >>> Thanks in advance.
> >> >> >>>
> >> >> >>> *Robert Simmons Jr. MSc. - Lead Java Architect @ EA*
> >> >> >>> *Author of: Hardcore Java (2003) and Maintainable Java (2012)*
> >> >> >>> *LinkedIn: **
> http://www.linkedin.com/pub/robert-simmons/40/852/a39
> >> >> >>> <http://www.linkedin.com/pub/robert-simmons/40/852/a39>*
> >> >> >>>
> >> >> >>
> >> >> >>
> >> >>
> >> >>
> >> >>
> >> >> --
> >> >> Claus Ibsen
> >> >> -----------------
> >> >> Red Hat, Inc.
> >> >> Email: cibsen@redhat.com
> >> >> Twitter: davsclaus
> >> >> Blog: http://davsclaus.com
> >> >> Author of Camel in Action: http://www.manning.com/ibsen
> >> >> hawtio: http://hawt.io/
> >> >> fabric8: http://fabric8.io/
> >> >>
> >>
> >>
> >>
> >> --
> >> Claus Ibsen
> >> -----------------
> >> Red Hat, Inc.
> >> Email: cibsen@redhat.com
> >> Twitter: davsclaus
> >> Blog: http://davsclaus.com
> >> Author of Camel in Action: http://www.manning.com/ibsen
> >> hawtio: http://hawt.io/
> >> fabric8: http://fabric8.io/
> >>
>
>
>
> --
> Claus Ibsen
> -----------------
> Red Hat, Inc.
> Email: cibsen@redhat.com
> Twitter: davsclaus
> Blog: http://davsclaus.com
> Author of Camel in Action: http://www.manning.com/ibsen
> hawtio: http://hawt.io/
> fabric8: http://fabric8.io/
>

Re: Transactions: Rollback Destination but Not Dead Letter Queue or Source

Posted by Claus Ibsen <cl...@gmail.com>.
On Tue, Apr 29, 2014 at 5:37 PM, kraythe . <kr...@gmail.com> wrote:
> Thats what I thought. That makes my problem perplexing. Well I will update
> the thread when I figure it out. If anyone has any ideas it would be
> appreciated.
>

If you want the message to stay, then define a 2nd jms component and
send the message using that.

> *Robert Simmons Jr. MSc. - Lead Java Architect @ EA*
> *Author of: Hardcore Java (2003) and Maintainable Java (2012)*
> *LinkedIn: **http://www.linkedin.com/pub/robert-simmons/40/852/a39
> <http://www.linkedin.com/pub/robert-simmons/40/852/a39>*
>
>
> On Tue, Apr 29, 2014 at 4:11 AM, Claus Ibsen <cl...@gmail.com> wrote:
>
>> On Tue, Apr 29, 2014 at 10:59 AM, kraythe . <kr...@gmail.com> wrote:
>> > Yeah no problem. I was just hoping someone would have the answer. I keep
>> > plugging away at it. Probably some transaction issue, I dont know. You
>> can
>> > answer this perhaps :) , when I send a message to an activemq endpoint
>> with
>> > InOnly exchange pattern, will that message be subject to the transation?
>> > I.e. if the transaction fails will it get popped off AMQ?
>> >
>>
>> Yes if you do that from a camel route using the same jms component /
>> endpoint that started the transaction (eg same jms session).
>>
>> Then only when the TX commit, the message on the queue will be commit
>> and "visible" for consumers.
>> Its like figure 9.6 in Camel in Action book.
>>
>>
>> > *Robert Simmons Jr. MSc. - Lead Java Architect @ EA*
>> > *Author of: Hardcore Java (2003) and Maintainable Java (2012)*
>> > *LinkedIn: **http://www.linkedin.com/pub/robert-simmons/40/852/a39
>> > <http://www.linkedin.com/pub/robert-simmons/40/852/a39>*
>> >
>> >
>> > On Tue, Apr 29, 2014 at 2:55 AM, Claus Ibsen <cl...@gmail.com>
>> wrote:
>> >
>> >> On Mon, Apr 28, 2014 at 8:48 PM, kraythe . <kr...@gmail.com> wrote:
>> >> > No one has any idea on this? It would be really great if I could find
>> the
>> >> > formula to get this to work.
>> >> >
>> >>
>> >> I dont think people always have the time to help, and especially when
>> >> its more complicated with transactions and a lot of Camel route code.
>> >>
>> >> If you want to get priority help then there is some companies that offer
>> >> that
>> >> http://camel.apache.org/commercial-camel-offerings.html
>> >>
>> >> > *Robert Simmons Jr. MSc. - Lead Java Architect @ EA*
>> >> > *Author of: Hardcore Java (2003) and Maintainable Java (2012)*
>> >> > *LinkedIn: **http://www.linkedin.com/pub/robert-simmons/40/852/a39
>> >> > <http://www.linkedin.com/pub/robert-simmons/40/852/a39>*
>> >> >
>> >> >
>> >> > On Tue, Apr 15, 2014 at 5:15 PM, kraythe . <kr...@gmail.com> wrote:
>> >> >
>> >> >> So I think there is a problem with the way rollback is implemented in
>> >> >> relation to Camel. As far as I can tell there is no way to get the
>> >> >> following working.
>> >> >>
>> >> >> package com.ea.wwce.camel.test.utilities;
>> >> >>
>> >> >> import com.ea.wwce.camel.utilities.data.RecordList;
>> >> >> import com.ea.wwce.camel.utilities.transactions.TxnHelper;
>> >> >> import org.apache.camel.ExchangePattern;
>> >> >> import org.apache.camel.builder.AdviceWithRouteBuilder;
>> >> >> import org.apache.camel.builder.RouteBuilder;
>> >> >> import org.apache.camel.component.mock.MockEndpoint;
>> >> >> import org.testng.annotations.Test;
>> >> >> import static
>> com.ea.wwce.camel.test.utilities.TransactionTestTools.*;
>> >> >> import static
>> >> >> com.ea.wwce.camel.utilities.activemq.ActiveMQHelper.endpointAMQ;
>> >> >> import static
>> >> >>
>> >>
>> com.ea.wwce.camel.utilities.jackson.RecordSerialization.toListOfJsonStrings;
>> >> >> import static org.apache.camel.ExchangePattern.InOnly;
>> >> >>
>> >> >> /** This test suite validates the transaction configuration in the
>> test
>> >> >> suite. */
>> >> >> public class JMSOnlyTransactionTest extends AMQRouteTestSupport {
>> >> >>   private static final String QUEUE_DEAD = "dead";
>> >> >>   private static final String QUEUE_INBOX = "inbox";
>> >> >>   private static final String QUEUE_OUTBOX = "outbox";
>> >> >>   private static final String ROUTE_ID_FEED = "Feed";
>> >> >>   private static final String ROUTE_ID_TEST_ROUTE = "TestRoute";
>> >> >>   private static final String ROUTE_ID_RESULTS = "ResultsRoute";
>> >> >>   private static final String ROUTE_ID_DEAD = "DeadRoute";
>> >> >>   private static final String DIRECT_FEED_INBOX =
>> "direct:feed_inbox";
>> >> >>
>> >> >>   private static final String MOCK_END = "mock:end";
>> >> >>   private static final String MOCK_BEFORE_TO_QUEUE =
>> >> >> "mock:before_to_queue";
>> >> >>   private static final String MOCK_AFTER_TO_QUEUE =
>> >> "mock:after_to_queue";
>> >> >>
>> >> >>   /** Mock endpoints. */
>> >> >>   private MockEndpoint mockEnd, mockDead, mockOutbox,
>> mockBeforeToQueue,
>> >> >> mockAfterToQueue;
>> >> >>
>> >> >>   /** Helper to initialize mocks in the test. */
>> >> >>   private void initMocks() {
>> >> >>     mockEnd = assertAndGetMockEndpoint(MOCK_END);
>> >> >>     mockDead = assertAndGetMockEndpoint(MOCK_DEAD);
>> >> >>     mockBeforeToQueue =
>> assertAndGetMockEndpoint(MOCK_BEFORE_TO_QUEUE);
>> >> >>     mockAfterToQueue = assertAndGetMockEndpoint(MOCK_AFTER_TO_QUEUE);
>> >> >>     mockOutbox =
>> >> assertAndGetMockEndpoint(mockEndpointAMQ(QUEUE_OUTBOX));
>> >> >>   }
>> >> >>
>> >> >>   @Override
>> >> >>   protected RouteBuilder createRouteBuilder() {
>> >> >>     System.out.println("createRouteBuilder");
>> >> >>     return new RouteBuilder(this.context) {
>> >> >>       @Override
>> >> >>       public void configure() {
>> >> >>         getContext().setTracing(true);
>> >> >>         from(DIRECT_FEED_INBOX).routeId(ROUTE_ID_FEED)
>> >> >>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
>> >> >>
>> >> >> .split(body()).marshal(dfCaseRecord).to(endpointAMQ(QUEUE_INBOX));
>> >> >>         from(endpointAMQ(QUEUE_OUTBOX)).routeId(ROUTE_ID_RESULTS)
>> >> >>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
>> >> >>             .unmarshal(dfCaseRecord).to(MOCK_END);
>> >> >>         from(endpointAMQ(QUEUE_DEAD)).routeId(ROUTE_ID_DEAD)
>> >> >>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
>> >> >>             .unmarshal(dfCaseRecord).to(MOCK_DEAD);
>> >> >>         from(endpointAMQ(QUEUE_INBOX)).routeId(ROUTE_ID_TEST_ROUTE)
>> >> >>
>> >> >>
>> .onException(RuntimeException.class).handled(true).useOriginalMessage()
>> >> >>               .to(InOnly, endpointAMQ(QUEUE_DEAD)).end()
>> >> >>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRES_NEW)
>> >> >>             .unmarshal(dfCaseRecord)
>> >> >>             .to(MOCK_BEFORE_TO_QUEUE)
>> >> >>             .marshal(dfCaseRecord)
>> >> >>             .to(endpointAMQ(QUEUE_OUTBOX))
>> >> >>             .unmarshal(dfCaseRecord)
>> >> >>             .to(MOCK_AFTER_TO_QUEUE);
>> >> >>       }
>> >> >>     };
>> >> >>   }
>> >> >>
>> >> >>   /** Advice the route, mocking ActiveMQ endpoints. */
>> >> >>   protected void adviceRoute() throws Exception {
>> >> >>     this.context.getRouteDefinition(ROUTE_ID_TEST_ROUTE).adviceWith(
>> >> >>         this.context, new AdviceWithRouteBuilder() {
>> >> >>           @Override
>> >> >>           public void configure() throws Exception {
>> >> >>             mockEndpoints("activemq:queue:*");
>> >> >>           }
>> >> >>         }
>> >> >>     );
>> >> >>   }
>> >> >>
>> >> >>   @Test
>> >> >>   public void testNormalRouting() throws Exception {
>> >> >>     adviceRoute();
>> >> >>     startCamelContext();
>> >> >>     initMocks();
>> >> >>     final RecordList cases = casesA();
>> >> >>
>> >> >>     mockEnd.expectedBodiesReceived(cases);
>> >> >>     mockBeforeToQueue.expectedBodiesReceivedInAnyOrder(cases);
>> >> >>
>> >> >>
>> mockOutbox.expectedBodiesReceivedInAnyOrder(toListOfJsonStrings(mapper,
>> >> >> cases));
>> >> >>     mockAfterToQueue.expectedBodiesReceivedInAnyOrder(cases);
>> >> >>
>> >> >>     template.sendBody(DIRECT_FEED_INBOX, cases);
>> >> >>
>> >> >>     mockBeforeToQueue.assertIsSatisfied();
>> >> >>     mockAfterToQueue.assertIsSatisfied();
>> >> >>     mockEnd.assertIsSatisfied();
>> >> >>     mockDead.assertIsSatisfied();
>> >> >>     mockOutbox.assertIsSatisfied();
>> >> >>   }
>> >> >>
>> >> >>   @Test
>> >> >>   public void testRollbackBeforeEnqueue() throws Exception {
>> >> >>     adviceRoute();
>> >> >>     startCamelContext();
>> >> >>     initMocks();
>> >> >>     final RecordList cases = casesA();
>> >> >>
>> >> >>     mockEnd.expectedBodiesReceivedInAnyOrder(cases.get(1),
>> >> cases.get(2));
>> >> >>     mockDead.expectedBodiesReceived(cases.get(0));
>> >> >>     mockBeforeToQueue.expectedBodiesReceivedInAnyOrder(cases);
>> >> >>     mockBeforeToQueue.whenExchangeReceived(1, EXCEPTION_PROCESSOR);
>> >> >>
>> >> >>
>> mockOutbox.expectedBodiesReceivedInAnyOrder(toListOfJsonStrings(mapper,
>> >> >> cases.get(1), cases.get(2)));
>> >> >>     mockAfterToQueue.expectedBodiesReceivedInAnyOrder(cases.get(1),
>> >> >> cases.get(2));
>> >> >>
>> >> >>     template.sendBody(DIRECT_FEED_INBOX, cases);
>> >> >>
>> >> >>     mockBeforeToQueue.assertIsSatisfied();
>> >> >>     mockAfterToQueue.assertIsSatisfied();
>> >> >>     mockEnd.assertIsSatisfied();
>> >> >>     mockDead.assertIsSatisfied();
>> >> >>     mockOutbox.assertIsSatisfied();
>> >> >>   }
>> >> >>
>> >> >>   @Test
>> >> >>   public void testRollbackAfterEnqueue() throws Exception {
>> >> >>     adviceRoute();
>> >> >>     startCamelContext();
>> >> >>     initMocks();
>> >> >>     final RecordList cases = casesA();
>> >> >>
>> >> >>     mockEnd.expectedBodiesReceivedInAnyOrder(cases.get(1),
>> >> cases.get(2));
>> >> >>     mockDead.expectedBodiesReceivedInAnyOrder(cases.get(0));
>> >> >>     mockBeforeToQueue.expectedBodiesReceivedInAnyOrder(cases);
>> >> >>
>> >> >>
>> mockOutbox.expectedBodiesReceivedInAnyOrder(toListOfJsonStrings(mapper,
>> >> >> cases));
>> >> >>     mockAfterToQueue.expectedBodiesReceivedInAnyOrder(cases);
>> >> >>     mockAfterToQueue.whenExchangeReceived(1, EXCEPTION_PROCESSOR);
>> >> >>
>> >> >>     template.sendBody(DIRECT_FEED_INBOX, cases);
>> >> >>
>> >> >>     mockBeforeToQueue.assertIsSatisfied();
>> >> >>     mockAfterToQueue.assertIsSatisfied();
>> >> >>     mockDead.assertIsSatisfied();
>> >> >>     mockOutbox.assertIsSatisfied();
>> >> >>     mockEnd.assertIsSatisfied();
>> >> >>   }
>> >> >> }
>> >> >>
>> >> >> I have tried dozens of combinations in the onException clause and
>> >> nothing
>> >> >> works. Adding markRollbackOnly(), or rollback() only succeeds in
>> rolling
>> >> >> back the dead letter channel as well. Making the handled(false) cause
>> >> AMQ
>> >> >> to resubmit the transaction and rolls back the dead letter channel. I
>> >> have
>> >> >> tried a dozen combinations so if anyone has one that works I would be
>> >> >> grateful.
>> >> >>
>> >> >> *Robert Simmons Jr. MSc. - Lead Java Architect @ EA*
>> >> >> *Author of: Hardcore Java (2003) and Maintainable Java (2012)*
>> >> >> *LinkedIn: **http://www.linkedin.com/pub/robert-simmons/40/852/a39
>> >> >> <http://www.linkedin.com/pub/robert-simmons/40/852/a39>*
>> >> >>
>> >> >>
>> >> >> On Mon, Apr 14, 2014 at 1:10 PM, kraythe . <kr...@gmail.com>
>> wrote:
>> >> >>
>> >> >>> So, in the ongoing perfect transaction configuration we have an
>> >> >>> interesting use case: Consider the following route in a test:
>> >> >>>
>> >> >>>   @Override
>> >> >>>   protected RouteBuilder createRouteBuilder() {
>> >> >>>     System.out.println("createRouteBuilder");
>> >> >>>     return new RouteBuilder(this.context) {
>> >> >>>       @Override
>> >> >>>       public void configure() {
>> >> >>>         getContext().setTracing(true);
>> >> >>>         from(DIRECT_FEED_INBOX).routeId(ROUTE_ID_FEED)
>> >> >>>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
>> >> >>>
>> >> >>> .split(body()).marshal(dfCaseRecord).to(endpointAMQ(QUEUE_INBOX));
>> >> >>>         from(endpointAMQ(QUEUE_OUTBOX)).routeId(ROUTE_ID_RESULTS)
>> >> >>>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
>> >> >>>             .unmarshal(dfCaseRecord).to(MOCK_END);
>> >> >>>         from(endpointAMQ(QUEUE_DEAD)).routeId(ROUTE_ID_DEAD)
>> >> >>>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
>> >> >>>             .unmarshal(dfCaseRecord).to(MOCK_DEAD);
>> >> >>>         from(endpointAMQ(QUEUE_INBOX)).routeId(ROUTE_ID_TEST_ROUTE)
>> >> >>>
>> >> >>>
>> >>
>> .onException(RuntimeException.class).handled(true).useOriginalMessage().to(endpointAMQ(QUEUE_DEAD)).end()
>> >> >>>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
>> >> >>>             .unmarshal(dfCaseRecord)
>> >> >>>             .to(MOCK_BEFORE_TO_QUEUE)
>> >> >>>             .marshal(dfCaseRecord)
>> >> >>>             .to(endpointAMQ(QUEUE_OUTBOX))
>> >> >>>             .unmarshal(dfCaseRecord)
>> >> >>>             .to(MOCK_AFTER_TO_QUEUE);
>> >> >>>       }
>> >> >>>     };
>> >> >>>   }
>> >> >>>
>> >> >>> Note that the transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED) simply
>> looks
>> >> >>> up the transaction policy by name as it is just a string key for our
>> >> JNDI
>> >> >>> registry. Our test case looks like the following.
>> >> >>>
>> >> >>>   @Test
>> >> >>>   public void testRollbackAfterEnqueue() throws Exception {
>> >> >>>     adviceRoute();
>> >> >>>     startCamelContext();
>> >> >>>     initMocks();
>> >> >>>     final RecordList cases = casesA();
>> >> >>>
>> >> >>>     mockEnd.expectedMessageCount(2);
>> >> >>>     mockEnd.expectedBodiesReceived(cases.get(1), cases.get(2));
>> >> >>>     mockDead.expectedMessageCount(1);
>> >> >>>     mockDead.expectedBodiesReceived(cases.get(0));
>> >> >>>     mockBeforeToQueue.expectedMessageCount(3);
>> >> >>>     mockBeforeToQueue.expectedBodiesReceivedInAnyOrder(cases);
>> >> >>>     mockOutbox.expectedMessageCount(3);
>> >> >>>
>> >> >>>
>> mockOutbox.expectedBodiesReceivedInAnyOrder(toListOfJsonStrings(mapper,
>> >> >>> cases));
>> >> >>>     mockAfterToQueue.expectedMessageCount(3);
>> >> >>>     mockAfterToQueue.expectedBodiesReceivedInAnyOrder(cases);
>> >> >>>     mockAfterToQueue.whenExchangeReceived(1, EXCEPTION_PROCESSOR);
>> >> >>>
>> >> >>>     template.sendBody(DIRECT_FEED_INBOX, cases);
>> >> >>>
>> >> >>>     mockBeforeToQueue.assertIsSatisfied();
>> >> >>>     mockAfterToQueue.assertIsSatisfied();
>> >> >>>     mockEnd.assertIsSatisfied();
>> >> >>>     mockDead.assertIsSatisfied();
>> >> >>>     mockOutbox.assertIsSatisfied();
>> >> >>>   }
>> >> >>>
>> >> >>> In this route the goal is that if any exceptions are thrown even
>> after
>> >> >>> the message is enqueued, it should be rolled back, the message that
>> >> >>> excepted should go to the DLQ and the messages in the outbox should
>> be
>> >> >>> rolled back but the message from the inbox should not be put back on
>> >> the
>> >> >>> queue. This is proving to be a bit of a juggling act.
>> >> >>>
>> >> >>> I tried putting markRollBackOnly() in the exception handler after
>> the
>> >> >>> to(dead) but that rolled back the dead letter queue and outbox and
>> then
>> >> >>> redelivered the inbox message. Removing the markRollbackOnly() means
>> >> >>> that the message arrives at dead and is off the inbox but it doesn't
>> >> get
>> >> >>> removed from the outbox.
>> >> >>>
>> >> >>> So I am looking for the right combination of calls to accomplish
>> what I
>> >> >>> want to do. Any suggestions?
>> >> >>>
>> >> >>> Thanks in advance.
>> >> >>>
>> >> >>> *Robert Simmons Jr. MSc. - Lead Java Architect @ EA*
>> >> >>> *Author of: Hardcore Java (2003) and Maintainable Java (2012)*
>> >> >>> *LinkedIn: **http://www.linkedin.com/pub/robert-simmons/40/852/a39
>> >> >>> <http://www.linkedin.com/pub/robert-simmons/40/852/a39>*
>> >> >>>
>> >> >>
>> >> >>
>> >>
>> >>
>> >>
>> >> --
>> >> Claus Ibsen
>> >> -----------------
>> >> Red Hat, Inc.
>> >> Email: cibsen@redhat.com
>> >> Twitter: davsclaus
>> >> Blog: http://davsclaus.com
>> >> Author of Camel in Action: http://www.manning.com/ibsen
>> >> hawtio: http://hawt.io/
>> >> fabric8: http://fabric8.io/
>> >>
>>
>>
>>
>> --
>> Claus Ibsen
>> -----------------
>> Red Hat, Inc.
>> Email: cibsen@redhat.com
>> Twitter: davsclaus
>> Blog: http://davsclaus.com
>> Author of Camel in Action: http://www.manning.com/ibsen
>> hawtio: http://hawt.io/
>> fabric8: http://fabric8.io/
>>



-- 
Claus Ibsen
-----------------
Red Hat, Inc.
Email: cibsen@redhat.com
Twitter: davsclaus
Blog: http://davsclaus.com
Author of Camel in Action: http://www.manning.com/ibsen
hawtio: http://hawt.io/
fabric8: http://fabric8.io/

Re: Transactions: Rollback Destination but Not Dead Letter Queue or Source

Posted by "kraythe ." <kr...@gmail.com>.
Thats what I thought. That makes my problem perplexing. Well I will update
the thread when I figure it out. If anyone has any ideas it would be
appreciated.

*Robert Simmons Jr. MSc. - Lead Java Architect @ EA*
*Author of: Hardcore Java (2003) and Maintainable Java (2012)*
*LinkedIn: **http://www.linkedin.com/pub/robert-simmons/40/852/a39
<http://www.linkedin.com/pub/robert-simmons/40/852/a39>*


On Tue, Apr 29, 2014 at 4:11 AM, Claus Ibsen <cl...@gmail.com> wrote:

> On Tue, Apr 29, 2014 at 10:59 AM, kraythe . <kr...@gmail.com> wrote:
> > Yeah no problem. I was just hoping someone would have the answer. I keep
> > plugging away at it. Probably some transaction issue, I dont know. You
> can
> > answer this perhaps :) , when I send a message to an activemq endpoint
> with
> > InOnly exchange pattern, will that message be subject to the transation?
> > I.e. if the transaction fails will it get popped off AMQ?
> >
>
> Yes if you do that from a camel route using the same jms component /
> endpoint that started the transaction (eg same jms session).
>
> Then only when the TX commit, the message on the queue will be commit
> and "visible" for consumers.
> Its like figure 9.6 in Camel in Action book.
>
>
> > *Robert Simmons Jr. MSc. - Lead Java Architect @ EA*
> > *Author of: Hardcore Java (2003) and Maintainable Java (2012)*
> > *LinkedIn: **http://www.linkedin.com/pub/robert-simmons/40/852/a39
> > <http://www.linkedin.com/pub/robert-simmons/40/852/a39>*
> >
> >
> > On Tue, Apr 29, 2014 at 2:55 AM, Claus Ibsen <cl...@gmail.com>
> wrote:
> >
> >> On Mon, Apr 28, 2014 at 8:48 PM, kraythe . <kr...@gmail.com> wrote:
> >> > No one has any idea on this? It would be really great if I could find
> the
> >> > formula to get this to work.
> >> >
> >>
> >> I dont think people always have the time to help, and especially when
> >> its more complicated with transactions and a lot of Camel route code.
> >>
> >> If you want to get priority help then there is some companies that offer
> >> that
> >> http://camel.apache.org/commercial-camel-offerings.html
> >>
> >> > *Robert Simmons Jr. MSc. - Lead Java Architect @ EA*
> >> > *Author of: Hardcore Java (2003) and Maintainable Java (2012)*
> >> > *LinkedIn: **http://www.linkedin.com/pub/robert-simmons/40/852/a39
> >> > <http://www.linkedin.com/pub/robert-simmons/40/852/a39>*
> >> >
> >> >
> >> > On Tue, Apr 15, 2014 at 5:15 PM, kraythe . <kr...@gmail.com> wrote:
> >> >
> >> >> So I think there is a problem with the way rollback is implemented in
> >> >> relation to Camel. As far as I can tell there is no way to get the
> >> >> following working.
> >> >>
> >> >> package com.ea.wwce.camel.test.utilities;
> >> >>
> >> >> import com.ea.wwce.camel.utilities.data.RecordList;
> >> >> import com.ea.wwce.camel.utilities.transactions.TxnHelper;
> >> >> import org.apache.camel.ExchangePattern;
> >> >> import org.apache.camel.builder.AdviceWithRouteBuilder;
> >> >> import org.apache.camel.builder.RouteBuilder;
> >> >> import org.apache.camel.component.mock.MockEndpoint;
> >> >> import org.testng.annotations.Test;
> >> >> import static
> com.ea.wwce.camel.test.utilities.TransactionTestTools.*;
> >> >> import static
> >> >> com.ea.wwce.camel.utilities.activemq.ActiveMQHelper.endpointAMQ;
> >> >> import static
> >> >>
> >>
> com.ea.wwce.camel.utilities.jackson.RecordSerialization.toListOfJsonStrings;
> >> >> import static org.apache.camel.ExchangePattern.InOnly;
> >> >>
> >> >> /** This test suite validates the transaction configuration in the
> test
> >> >> suite. */
> >> >> public class JMSOnlyTransactionTest extends AMQRouteTestSupport {
> >> >>   private static final String QUEUE_DEAD = "dead";
> >> >>   private static final String QUEUE_INBOX = "inbox";
> >> >>   private static final String QUEUE_OUTBOX = "outbox";
> >> >>   private static final String ROUTE_ID_FEED = "Feed";
> >> >>   private static final String ROUTE_ID_TEST_ROUTE = "TestRoute";
> >> >>   private static final String ROUTE_ID_RESULTS = "ResultsRoute";
> >> >>   private static final String ROUTE_ID_DEAD = "DeadRoute";
> >> >>   private static final String DIRECT_FEED_INBOX =
> "direct:feed_inbox";
> >> >>
> >> >>   private static final String MOCK_END = "mock:end";
> >> >>   private static final String MOCK_BEFORE_TO_QUEUE =
> >> >> "mock:before_to_queue";
> >> >>   private static final String MOCK_AFTER_TO_QUEUE =
> >> "mock:after_to_queue";
> >> >>
> >> >>   /** Mock endpoints. */
> >> >>   private MockEndpoint mockEnd, mockDead, mockOutbox,
> mockBeforeToQueue,
> >> >> mockAfterToQueue;
> >> >>
> >> >>   /** Helper to initialize mocks in the test. */
> >> >>   private void initMocks() {
> >> >>     mockEnd = assertAndGetMockEndpoint(MOCK_END);
> >> >>     mockDead = assertAndGetMockEndpoint(MOCK_DEAD);
> >> >>     mockBeforeToQueue =
> assertAndGetMockEndpoint(MOCK_BEFORE_TO_QUEUE);
> >> >>     mockAfterToQueue = assertAndGetMockEndpoint(MOCK_AFTER_TO_QUEUE);
> >> >>     mockOutbox =
> >> assertAndGetMockEndpoint(mockEndpointAMQ(QUEUE_OUTBOX));
> >> >>   }
> >> >>
> >> >>   @Override
> >> >>   protected RouteBuilder createRouteBuilder() {
> >> >>     System.out.println("createRouteBuilder");
> >> >>     return new RouteBuilder(this.context) {
> >> >>       @Override
> >> >>       public void configure() {
> >> >>         getContext().setTracing(true);
> >> >>         from(DIRECT_FEED_INBOX).routeId(ROUTE_ID_FEED)
> >> >>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
> >> >>
> >> >> .split(body()).marshal(dfCaseRecord).to(endpointAMQ(QUEUE_INBOX));
> >> >>         from(endpointAMQ(QUEUE_OUTBOX)).routeId(ROUTE_ID_RESULTS)
> >> >>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
> >> >>             .unmarshal(dfCaseRecord).to(MOCK_END);
> >> >>         from(endpointAMQ(QUEUE_DEAD)).routeId(ROUTE_ID_DEAD)
> >> >>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
> >> >>             .unmarshal(dfCaseRecord).to(MOCK_DEAD);
> >> >>         from(endpointAMQ(QUEUE_INBOX)).routeId(ROUTE_ID_TEST_ROUTE)
> >> >>
> >> >>
> .onException(RuntimeException.class).handled(true).useOriginalMessage()
> >> >>               .to(InOnly, endpointAMQ(QUEUE_DEAD)).end()
> >> >>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRES_NEW)
> >> >>             .unmarshal(dfCaseRecord)
> >> >>             .to(MOCK_BEFORE_TO_QUEUE)
> >> >>             .marshal(dfCaseRecord)
> >> >>             .to(endpointAMQ(QUEUE_OUTBOX))
> >> >>             .unmarshal(dfCaseRecord)
> >> >>             .to(MOCK_AFTER_TO_QUEUE);
> >> >>       }
> >> >>     };
> >> >>   }
> >> >>
> >> >>   /** Advice the route, mocking ActiveMQ endpoints. */
> >> >>   protected void adviceRoute() throws Exception {
> >> >>     this.context.getRouteDefinition(ROUTE_ID_TEST_ROUTE).adviceWith(
> >> >>         this.context, new AdviceWithRouteBuilder() {
> >> >>           @Override
> >> >>           public void configure() throws Exception {
> >> >>             mockEndpoints("activemq:queue:*");
> >> >>           }
> >> >>         }
> >> >>     );
> >> >>   }
> >> >>
> >> >>   @Test
> >> >>   public void testNormalRouting() throws Exception {
> >> >>     adviceRoute();
> >> >>     startCamelContext();
> >> >>     initMocks();
> >> >>     final RecordList cases = casesA();
> >> >>
> >> >>     mockEnd.expectedBodiesReceived(cases);
> >> >>     mockBeforeToQueue.expectedBodiesReceivedInAnyOrder(cases);
> >> >>
> >> >>
> mockOutbox.expectedBodiesReceivedInAnyOrder(toListOfJsonStrings(mapper,
> >> >> cases));
> >> >>     mockAfterToQueue.expectedBodiesReceivedInAnyOrder(cases);
> >> >>
> >> >>     template.sendBody(DIRECT_FEED_INBOX, cases);
> >> >>
> >> >>     mockBeforeToQueue.assertIsSatisfied();
> >> >>     mockAfterToQueue.assertIsSatisfied();
> >> >>     mockEnd.assertIsSatisfied();
> >> >>     mockDead.assertIsSatisfied();
> >> >>     mockOutbox.assertIsSatisfied();
> >> >>   }
> >> >>
> >> >>   @Test
> >> >>   public void testRollbackBeforeEnqueue() throws Exception {
> >> >>     adviceRoute();
> >> >>     startCamelContext();
> >> >>     initMocks();
> >> >>     final RecordList cases = casesA();
> >> >>
> >> >>     mockEnd.expectedBodiesReceivedInAnyOrder(cases.get(1),
> >> cases.get(2));
> >> >>     mockDead.expectedBodiesReceived(cases.get(0));
> >> >>     mockBeforeToQueue.expectedBodiesReceivedInAnyOrder(cases);
> >> >>     mockBeforeToQueue.whenExchangeReceived(1, EXCEPTION_PROCESSOR);
> >> >>
> >> >>
> mockOutbox.expectedBodiesReceivedInAnyOrder(toListOfJsonStrings(mapper,
> >> >> cases.get(1), cases.get(2)));
> >> >>     mockAfterToQueue.expectedBodiesReceivedInAnyOrder(cases.get(1),
> >> >> cases.get(2));
> >> >>
> >> >>     template.sendBody(DIRECT_FEED_INBOX, cases);
> >> >>
> >> >>     mockBeforeToQueue.assertIsSatisfied();
> >> >>     mockAfterToQueue.assertIsSatisfied();
> >> >>     mockEnd.assertIsSatisfied();
> >> >>     mockDead.assertIsSatisfied();
> >> >>     mockOutbox.assertIsSatisfied();
> >> >>   }
> >> >>
> >> >>   @Test
> >> >>   public void testRollbackAfterEnqueue() throws Exception {
> >> >>     adviceRoute();
> >> >>     startCamelContext();
> >> >>     initMocks();
> >> >>     final RecordList cases = casesA();
> >> >>
> >> >>     mockEnd.expectedBodiesReceivedInAnyOrder(cases.get(1),
> >> cases.get(2));
> >> >>     mockDead.expectedBodiesReceivedInAnyOrder(cases.get(0));
> >> >>     mockBeforeToQueue.expectedBodiesReceivedInAnyOrder(cases);
> >> >>
> >> >>
> mockOutbox.expectedBodiesReceivedInAnyOrder(toListOfJsonStrings(mapper,
> >> >> cases));
> >> >>     mockAfterToQueue.expectedBodiesReceivedInAnyOrder(cases);
> >> >>     mockAfterToQueue.whenExchangeReceived(1, EXCEPTION_PROCESSOR);
> >> >>
> >> >>     template.sendBody(DIRECT_FEED_INBOX, cases);
> >> >>
> >> >>     mockBeforeToQueue.assertIsSatisfied();
> >> >>     mockAfterToQueue.assertIsSatisfied();
> >> >>     mockDead.assertIsSatisfied();
> >> >>     mockOutbox.assertIsSatisfied();
> >> >>     mockEnd.assertIsSatisfied();
> >> >>   }
> >> >> }
> >> >>
> >> >> I have tried dozens of combinations in the onException clause and
> >> nothing
> >> >> works. Adding markRollbackOnly(), or rollback() only succeeds in
> rolling
> >> >> back the dead letter channel as well. Making the handled(false) cause
> >> AMQ
> >> >> to resubmit the transaction and rolls back the dead letter channel. I
> >> have
> >> >> tried a dozen combinations so if anyone has one that works I would be
> >> >> grateful.
> >> >>
> >> >> *Robert Simmons Jr. MSc. - Lead Java Architect @ EA*
> >> >> *Author of: Hardcore Java (2003) and Maintainable Java (2012)*
> >> >> *LinkedIn: **http://www.linkedin.com/pub/robert-simmons/40/852/a39
> >> >> <http://www.linkedin.com/pub/robert-simmons/40/852/a39>*
> >> >>
> >> >>
> >> >> On Mon, Apr 14, 2014 at 1:10 PM, kraythe . <kr...@gmail.com>
> wrote:
> >> >>
> >> >>> So, in the ongoing perfect transaction configuration we have an
> >> >>> interesting use case: Consider the following route in a test:
> >> >>>
> >> >>>   @Override
> >> >>>   protected RouteBuilder createRouteBuilder() {
> >> >>>     System.out.println("createRouteBuilder");
> >> >>>     return new RouteBuilder(this.context) {
> >> >>>       @Override
> >> >>>       public void configure() {
> >> >>>         getContext().setTracing(true);
> >> >>>         from(DIRECT_FEED_INBOX).routeId(ROUTE_ID_FEED)
> >> >>>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
> >> >>>
> >> >>> .split(body()).marshal(dfCaseRecord).to(endpointAMQ(QUEUE_INBOX));
> >> >>>         from(endpointAMQ(QUEUE_OUTBOX)).routeId(ROUTE_ID_RESULTS)
> >> >>>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
> >> >>>             .unmarshal(dfCaseRecord).to(MOCK_END);
> >> >>>         from(endpointAMQ(QUEUE_DEAD)).routeId(ROUTE_ID_DEAD)
> >> >>>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
> >> >>>             .unmarshal(dfCaseRecord).to(MOCK_DEAD);
> >> >>>         from(endpointAMQ(QUEUE_INBOX)).routeId(ROUTE_ID_TEST_ROUTE)
> >> >>>
> >> >>>
> >>
> .onException(RuntimeException.class).handled(true).useOriginalMessage().to(endpointAMQ(QUEUE_DEAD)).end()
> >> >>>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
> >> >>>             .unmarshal(dfCaseRecord)
> >> >>>             .to(MOCK_BEFORE_TO_QUEUE)
> >> >>>             .marshal(dfCaseRecord)
> >> >>>             .to(endpointAMQ(QUEUE_OUTBOX))
> >> >>>             .unmarshal(dfCaseRecord)
> >> >>>             .to(MOCK_AFTER_TO_QUEUE);
> >> >>>       }
> >> >>>     };
> >> >>>   }
> >> >>>
> >> >>> Note that the transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED) simply
> looks
> >> >>> up the transaction policy by name as it is just a string key for our
> >> JNDI
> >> >>> registry. Our test case looks like the following.
> >> >>>
> >> >>>   @Test
> >> >>>   public void testRollbackAfterEnqueue() throws Exception {
> >> >>>     adviceRoute();
> >> >>>     startCamelContext();
> >> >>>     initMocks();
> >> >>>     final RecordList cases = casesA();
> >> >>>
> >> >>>     mockEnd.expectedMessageCount(2);
> >> >>>     mockEnd.expectedBodiesReceived(cases.get(1), cases.get(2));
> >> >>>     mockDead.expectedMessageCount(1);
> >> >>>     mockDead.expectedBodiesReceived(cases.get(0));
> >> >>>     mockBeforeToQueue.expectedMessageCount(3);
> >> >>>     mockBeforeToQueue.expectedBodiesReceivedInAnyOrder(cases);
> >> >>>     mockOutbox.expectedMessageCount(3);
> >> >>>
> >> >>>
> mockOutbox.expectedBodiesReceivedInAnyOrder(toListOfJsonStrings(mapper,
> >> >>> cases));
> >> >>>     mockAfterToQueue.expectedMessageCount(3);
> >> >>>     mockAfterToQueue.expectedBodiesReceivedInAnyOrder(cases);
> >> >>>     mockAfterToQueue.whenExchangeReceived(1, EXCEPTION_PROCESSOR);
> >> >>>
> >> >>>     template.sendBody(DIRECT_FEED_INBOX, cases);
> >> >>>
> >> >>>     mockBeforeToQueue.assertIsSatisfied();
> >> >>>     mockAfterToQueue.assertIsSatisfied();
> >> >>>     mockEnd.assertIsSatisfied();
> >> >>>     mockDead.assertIsSatisfied();
> >> >>>     mockOutbox.assertIsSatisfied();
> >> >>>   }
> >> >>>
> >> >>> In this route the goal is that if any exceptions are thrown even
> after
> >> >>> the message is enqueued, it should be rolled back, the message that
> >> >>> excepted should go to the DLQ and the messages in the outbox should
> be
> >> >>> rolled back but the message from the inbox should not be put back on
> >> the
> >> >>> queue. This is proving to be a bit of a juggling act.
> >> >>>
> >> >>> I tried putting markRollBackOnly() in the exception handler after
> the
> >> >>> to(dead) but that rolled back the dead letter queue and outbox and
> then
> >> >>> redelivered the inbox message. Removing the markRollbackOnly() means
> >> >>> that the message arrives at dead and is off the inbox but it doesn't
> >> get
> >> >>> removed from the outbox.
> >> >>>
> >> >>> So I am looking for the right combination of calls to accomplish
> what I
> >> >>> want to do. Any suggestions?
> >> >>>
> >> >>> Thanks in advance.
> >> >>>
> >> >>> *Robert Simmons Jr. MSc. - Lead Java Architect @ EA*
> >> >>> *Author of: Hardcore Java (2003) and Maintainable Java (2012)*
> >> >>> *LinkedIn: **http://www.linkedin.com/pub/robert-simmons/40/852/a39
> >> >>> <http://www.linkedin.com/pub/robert-simmons/40/852/a39>*
> >> >>>
> >> >>
> >> >>
> >>
> >>
> >>
> >> --
> >> Claus Ibsen
> >> -----------------
> >> Red Hat, Inc.
> >> Email: cibsen@redhat.com
> >> Twitter: davsclaus
> >> Blog: http://davsclaus.com
> >> Author of Camel in Action: http://www.manning.com/ibsen
> >> hawtio: http://hawt.io/
> >> fabric8: http://fabric8.io/
> >>
>
>
>
> --
> Claus Ibsen
> -----------------
> Red Hat, Inc.
> Email: cibsen@redhat.com
> Twitter: davsclaus
> Blog: http://davsclaus.com
> Author of Camel in Action: http://www.manning.com/ibsen
> hawtio: http://hawt.io/
> fabric8: http://fabric8.io/
>

Re: Transactions: Rollback Destination but Not Dead Letter Queue or Source

Posted by Claus Ibsen <cl...@gmail.com>.
On Tue, Apr 29, 2014 at 10:59 AM, kraythe . <kr...@gmail.com> wrote:
> Yeah no problem. I was just hoping someone would have the answer. I keep
> plugging away at it. Probably some transaction issue, I dont know. You can
> answer this perhaps :) , when I send a message to an activemq endpoint with
> InOnly exchange pattern, will that message be subject to the transation?
> I.e. if the transaction fails will it get popped off AMQ?
>

Yes if you do that from a camel route using the same jms component /
endpoint that started the transaction (eg same jms session).

Then only when the TX commit, the message on the queue will be commit
and "visible" for consumers.
Its like figure 9.6 in Camel in Action book.


> *Robert Simmons Jr. MSc. - Lead Java Architect @ EA*
> *Author of: Hardcore Java (2003) and Maintainable Java (2012)*
> *LinkedIn: **http://www.linkedin.com/pub/robert-simmons/40/852/a39
> <http://www.linkedin.com/pub/robert-simmons/40/852/a39>*
>
>
> On Tue, Apr 29, 2014 at 2:55 AM, Claus Ibsen <cl...@gmail.com> wrote:
>
>> On Mon, Apr 28, 2014 at 8:48 PM, kraythe . <kr...@gmail.com> wrote:
>> > No one has any idea on this? It would be really great if I could find the
>> > formula to get this to work.
>> >
>>
>> I dont think people always have the time to help, and especially when
>> its more complicated with transactions and a lot of Camel route code.
>>
>> If you want to get priority help then there is some companies that offer
>> that
>> http://camel.apache.org/commercial-camel-offerings.html
>>
>> > *Robert Simmons Jr. MSc. - Lead Java Architect @ EA*
>> > *Author of: Hardcore Java (2003) and Maintainable Java (2012)*
>> > *LinkedIn: **http://www.linkedin.com/pub/robert-simmons/40/852/a39
>> > <http://www.linkedin.com/pub/robert-simmons/40/852/a39>*
>> >
>> >
>> > On Tue, Apr 15, 2014 at 5:15 PM, kraythe . <kr...@gmail.com> wrote:
>> >
>> >> So I think there is a problem with the way rollback is implemented in
>> >> relation to Camel. As far as I can tell there is no way to get the
>> >> following working.
>> >>
>> >> package com.ea.wwce.camel.test.utilities;
>> >>
>> >> import com.ea.wwce.camel.utilities.data.RecordList;
>> >> import com.ea.wwce.camel.utilities.transactions.TxnHelper;
>> >> import org.apache.camel.ExchangePattern;
>> >> import org.apache.camel.builder.AdviceWithRouteBuilder;
>> >> import org.apache.camel.builder.RouteBuilder;
>> >> import org.apache.camel.component.mock.MockEndpoint;
>> >> import org.testng.annotations.Test;
>> >> import static com.ea.wwce.camel.test.utilities.TransactionTestTools.*;
>> >> import static
>> >> com.ea.wwce.camel.utilities.activemq.ActiveMQHelper.endpointAMQ;
>> >> import static
>> >>
>> com.ea.wwce.camel.utilities.jackson.RecordSerialization.toListOfJsonStrings;
>> >> import static org.apache.camel.ExchangePattern.InOnly;
>> >>
>> >> /** This test suite validates the transaction configuration in the test
>> >> suite. */
>> >> public class JMSOnlyTransactionTest extends AMQRouteTestSupport {
>> >>   private static final String QUEUE_DEAD = "dead";
>> >>   private static final String QUEUE_INBOX = "inbox";
>> >>   private static final String QUEUE_OUTBOX = "outbox";
>> >>   private static final String ROUTE_ID_FEED = "Feed";
>> >>   private static final String ROUTE_ID_TEST_ROUTE = "TestRoute";
>> >>   private static final String ROUTE_ID_RESULTS = "ResultsRoute";
>> >>   private static final String ROUTE_ID_DEAD = "DeadRoute";
>> >>   private static final String DIRECT_FEED_INBOX = "direct:feed_inbox";
>> >>
>> >>   private static final String MOCK_END = "mock:end";
>> >>   private static final String MOCK_BEFORE_TO_QUEUE =
>> >> "mock:before_to_queue";
>> >>   private static final String MOCK_AFTER_TO_QUEUE =
>> "mock:after_to_queue";
>> >>
>> >>   /** Mock endpoints. */
>> >>   private MockEndpoint mockEnd, mockDead, mockOutbox, mockBeforeToQueue,
>> >> mockAfterToQueue;
>> >>
>> >>   /** Helper to initialize mocks in the test. */
>> >>   private void initMocks() {
>> >>     mockEnd = assertAndGetMockEndpoint(MOCK_END);
>> >>     mockDead = assertAndGetMockEndpoint(MOCK_DEAD);
>> >>     mockBeforeToQueue = assertAndGetMockEndpoint(MOCK_BEFORE_TO_QUEUE);
>> >>     mockAfterToQueue = assertAndGetMockEndpoint(MOCK_AFTER_TO_QUEUE);
>> >>     mockOutbox =
>> assertAndGetMockEndpoint(mockEndpointAMQ(QUEUE_OUTBOX));
>> >>   }
>> >>
>> >>   @Override
>> >>   protected RouteBuilder createRouteBuilder() {
>> >>     System.out.println("createRouteBuilder");
>> >>     return new RouteBuilder(this.context) {
>> >>       @Override
>> >>       public void configure() {
>> >>         getContext().setTracing(true);
>> >>         from(DIRECT_FEED_INBOX).routeId(ROUTE_ID_FEED)
>> >>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
>> >>
>> >> .split(body()).marshal(dfCaseRecord).to(endpointAMQ(QUEUE_INBOX));
>> >>         from(endpointAMQ(QUEUE_OUTBOX)).routeId(ROUTE_ID_RESULTS)
>> >>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
>> >>             .unmarshal(dfCaseRecord).to(MOCK_END);
>> >>         from(endpointAMQ(QUEUE_DEAD)).routeId(ROUTE_ID_DEAD)
>> >>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
>> >>             .unmarshal(dfCaseRecord).to(MOCK_DEAD);
>> >>         from(endpointAMQ(QUEUE_INBOX)).routeId(ROUTE_ID_TEST_ROUTE)
>> >>
>> >> .onException(RuntimeException.class).handled(true).useOriginalMessage()
>> >>               .to(InOnly, endpointAMQ(QUEUE_DEAD)).end()
>> >>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRES_NEW)
>> >>             .unmarshal(dfCaseRecord)
>> >>             .to(MOCK_BEFORE_TO_QUEUE)
>> >>             .marshal(dfCaseRecord)
>> >>             .to(endpointAMQ(QUEUE_OUTBOX))
>> >>             .unmarshal(dfCaseRecord)
>> >>             .to(MOCK_AFTER_TO_QUEUE);
>> >>       }
>> >>     };
>> >>   }
>> >>
>> >>   /** Advice the route, mocking ActiveMQ endpoints. */
>> >>   protected void adviceRoute() throws Exception {
>> >>     this.context.getRouteDefinition(ROUTE_ID_TEST_ROUTE).adviceWith(
>> >>         this.context, new AdviceWithRouteBuilder() {
>> >>           @Override
>> >>           public void configure() throws Exception {
>> >>             mockEndpoints("activemq:queue:*");
>> >>           }
>> >>         }
>> >>     );
>> >>   }
>> >>
>> >>   @Test
>> >>   public void testNormalRouting() throws Exception {
>> >>     adviceRoute();
>> >>     startCamelContext();
>> >>     initMocks();
>> >>     final RecordList cases = casesA();
>> >>
>> >>     mockEnd.expectedBodiesReceived(cases);
>> >>     mockBeforeToQueue.expectedBodiesReceivedInAnyOrder(cases);
>> >>
>> >> mockOutbox.expectedBodiesReceivedInAnyOrder(toListOfJsonStrings(mapper,
>> >> cases));
>> >>     mockAfterToQueue.expectedBodiesReceivedInAnyOrder(cases);
>> >>
>> >>     template.sendBody(DIRECT_FEED_INBOX, cases);
>> >>
>> >>     mockBeforeToQueue.assertIsSatisfied();
>> >>     mockAfterToQueue.assertIsSatisfied();
>> >>     mockEnd.assertIsSatisfied();
>> >>     mockDead.assertIsSatisfied();
>> >>     mockOutbox.assertIsSatisfied();
>> >>   }
>> >>
>> >>   @Test
>> >>   public void testRollbackBeforeEnqueue() throws Exception {
>> >>     adviceRoute();
>> >>     startCamelContext();
>> >>     initMocks();
>> >>     final RecordList cases = casesA();
>> >>
>> >>     mockEnd.expectedBodiesReceivedInAnyOrder(cases.get(1),
>> cases.get(2));
>> >>     mockDead.expectedBodiesReceived(cases.get(0));
>> >>     mockBeforeToQueue.expectedBodiesReceivedInAnyOrder(cases);
>> >>     mockBeforeToQueue.whenExchangeReceived(1, EXCEPTION_PROCESSOR);
>> >>
>> >> mockOutbox.expectedBodiesReceivedInAnyOrder(toListOfJsonStrings(mapper,
>> >> cases.get(1), cases.get(2)));
>> >>     mockAfterToQueue.expectedBodiesReceivedInAnyOrder(cases.get(1),
>> >> cases.get(2));
>> >>
>> >>     template.sendBody(DIRECT_FEED_INBOX, cases);
>> >>
>> >>     mockBeforeToQueue.assertIsSatisfied();
>> >>     mockAfterToQueue.assertIsSatisfied();
>> >>     mockEnd.assertIsSatisfied();
>> >>     mockDead.assertIsSatisfied();
>> >>     mockOutbox.assertIsSatisfied();
>> >>   }
>> >>
>> >>   @Test
>> >>   public void testRollbackAfterEnqueue() throws Exception {
>> >>     adviceRoute();
>> >>     startCamelContext();
>> >>     initMocks();
>> >>     final RecordList cases = casesA();
>> >>
>> >>     mockEnd.expectedBodiesReceivedInAnyOrder(cases.get(1),
>> cases.get(2));
>> >>     mockDead.expectedBodiesReceivedInAnyOrder(cases.get(0));
>> >>     mockBeforeToQueue.expectedBodiesReceivedInAnyOrder(cases);
>> >>
>> >> mockOutbox.expectedBodiesReceivedInAnyOrder(toListOfJsonStrings(mapper,
>> >> cases));
>> >>     mockAfterToQueue.expectedBodiesReceivedInAnyOrder(cases);
>> >>     mockAfterToQueue.whenExchangeReceived(1, EXCEPTION_PROCESSOR);
>> >>
>> >>     template.sendBody(DIRECT_FEED_INBOX, cases);
>> >>
>> >>     mockBeforeToQueue.assertIsSatisfied();
>> >>     mockAfterToQueue.assertIsSatisfied();
>> >>     mockDead.assertIsSatisfied();
>> >>     mockOutbox.assertIsSatisfied();
>> >>     mockEnd.assertIsSatisfied();
>> >>   }
>> >> }
>> >>
>> >> I have tried dozens of combinations in the onException clause and
>> nothing
>> >> works. Adding markRollbackOnly(), or rollback() only succeeds in rolling
>> >> back the dead letter channel as well. Making the handled(false) cause
>> AMQ
>> >> to resubmit the transaction and rolls back the dead letter channel. I
>> have
>> >> tried a dozen combinations so if anyone has one that works I would be
>> >> grateful.
>> >>
>> >> *Robert Simmons Jr. MSc. - Lead Java Architect @ EA*
>> >> *Author of: Hardcore Java (2003) and Maintainable Java (2012)*
>> >> *LinkedIn: **http://www.linkedin.com/pub/robert-simmons/40/852/a39
>> >> <http://www.linkedin.com/pub/robert-simmons/40/852/a39>*
>> >>
>> >>
>> >> On Mon, Apr 14, 2014 at 1:10 PM, kraythe . <kr...@gmail.com> wrote:
>> >>
>> >>> So, in the ongoing perfect transaction configuration we have an
>> >>> interesting use case: Consider the following route in a test:
>> >>>
>> >>>   @Override
>> >>>   protected RouteBuilder createRouteBuilder() {
>> >>>     System.out.println("createRouteBuilder");
>> >>>     return new RouteBuilder(this.context) {
>> >>>       @Override
>> >>>       public void configure() {
>> >>>         getContext().setTracing(true);
>> >>>         from(DIRECT_FEED_INBOX).routeId(ROUTE_ID_FEED)
>> >>>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
>> >>>
>> >>> .split(body()).marshal(dfCaseRecord).to(endpointAMQ(QUEUE_INBOX));
>> >>>         from(endpointAMQ(QUEUE_OUTBOX)).routeId(ROUTE_ID_RESULTS)
>> >>>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
>> >>>             .unmarshal(dfCaseRecord).to(MOCK_END);
>> >>>         from(endpointAMQ(QUEUE_DEAD)).routeId(ROUTE_ID_DEAD)
>> >>>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
>> >>>             .unmarshal(dfCaseRecord).to(MOCK_DEAD);
>> >>>         from(endpointAMQ(QUEUE_INBOX)).routeId(ROUTE_ID_TEST_ROUTE)
>> >>>
>> >>>
>> .onException(RuntimeException.class).handled(true).useOriginalMessage().to(endpointAMQ(QUEUE_DEAD)).end()
>> >>>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
>> >>>             .unmarshal(dfCaseRecord)
>> >>>             .to(MOCK_BEFORE_TO_QUEUE)
>> >>>             .marshal(dfCaseRecord)
>> >>>             .to(endpointAMQ(QUEUE_OUTBOX))
>> >>>             .unmarshal(dfCaseRecord)
>> >>>             .to(MOCK_AFTER_TO_QUEUE);
>> >>>       }
>> >>>     };
>> >>>   }
>> >>>
>> >>> Note that the transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED) simply looks
>> >>> up the transaction policy by name as it is just a string key for our
>> JNDI
>> >>> registry. Our test case looks like the following.
>> >>>
>> >>>   @Test
>> >>>   public void testRollbackAfterEnqueue() throws Exception {
>> >>>     adviceRoute();
>> >>>     startCamelContext();
>> >>>     initMocks();
>> >>>     final RecordList cases = casesA();
>> >>>
>> >>>     mockEnd.expectedMessageCount(2);
>> >>>     mockEnd.expectedBodiesReceived(cases.get(1), cases.get(2));
>> >>>     mockDead.expectedMessageCount(1);
>> >>>     mockDead.expectedBodiesReceived(cases.get(0));
>> >>>     mockBeforeToQueue.expectedMessageCount(3);
>> >>>     mockBeforeToQueue.expectedBodiesReceivedInAnyOrder(cases);
>> >>>     mockOutbox.expectedMessageCount(3);
>> >>>
>> >>> mockOutbox.expectedBodiesReceivedInAnyOrder(toListOfJsonStrings(mapper,
>> >>> cases));
>> >>>     mockAfterToQueue.expectedMessageCount(3);
>> >>>     mockAfterToQueue.expectedBodiesReceivedInAnyOrder(cases);
>> >>>     mockAfterToQueue.whenExchangeReceived(1, EXCEPTION_PROCESSOR);
>> >>>
>> >>>     template.sendBody(DIRECT_FEED_INBOX, cases);
>> >>>
>> >>>     mockBeforeToQueue.assertIsSatisfied();
>> >>>     mockAfterToQueue.assertIsSatisfied();
>> >>>     mockEnd.assertIsSatisfied();
>> >>>     mockDead.assertIsSatisfied();
>> >>>     mockOutbox.assertIsSatisfied();
>> >>>   }
>> >>>
>> >>> In this route the goal is that if any exceptions are thrown even after
>> >>> the message is enqueued, it should be rolled back, the message that
>> >>> excepted should go to the DLQ and the messages in the outbox should be
>> >>> rolled back but the message from the inbox should not be put back on
>> the
>> >>> queue. This is proving to be a bit of a juggling act.
>> >>>
>> >>> I tried putting markRollBackOnly() in the exception handler after the
>> >>> to(dead) but that rolled back the dead letter queue and outbox and then
>> >>> redelivered the inbox message. Removing the markRollbackOnly() means
>> >>> that the message arrives at dead and is off the inbox but it doesn't
>> get
>> >>> removed from the outbox.
>> >>>
>> >>> So I am looking for the right combination of calls to accomplish what I
>> >>> want to do. Any suggestions?
>> >>>
>> >>> Thanks in advance.
>> >>>
>> >>> *Robert Simmons Jr. MSc. - Lead Java Architect @ EA*
>> >>> *Author of: Hardcore Java (2003) and Maintainable Java (2012)*
>> >>> *LinkedIn: **http://www.linkedin.com/pub/robert-simmons/40/852/a39
>> >>> <http://www.linkedin.com/pub/robert-simmons/40/852/a39>*
>> >>>
>> >>
>> >>
>>
>>
>>
>> --
>> Claus Ibsen
>> -----------------
>> Red Hat, Inc.
>> Email: cibsen@redhat.com
>> Twitter: davsclaus
>> Blog: http://davsclaus.com
>> Author of Camel in Action: http://www.manning.com/ibsen
>> hawtio: http://hawt.io/
>> fabric8: http://fabric8.io/
>>



-- 
Claus Ibsen
-----------------
Red Hat, Inc.
Email: cibsen@redhat.com
Twitter: davsclaus
Blog: http://davsclaus.com
Author of Camel in Action: http://www.manning.com/ibsen
hawtio: http://hawt.io/
fabric8: http://fabric8.io/

Re: Transactions: Rollback Destination but Not Dead Letter Queue or Source

Posted by "kraythe ." <kr...@gmail.com>.
Yeah no problem. I was just hoping someone would have the answer. I keep
plugging away at it. Probably some transaction issue, I dont know. You can
answer this perhaps :) , when I send a message to an activemq endpoint with
InOnly exchange pattern, will that message be subject to the transation?
I.e. if the transaction fails will it get popped off AMQ?

*Robert Simmons Jr. MSc. - Lead Java Architect @ EA*
*Author of: Hardcore Java (2003) and Maintainable Java (2012)*
*LinkedIn: **http://www.linkedin.com/pub/robert-simmons/40/852/a39
<http://www.linkedin.com/pub/robert-simmons/40/852/a39>*


On Tue, Apr 29, 2014 at 2:55 AM, Claus Ibsen <cl...@gmail.com> wrote:

> On Mon, Apr 28, 2014 at 8:48 PM, kraythe . <kr...@gmail.com> wrote:
> > No one has any idea on this? It would be really great if I could find the
> > formula to get this to work.
> >
>
> I dont think people always have the time to help, and especially when
> its more complicated with transactions and a lot of Camel route code.
>
> If you want to get priority help then there is some companies that offer
> that
> http://camel.apache.org/commercial-camel-offerings.html
>
> > *Robert Simmons Jr. MSc. - Lead Java Architect @ EA*
> > *Author of: Hardcore Java (2003) and Maintainable Java (2012)*
> > *LinkedIn: **http://www.linkedin.com/pub/robert-simmons/40/852/a39
> > <http://www.linkedin.com/pub/robert-simmons/40/852/a39>*
> >
> >
> > On Tue, Apr 15, 2014 at 5:15 PM, kraythe . <kr...@gmail.com> wrote:
> >
> >> So I think there is a problem with the way rollback is implemented in
> >> relation to Camel. As far as I can tell there is no way to get the
> >> following working.
> >>
> >> package com.ea.wwce.camel.test.utilities;
> >>
> >> import com.ea.wwce.camel.utilities.data.RecordList;
> >> import com.ea.wwce.camel.utilities.transactions.TxnHelper;
> >> import org.apache.camel.ExchangePattern;
> >> import org.apache.camel.builder.AdviceWithRouteBuilder;
> >> import org.apache.camel.builder.RouteBuilder;
> >> import org.apache.camel.component.mock.MockEndpoint;
> >> import org.testng.annotations.Test;
> >> import static com.ea.wwce.camel.test.utilities.TransactionTestTools.*;
> >> import static
> >> com.ea.wwce.camel.utilities.activemq.ActiveMQHelper.endpointAMQ;
> >> import static
> >>
> com.ea.wwce.camel.utilities.jackson.RecordSerialization.toListOfJsonStrings;
> >> import static org.apache.camel.ExchangePattern.InOnly;
> >>
> >> /** This test suite validates the transaction configuration in the test
> >> suite. */
> >> public class JMSOnlyTransactionTest extends AMQRouteTestSupport {
> >>   private static final String QUEUE_DEAD = "dead";
> >>   private static final String QUEUE_INBOX = "inbox";
> >>   private static final String QUEUE_OUTBOX = "outbox";
> >>   private static final String ROUTE_ID_FEED = "Feed";
> >>   private static final String ROUTE_ID_TEST_ROUTE = "TestRoute";
> >>   private static final String ROUTE_ID_RESULTS = "ResultsRoute";
> >>   private static final String ROUTE_ID_DEAD = "DeadRoute";
> >>   private static final String DIRECT_FEED_INBOX = "direct:feed_inbox";
> >>
> >>   private static final String MOCK_END = "mock:end";
> >>   private static final String MOCK_BEFORE_TO_QUEUE =
> >> "mock:before_to_queue";
> >>   private static final String MOCK_AFTER_TO_QUEUE =
> "mock:after_to_queue";
> >>
> >>   /** Mock endpoints. */
> >>   private MockEndpoint mockEnd, mockDead, mockOutbox, mockBeforeToQueue,
> >> mockAfterToQueue;
> >>
> >>   /** Helper to initialize mocks in the test. */
> >>   private void initMocks() {
> >>     mockEnd = assertAndGetMockEndpoint(MOCK_END);
> >>     mockDead = assertAndGetMockEndpoint(MOCK_DEAD);
> >>     mockBeforeToQueue = assertAndGetMockEndpoint(MOCK_BEFORE_TO_QUEUE);
> >>     mockAfterToQueue = assertAndGetMockEndpoint(MOCK_AFTER_TO_QUEUE);
> >>     mockOutbox =
> assertAndGetMockEndpoint(mockEndpointAMQ(QUEUE_OUTBOX));
> >>   }
> >>
> >>   @Override
> >>   protected RouteBuilder createRouteBuilder() {
> >>     System.out.println("createRouteBuilder");
> >>     return new RouteBuilder(this.context) {
> >>       @Override
> >>       public void configure() {
> >>         getContext().setTracing(true);
> >>         from(DIRECT_FEED_INBOX).routeId(ROUTE_ID_FEED)
> >>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
> >>
> >> .split(body()).marshal(dfCaseRecord).to(endpointAMQ(QUEUE_INBOX));
> >>         from(endpointAMQ(QUEUE_OUTBOX)).routeId(ROUTE_ID_RESULTS)
> >>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
> >>             .unmarshal(dfCaseRecord).to(MOCK_END);
> >>         from(endpointAMQ(QUEUE_DEAD)).routeId(ROUTE_ID_DEAD)
> >>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
> >>             .unmarshal(dfCaseRecord).to(MOCK_DEAD);
> >>         from(endpointAMQ(QUEUE_INBOX)).routeId(ROUTE_ID_TEST_ROUTE)
> >>
> >> .onException(RuntimeException.class).handled(true).useOriginalMessage()
> >>               .to(InOnly, endpointAMQ(QUEUE_DEAD)).end()
> >>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRES_NEW)
> >>             .unmarshal(dfCaseRecord)
> >>             .to(MOCK_BEFORE_TO_QUEUE)
> >>             .marshal(dfCaseRecord)
> >>             .to(endpointAMQ(QUEUE_OUTBOX))
> >>             .unmarshal(dfCaseRecord)
> >>             .to(MOCK_AFTER_TO_QUEUE);
> >>       }
> >>     };
> >>   }
> >>
> >>   /** Advice the route, mocking ActiveMQ endpoints. */
> >>   protected void adviceRoute() throws Exception {
> >>     this.context.getRouteDefinition(ROUTE_ID_TEST_ROUTE).adviceWith(
> >>         this.context, new AdviceWithRouteBuilder() {
> >>           @Override
> >>           public void configure() throws Exception {
> >>             mockEndpoints("activemq:queue:*");
> >>           }
> >>         }
> >>     );
> >>   }
> >>
> >>   @Test
> >>   public void testNormalRouting() throws Exception {
> >>     adviceRoute();
> >>     startCamelContext();
> >>     initMocks();
> >>     final RecordList cases = casesA();
> >>
> >>     mockEnd.expectedBodiesReceived(cases);
> >>     mockBeforeToQueue.expectedBodiesReceivedInAnyOrder(cases);
> >>
> >> mockOutbox.expectedBodiesReceivedInAnyOrder(toListOfJsonStrings(mapper,
> >> cases));
> >>     mockAfterToQueue.expectedBodiesReceivedInAnyOrder(cases);
> >>
> >>     template.sendBody(DIRECT_FEED_INBOX, cases);
> >>
> >>     mockBeforeToQueue.assertIsSatisfied();
> >>     mockAfterToQueue.assertIsSatisfied();
> >>     mockEnd.assertIsSatisfied();
> >>     mockDead.assertIsSatisfied();
> >>     mockOutbox.assertIsSatisfied();
> >>   }
> >>
> >>   @Test
> >>   public void testRollbackBeforeEnqueue() throws Exception {
> >>     adviceRoute();
> >>     startCamelContext();
> >>     initMocks();
> >>     final RecordList cases = casesA();
> >>
> >>     mockEnd.expectedBodiesReceivedInAnyOrder(cases.get(1),
> cases.get(2));
> >>     mockDead.expectedBodiesReceived(cases.get(0));
> >>     mockBeforeToQueue.expectedBodiesReceivedInAnyOrder(cases);
> >>     mockBeforeToQueue.whenExchangeReceived(1, EXCEPTION_PROCESSOR);
> >>
> >> mockOutbox.expectedBodiesReceivedInAnyOrder(toListOfJsonStrings(mapper,
> >> cases.get(1), cases.get(2)));
> >>     mockAfterToQueue.expectedBodiesReceivedInAnyOrder(cases.get(1),
> >> cases.get(2));
> >>
> >>     template.sendBody(DIRECT_FEED_INBOX, cases);
> >>
> >>     mockBeforeToQueue.assertIsSatisfied();
> >>     mockAfterToQueue.assertIsSatisfied();
> >>     mockEnd.assertIsSatisfied();
> >>     mockDead.assertIsSatisfied();
> >>     mockOutbox.assertIsSatisfied();
> >>   }
> >>
> >>   @Test
> >>   public void testRollbackAfterEnqueue() throws Exception {
> >>     adviceRoute();
> >>     startCamelContext();
> >>     initMocks();
> >>     final RecordList cases = casesA();
> >>
> >>     mockEnd.expectedBodiesReceivedInAnyOrder(cases.get(1),
> cases.get(2));
> >>     mockDead.expectedBodiesReceivedInAnyOrder(cases.get(0));
> >>     mockBeforeToQueue.expectedBodiesReceivedInAnyOrder(cases);
> >>
> >> mockOutbox.expectedBodiesReceivedInAnyOrder(toListOfJsonStrings(mapper,
> >> cases));
> >>     mockAfterToQueue.expectedBodiesReceivedInAnyOrder(cases);
> >>     mockAfterToQueue.whenExchangeReceived(1, EXCEPTION_PROCESSOR);
> >>
> >>     template.sendBody(DIRECT_FEED_INBOX, cases);
> >>
> >>     mockBeforeToQueue.assertIsSatisfied();
> >>     mockAfterToQueue.assertIsSatisfied();
> >>     mockDead.assertIsSatisfied();
> >>     mockOutbox.assertIsSatisfied();
> >>     mockEnd.assertIsSatisfied();
> >>   }
> >> }
> >>
> >> I have tried dozens of combinations in the onException clause and
> nothing
> >> works. Adding markRollbackOnly(), or rollback() only succeeds in rolling
> >> back the dead letter channel as well. Making the handled(false) cause
> AMQ
> >> to resubmit the transaction and rolls back the dead letter channel. I
> have
> >> tried a dozen combinations so if anyone has one that works I would be
> >> grateful.
> >>
> >> *Robert Simmons Jr. MSc. - Lead Java Architect @ EA*
> >> *Author of: Hardcore Java (2003) and Maintainable Java (2012)*
> >> *LinkedIn: **http://www.linkedin.com/pub/robert-simmons/40/852/a39
> >> <http://www.linkedin.com/pub/robert-simmons/40/852/a39>*
> >>
> >>
> >> On Mon, Apr 14, 2014 at 1:10 PM, kraythe . <kr...@gmail.com> wrote:
> >>
> >>> So, in the ongoing perfect transaction configuration we have an
> >>> interesting use case: Consider the following route in a test:
> >>>
> >>>   @Override
> >>>   protected RouteBuilder createRouteBuilder() {
> >>>     System.out.println("createRouteBuilder");
> >>>     return new RouteBuilder(this.context) {
> >>>       @Override
> >>>       public void configure() {
> >>>         getContext().setTracing(true);
> >>>         from(DIRECT_FEED_INBOX).routeId(ROUTE_ID_FEED)
> >>>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
> >>>
> >>> .split(body()).marshal(dfCaseRecord).to(endpointAMQ(QUEUE_INBOX));
> >>>         from(endpointAMQ(QUEUE_OUTBOX)).routeId(ROUTE_ID_RESULTS)
> >>>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
> >>>             .unmarshal(dfCaseRecord).to(MOCK_END);
> >>>         from(endpointAMQ(QUEUE_DEAD)).routeId(ROUTE_ID_DEAD)
> >>>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
> >>>             .unmarshal(dfCaseRecord).to(MOCK_DEAD);
> >>>         from(endpointAMQ(QUEUE_INBOX)).routeId(ROUTE_ID_TEST_ROUTE)
> >>>
> >>>
> .onException(RuntimeException.class).handled(true).useOriginalMessage().to(endpointAMQ(QUEUE_DEAD)).end()
> >>>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
> >>>             .unmarshal(dfCaseRecord)
> >>>             .to(MOCK_BEFORE_TO_QUEUE)
> >>>             .marshal(dfCaseRecord)
> >>>             .to(endpointAMQ(QUEUE_OUTBOX))
> >>>             .unmarshal(dfCaseRecord)
> >>>             .to(MOCK_AFTER_TO_QUEUE);
> >>>       }
> >>>     };
> >>>   }
> >>>
> >>> Note that the transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED) simply looks
> >>> up the transaction policy by name as it is just a string key for our
> JNDI
> >>> registry. Our test case looks like the following.
> >>>
> >>>   @Test
> >>>   public void testRollbackAfterEnqueue() throws Exception {
> >>>     adviceRoute();
> >>>     startCamelContext();
> >>>     initMocks();
> >>>     final RecordList cases = casesA();
> >>>
> >>>     mockEnd.expectedMessageCount(2);
> >>>     mockEnd.expectedBodiesReceived(cases.get(1), cases.get(2));
> >>>     mockDead.expectedMessageCount(1);
> >>>     mockDead.expectedBodiesReceived(cases.get(0));
> >>>     mockBeforeToQueue.expectedMessageCount(3);
> >>>     mockBeforeToQueue.expectedBodiesReceivedInAnyOrder(cases);
> >>>     mockOutbox.expectedMessageCount(3);
> >>>
> >>> mockOutbox.expectedBodiesReceivedInAnyOrder(toListOfJsonStrings(mapper,
> >>> cases));
> >>>     mockAfterToQueue.expectedMessageCount(3);
> >>>     mockAfterToQueue.expectedBodiesReceivedInAnyOrder(cases);
> >>>     mockAfterToQueue.whenExchangeReceived(1, EXCEPTION_PROCESSOR);
> >>>
> >>>     template.sendBody(DIRECT_FEED_INBOX, cases);
> >>>
> >>>     mockBeforeToQueue.assertIsSatisfied();
> >>>     mockAfterToQueue.assertIsSatisfied();
> >>>     mockEnd.assertIsSatisfied();
> >>>     mockDead.assertIsSatisfied();
> >>>     mockOutbox.assertIsSatisfied();
> >>>   }
> >>>
> >>> In this route the goal is that if any exceptions are thrown even after
> >>> the message is enqueued, it should be rolled back, the message that
> >>> excepted should go to the DLQ and the messages in the outbox should be
> >>> rolled back but the message from the inbox should not be put back on
> the
> >>> queue. This is proving to be a bit of a juggling act.
> >>>
> >>> I tried putting markRollBackOnly() in the exception handler after the
> >>> to(dead) but that rolled back the dead letter queue and outbox and then
> >>> redelivered the inbox message. Removing the markRollbackOnly() means
> >>> that the message arrives at dead and is off the inbox but it doesn't
> get
> >>> removed from the outbox.
> >>>
> >>> So I am looking for the right combination of calls to accomplish what I
> >>> want to do. Any suggestions?
> >>>
> >>> Thanks in advance.
> >>>
> >>> *Robert Simmons Jr. MSc. - Lead Java Architect @ EA*
> >>> *Author of: Hardcore Java (2003) and Maintainable Java (2012)*
> >>> *LinkedIn: **http://www.linkedin.com/pub/robert-simmons/40/852/a39
> >>> <http://www.linkedin.com/pub/robert-simmons/40/852/a39>*
> >>>
> >>
> >>
>
>
>
> --
> Claus Ibsen
> -----------------
> Red Hat, Inc.
> Email: cibsen@redhat.com
> Twitter: davsclaus
> Blog: http://davsclaus.com
> Author of Camel in Action: http://www.manning.com/ibsen
> hawtio: http://hawt.io/
> fabric8: http://fabric8.io/
>

Re: Transactions: Rollback Destination but Not Dead Letter Queue or Source

Posted by Claus Ibsen <cl...@gmail.com>.
On Mon, Apr 28, 2014 at 8:48 PM, kraythe . <kr...@gmail.com> wrote:
> No one has any idea on this? It would be really great if I could find the
> formula to get this to work.
>

I dont think people always have the time to help, and especially when
its more complicated with transactions and a lot of Camel route code.

If you want to get priority help then there is some companies that offer that
http://camel.apache.org/commercial-camel-offerings.html

> *Robert Simmons Jr. MSc. - Lead Java Architect @ EA*
> *Author of: Hardcore Java (2003) and Maintainable Java (2012)*
> *LinkedIn: **http://www.linkedin.com/pub/robert-simmons/40/852/a39
> <http://www.linkedin.com/pub/robert-simmons/40/852/a39>*
>
>
> On Tue, Apr 15, 2014 at 5:15 PM, kraythe . <kr...@gmail.com> wrote:
>
>> So I think there is a problem with the way rollback is implemented in
>> relation to Camel. As far as I can tell there is no way to get the
>> following working.
>>
>> package com.ea.wwce.camel.test.utilities;
>>
>> import com.ea.wwce.camel.utilities.data.RecordList;
>> import com.ea.wwce.camel.utilities.transactions.TxnHelper;
>> import org.apache.camel.ExchangePattern;
>> import org.apache.camel.builder.AdviceWithRouteBuilder;
>> import org.apache.camel.builder.RouteBuilder;
>> import org.apache.camel.component.mock.MockEndpoint;
>> import org.testng.annotations.Test;
>> import static com.ea.wwce.camel.test.utilities.TransactionTestTools.*;
>> import static
>> com.ea.wwce.camel.utilities.activemq.ActiveMQHelper.endpointAMQ;
>> import static
>> com.ea.wwce.camel.utilities.jackson.RecordSerialization.toListOfJsonStrings;
>> import static org.apache.camel.ExchangePattern.InOnly;
>>
>> /** This test suite validates the transaction configuration in the test
>> suite. */
>> public class JMSOnlyTransactionTest extends AMQRouteTestSupport {
>>   private static final String QUEUE_DEAD = "dead";
>>   private static final String QUEUE_INBOX = "inbox";
>>   private static final String QUEUE_OUTBOX = "outbox";
>>   private static final String ROUTE_ID_FEED = "Feed";
>>   private static final String ROUTE_ID_TEST_ROUTE = "TestRoute";
>>   private static final String ROUTE_ID_RESULTS = "ResultsRoute";
>>   private static final String ROUTE_ID_DEAD = "DeadRoute";
>>   private static final String DIRECT_FEED_INBOX = "direct:feed_inbox";
>>
>>   private static final String MOCK_END = "mock:end";
>>   private static final String MOCK_BEFORE_TO_QUEUE =
>> "mock:before_to_queue";
>>   private static final String MOCK_AFTER_TO_QUEUE = "mock:after_to_queue";
>>
>>   /** Mock endpoints. */
>>   private MockEndpoint mockEnd, mockDead, mockOutbox, mockBeforeToQueue,
>> mockAfterToQueue;
>>
>>   /** Helper to initialize mocks in the test. */
>>   private void initMocks() {
>>     mockEnd = assertAndGetMockEndpoint(MOCK_END);
>>     mockDead = assertAndGetMockEndpoint(MOCK_DEAD);
>>     mockBeforeToQueue = assertAndGetMockEndpoint(MOCK_BEFORE_TO_QUEUE);
>>     mockAfterToQueue = assertAndGetMockEndpoint(MOCK_AFTER_TO_QUEUE);
>>     mockOutbox = assertAndGetMockEndpoint(mockEndpointAMQ(QUEUE_OUTBOX));
>>   }
>>
>>   @Override
>>   protected RouteBuilder createRouteBuilder() {
>>     System.out.println("createRouteBuilder");
>>     return new RouteBuilder(this.context) {
>>       @Override
>>       public void configure() {
>>         getContext().setTracing(true);
>>         from(DIRECT_FEED_INBOX).routeId(ROUTE_ID_FEED)
>>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
>>
>> .split(body()).marshal(dfCaseRecord).to(endpointAMQ(QUEUE_INBOX));
>>         from(endpointAMQ(QUEUE_OUTBOX)).routeId(ROUTE_ID_RESULTS)
>>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
>>             .unmarshal(dfCaseRecord).to(MOCK_END);
>>         from(endpointAMQ(QUEUE_DEAD)).routeId(ROUTE_ID_DEAD)
>>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
>>             .unmarshal(dfCaseRecord).to(MOCK_DEAD);
>>         from(endpointAMQ(QUEUE_INBOX)).routeId(ROUTE_ID_TEST_ROUTE)
>>
>> .onException(RuntimeException.class).handled(true).useOriginalMessage()
>>               .to(InOnly, endpointAMQ(QUEUE_DEAD)).end()
>>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRES_NEW)
>>             .unmarshal(dfCaseRecord)
>>             .to(MOCK_BEFORE_TO_QUEUE)
>>             .marshal(dfCaseRecord)
>>             .to(endpointAMQ(QUEUE_OUTBOX))
>>             .unmarshal(dfCaseRecord)
>>             .to(MOCK_AFTER_TO_QUEUE);
>>       }
>>     };
>>   }
>>
>>   /** Advice the route, mocking ActiveMQ endpoints. */
>>   protected void adviceRoute() throws Exception {
>>     this.context.getRouteDefinition(ROUTE_ID_TEST_ROUTE).adviceWith(
>>         this.context, new AdviceWithRouteBuilder() {
>>           @Override
>>           public void configure() throws Exception {
>>             mockEndpoints("activemq:queue:*");
>>           }
>>         }
>>     );
>>   }
>>
>>   @Test
>>   public void testNormalRouting() throws Exception {
>>     adviceRoute();
>>     startCamelContext();
>>     initMocks();
>>     final RecordList cases = casesA();
>>
>>     mockEnd.expectedBodiesReceived(cases);
>>     mockBeforeToQueue.expectedBodiesReceivedInAnyOrder(cases);
>>
>> mockOutbox.expectedBodiesReceivedInAnyOrder(toListOfJsonStrings(mapper,
>> cases));
>>     mockAfterToQueue.expectedBodiesReceivedInAnyOrder(cases);
>>
>>     template.sendBody(DIRECT_FEED_INBOX, cases);
>>
>>     mockBeforeToQueue.assertIsSatisfied();
>>     mockAfterToQueue.assertIsSatisfied();
>>     mockEnd.assertIsSatisfied();
>>     mockDead.assertIsSatisfied();
>>     mockOutbox.assertIsSatisfied();
>>   }
>>
>>   @Test
>>   public void testRollbackBeforeEnqueue() throws Exception {
>>     adviceRoute();
>>     startCamelContext();
>>     initMocks();
>>     final RecordList cases = casesA();
>>
>>     mockEnd.expectedBodiesReceivedInAnyOrder(cases.get(1), cases.get(2));
>>     mockDead.expectedBodiesReceived(cases.get(0));
>>     mockBeforeToQueue.expectedBodiesReceivedInAnyOrder(cases);
>>     mockBeforeToQueue.whenExchangeReceived(1, EXCEPTION_PROCESSOR);
>>
>> mockOutbox.expectedBodiesReceivedInAnyOrder(toListOfJsonStrings(mapper,
>> cases.get(1), cases.get(2)));
>>     mockAfterToQueue.expectedBodiesReceivedInAnyOrder(cases.get(1),
>> cases.get(2));
>>
>>     template.sendBody(DIRECT_FEED_INBOX, cases);
>>
>>     mockBeforeToQueue.assertIsSatisfied();
>>     mockAfterToQueue.assertIsSatisfied();
>>     mockEnd.assertIsSatisfied();
>>     mockDead.assertIsSatisfied();
>>     mockOutbox.assertIsSatisfied();
>>   }
>>
>>   @Test
>>   public void testRollbackAfterEnqueue() throws Exception {
>>     adviceRoute();
>>     startCamelContext();
>>     initMocks();
>>     final RecordList cases = casesA();
>>
>>     mockEnd.expectedBodiesReceivedInAnyOrder(cases.get(1), cases.get(2));
>>     mockDead.expectedBodiesReceivedInAnyOrder(cases.get(0));
>>     mockBeforeToQueue.expectedBodiesReceivedInAnyOrder(cases);
>>
>> mockOutbox.expectedBodiesReceivedInAnyOrder(toListOfJsonStrings(mapper,
>> cases));
>>     mockAfterToQueue.expectedBodiesReceivedInAnyOrder(cases);
>>     mockAfterToQueue.whenExchangeReceived(1, EXCEPTION_PROCESSOR);
>>
>>     template.sendBody(DIRECT_FEED_INBOX, cases);
>>
>>     mockBeforeToQueue.assertIsSatisfied();
>>     mockAfterToQueue.assertIsSatisfied();
>>     mockDead.assertIsSatisfied();
>>     mockOutbox.assertIsSatisfied();
>>     mockEnd.assertIsSatisfied();
>>   }
>> }
>>
>> I have tried dozens of combinations in the onException clause and nothing
>> works. Adding markRollbackOnly(), or rollback() only succeeds in rolling
>> back the dead letter channel as well. Making the handled(false) cause AMQ
>> to resubmit the transaction and rolls back the dead letter channel. I have
>> tried a dozen combinations so if anyone has one that works I would be
>> grateful.
>>
>> *Robert Simmons Jr. MSc. - Lead Java Architect @ EA*
>> *Author of: Hardcore Java (2003) and Maintainable Java (2012)*
>> *LinkedIn: **http://www.linkedin.com/pub/robert-simmons/40/852/a39
>> <http://www.linkedin.com/pub/robert-simmons/40/852/a39>*
>>
>>
>> On Mon, Apr 14, 2014 at 1:10 PM, kraythe . <kr...@gmail.com> wrote:
>>
>>> So, in the ongoing perfect transaction configuration we have an
>>> interesting use case: Consider the following route in a test:
>>>
>>>   @Override
>>>   protected RouteBuilder createRouteBuilder() {
>>>     System.out.println("createRouteBuilder");
>>>     return new RouteBuilder(this.context) {
>>>       @Override
>>>       public void configure() {
>>>         getContext().setTracing(true);
>>>         from(DIRECT_FEED_INBOX).routeId(ROUTE_ID_FEED)
>>>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
>>>
>>> .split(body()).marshal(dfCaseRecord).to(endpointAMQ(QUEUE_INBOX));
>>>         from(endpointAMQ(QUEUE_OUTBOX)).routeId(ROUTE_ID_RESULTS)
>>>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
>>>             .unmarshal(dfCaseRecord).to(MOCK_END);
>>>         from(endpointAMQ(QUEUE_DEAD)).routeId(ROUTE_ID_DEAD)
>>>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
>>>             .unmarshal(dfCaseRecord).to(MOCK_DEAD);
>>>         from(endpointAMQ(QUEUE_INBOX)).routeId(ROUTE_ID_TEST_ROUTE)
>>>
>>> .onException(RuntimeException.class).handled(true).useOriginalMessage().to(endpointAMQ(QUEUE_DEAD)).end()
>>>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
>>>             .unmarshal(dfCaseRecord)
>>>             .to(MOCK_BEFORE_TO_QUEUE)
>>>             .marshal(dfCaseRecord)
>>>             .to(endpointAMQ(QUEUE_OUTBOX))
>>>             .unmarshal(dfCaseRecord)
>>>             .to(MOCK_AFTER_TO_QUEUE);
>>>       }
>>>     };
>>>   }
>>>
>>> Note that the transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED) simply looks
>>> up the transaction policy by name as it is just a string key for our JNDI
>>> registry. Our test case looks like the following.
>>>
>>>   @Test
>>>   public void testRollbackAfterEnqueue() throws Exception {
>>>     adviceRoute();
>>>     startCamelContext();
>>>     initMocks();
>>>     final RecordList cases = casesA();
>>>
>>>     mockEnd.expectedMessageCount(2);
>>>     mockEnd.expectedBodiesReceived(cases.get(1), cases.get(2));
>>>     mockDead.expectedMessageCount(1);
>>>     mockDead.expectedBodiesReceived(cases.get(0));
>>>     mockBeforeToQueue.expectedMessageCount(3);
>>>     mockBeforeToQueue.expectedBodiesReceivedInAnyOrder(cases);
>>>     mockOutbox.expectedMessageCount(3);
>>>
>>> mockOutbox.expectedBodiesReceivedInAnyOrder(toListOfJsonStrings(mapper,
>>> cases));
>>>     mockAfterToQueue.expectedMessageCount(3);
>>>     mockAfterToQueue.expectedBodiesReceivedInAnyOrder(cases);
>>>     mockAfterToQueue.whenExchangeReceived(1, EXCEPTION_PROCESSOR);
>>>
>>>     template.sendBody(DIRECT_FEED_INBOX, cases);
>>>
>>>     mockBeforeToQueue.assertIsSatisfied();
>>>     mockAfterToQueue.assertIsSatisfied();
>>>     mockEnd.assertIsSatisfied();
>>>     mockDead.assertIsSatisfied();
>>>     mockOutbox.assertIsSatisfied();
>>>   }
>>>
>>> In this route the goal is that if any exceptions are thrown even after
>>> the message is enqueued, it should be rolled back, the message that
>>> excepted should go to the DLQ and the messages in the outbox should be
>>> rolled back but the message from the inbox should not be put back on the
>>> queue. This is proving to be a bit of a juggling act.
>>>
>>> I tried putting markRollBackOnly() in the exception handler after the
>>> to(dead) but that rolled back the dead letter queue and outbox and then
>>> redelivered the inbox message. Removing the markRollbackOnly() means
>>> that the message arrives at dead and is off the inbox but it doesn't get
>>> removed from the outbox.
>>>
>>> So I am looking for the right combination of calls to accomplish what I
>>> want to do. Any suggestions?
>>>
>>> Thanks in advance.
>>>
>>> *Robert Simmons Jr. MSc. - Lead Java Architect @ EA*
>>> *Author of: Hardcore Java (2003) and Maintainable Java (2012)*
>>> *LinkedIn: **http://www.linkedin.com/pub/robert-simmons/40/852/a39
>>> <http://www.linkedin.com/pub/robert-simmons/40/852/a39>*
>>>
>>
>>



-- 
Claus Ibsen
-----------------
Red Hat, Inc.
Email: cibsen@redhat.com
Twitter: davsclaus
Blog: http://davsclaus.com
Author of Camel in Action: http://www.manning.com/ibsen
hawtio: http://hawt.io/
fabric8: http://fabric8.io/

Re: Transactions: Rollback Destination but Not Dead Letter Queue or Source

Posted by "kraythe ." <kr...@gmail.com>.
No one has any idea on this? It would be really great if I could find the
formula to get this to work.

*Robert Simmons Jr. MSc. - Lead Java Architect @ EA*
*Author of: Hardcore Java (2003) and Maintainable Java (2012)*
*LinkedIn: **http://www.linkedin.com/pub/robert-simmons/40/852/a39
<http://www.linkedin.com/pub/robert-simmons/40/852/a39>*


On Tue, Apr 15, 2014 at 5:15 PM, kraythe . <kr...@gmail.com> wrote:

> So I think there is a problem with the way rollback is implemented in
> relation to Camel. As far as I can tell there is no way to get the
> following working.
>
> package com.ea.wwce.camel.test.utilities;
>
> import com.ea.wwce.camel.utilities.data.RecordList;
> import com.ea.wwce.camel.utilities.transactions.TxnHelper;
> import org.apache.camel.ExchangePattern;
> import org.apache.camel.builder.AdviceWithRouteBuilder;
> import org.apache.camel.builder.RouteBuilder;
> import org.apache.camel.component.mock.MockEndpoint;
> import org.testng.annotations.Test;
> import static com.ea.wwce.camel.test.utilities.TransactionTestTools.*;
> import static
> com.ea.wwce.camel.utilities.activemq.ActiveMQHelper.endpointAMQ;
> import static
> com.ea.wwce.camel.utilities.jackson.RecordSerialization.toListOfJsonStrings;
> import static org.apache.camel.ExchangePattern.InOnly;
>
> /** This test suite validates the transaction configuration in the test
> suite. */
> public class JMSOnlyTransactionTest extends AMQRouteTestSupport {
>   private static final String QUEUE_DEAD = "dead";
>   private static final String QUEUE_INBOX = "inbox";
>   private static final String QUEUE_OUTBOX = "outbox";
>   private static final String ROUTE_ID_FEED = "Feed";
>   private static final String ROUTE_ID_TEST_ROUTE = "TestRoute";
>   private static final String ROUTE_ID_RESULTS = "ResultsRoute";
>   private static final String ROUTE_ID_DEAD = "DeadRoute";
>   private static final String DIRECT_FEED_INBOX = "direct:feed_inbox";
>
>   private static final String MOCK_END = "mock:end";
>   private static final String MOCK_BEFORE_TO_QUEUE =
> "mock:before_to_queue";
>   private static final String MOCK_AFTER_TO_QUEUE = "mock:after_to_queue";
>
>   /** Mock endpoints. */
>   private MockEndpoint mockEnd, mockDead, mockOutbox, mockBeforeToQueue,
> mockAfterToQueue;
>
>   /** Helper to initialize mocks in the test. */
>   private void initMocks() {
>     mockEnd = assertAndGetMockEndpoint(MOCK_END);
>     mockDead = assertAndGetMockEndpoint(MOCK_DEAD);
>     mockBeforeToQueue = assertAndGetMockEndpoint(MOCK_BEFORE_TO_QUEUE);
>     mockAfterToQueue = assertAndGetMockEndpoint(MOCK_AFTER_TO_QUEUE);
>     mockOutbox = assertAndGetMockEndpoint(mockEndpointAMQ(QUEUE_OUTBOX));
>   }
>
>   @Override
>   protected RouteBuilder createRouteBuilder() {
>     System.out.println("createRouteBuilder");
>     return new RouteBuilder(this.context) {
>       @Override
>       public void configure() {
>         getContext().setTracing(true);
>         from(DIRECT_FEED_INBOX).routeId(ROUTE_ID_FEED)
>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
>
> .split(body()).marshal(dfCaseRecord).to(endpointAMQ(QUEUE_INBOX));
>         from(endpointAMQ(QUEUE_OUTBOX)).routeId(ROUTE_ID_RESULTS)
>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
>             .unmarshal(dfCaseRecord).to(MOCK_END);
>         from(endpointAMQ(QUEUE_DEAD)).routeId(ROUTE_ID_DEAD)
>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
>             .unmarshal(dfCaseRecord).to(MOCK_DEAD);
>         from(endpointAMQ(QUEUE_INBOX)).routeId(ROUTE_ID_TEST_ROUTE)
>
> .onException(RuntimeException.class).handled(true).useOriginalMessage()
>               .to(InOnly, endpointAMQ(QUEUE_DEAD)).end()
>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRES_NEW)
>             .unmarshal(dfCaseRecord)
>             .to(MOCK_BEFORE_TO_QUEUE)
>             .marshal(dfCaseRecord)
>             .to(endpointAMQ(QUEUE_OUTBOX))
>             .unmarshal(dfCaseRecord)
>             .to(MOCK_AFTER_TO_QUEUE);
>       }
>     };
>   }
>
>   /** Advice the route, mocking ActiveMQ endpoints. */
>   protected void adviceRoute() throws Exception {
>     this.context.getRouteDefinition(ROUTE_ID_TEST_ROUTE).adviceWith(
>         this.context, new AdviceWithRouteBuilder() {
>           @Override
>           public void configure() throws Exception {
>             mockEndpoints("activemq:queue:*");
>           }
>         }
>     );
>   }
>
>   @Test
>   public void testNormalRouting() throws Exception {
>     adviceRoute();
>     startCamelContext();
>     initMocks();
>     final RecordList cases = casesA();
>
>     mockEnd.expectedBodiesReceived(cases);
>     mockBeforeToQueue.expectedBodiesReceivedInAnyOrder(cases);
>
> mockOutbox.expectedBodiesReceivedInAnyOrder(toListOfJsonStrings(mapper,
> cases));
>     mockAfterToQueue.expectedBodiesReceivedInAnyOrder(cases);
>
>     template.sendBody(DIRECT_FEED_INBOX, cases);
>
>     mockBeforeToQueue.assertIsSatisfied();
>     mockAfterToQueue.assertIsSatisfied();
>     mockEnd.assertIsSatisfied();
>     mockDead.assertIsSatisfied();
>     mockOutbox.assertIsSatisfied();
>   }
>
>   @Test
>   public void testRollbackBeforeEnqueue() throws Exception {
>     adviceRoute();
>     startCamelContext();
>     initMocks();
>     final RecordList cases = casesA();
>
>     mockEnd.expectedBodiesReceivedInAnyOrder(cases.get(1), cases.get(2));
>     mockDead.expectedBodiesReceived(cases.get(0));
>     mockBeforeToQueue.expectedBodiesReceivedInAnyOrder(cases);
>     mockBeforeToQueue.whenExchangeReceived(1, EXCEPTION_PROCESSOR);
>
> mockOutbox.expectedBodiesReceivedInAnyOrder(toListOfJsonStrings(mapper,
> cases.get(1), cases.get(2)));
>     mockAfterToQueue.expectedBodiesReceivedInAnyOrder(cases.get(1),
> cases.get(2));
>
>     template.sendBody(DIRECT_FEED_INBOX, cases);
>
>     mockBeforeToQueue.assertIsSatisfied();
>     mockAfterToQueue.assertIsSatisfied();
>     mockEnd.assertIsSatisfied();
>     mockDead.assertIsSatisfied();
>     mockOutbox.assertIsSatisfied();
>   }
>
>   @Test
>   public void testRollbackAfterEnqueue() throws Exception {
>     adviceRoute();
>     startCamelContext();
>     initMocks();
>     final RecordList cases = casesA();
>
>     mockEnd.expectedBodiesReceivedInAnyOrder(cases.get(1), cases.get(2));
>     mockDead.expectedBodiesReceivedInAnyOrder(cases.get(0));
>     mockBeforeToQueue.expectedBodiesReceivedInAnyOrder(cases);
>
> mockOutbox.expectedBodiesReceivedInAnyOrder(toListOfJsonStrings(mapper,
> cases));
>     mockAfterToQueue.expectedBodiesReceivedInAnyOrder(cases);
>     mockAfterToQueue.whenExchangeReceived(1, EXCEPTION_PROCESSOR);
>
>     template.sendBody(DIRECT_FEED_INBOX, cases);
>
>     mockBeforeToQueue.assertIsSatisfied();
>     mockAfterToQueue.assertIsSatisfied();
>     mockDead.assertIsSatisfied();
>     mockOutbox.assertIsSatisfied();
>     mockEnd.assertIsSatisfied();
>   }
> }
>
> I have tried dozens of combinations in the onException clause and nothing
> works. Adding markRollbackOnly(), or rollback() only succeeds in rolling
> back the dead letter channel as well. Making the handled(false) cause AMQ
> to resubmit the transaction and rolls back the dead letter channel. I have
> tried a dozen combinations so if anyone has one that works I would be
> grateful.
>
> *Robert Simmons Jr. MSc. - Lead Java Architect @ EA*
> *Author of: Hardcore Java (2003) and Maintainable Java (2012)*
> *LinkedIn: **http://www.linkedin.com/pub/robert-simmons/40/852/a39
> <http://www.linkedin.com/pub/robert-simmons/40/852/a39>*
>
>
> On Mon, Apr 14, 2014 at 1:10 PM, kraythe . <kr...@gmail.com> wrote:
>
>> So, in the ongoing perfect transaction configuration we have an
>> interesting use case: Consider the following route in a test:
>>
>>   @Override
>>   protected RouteBuilder createRouteBuilder() {
>>     System.out.println("createRouteBuilder");
>>     return new RouteBuilder(this.context) {
>>       @Override
>>       public void configure() {
>>         getContext().setTracing(true);
>>         from(DIRECT_FEED_INBOX).routeId(ROUTE_ID_FEED)
>>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
>>
>> .split(body()).marshal(dfCaseRecord).to(endpointAMQ(QUEUE_INBOX));
>>         from(endpointAMQ(QUEUE_OUTBOX)).routeId(ROUTE_ID_RESULTS)
>>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
>>             .unmarshal(dfCaseRecord).to(MOCK_END);
>>         from(endpointAMQ(QUEUE_DEAD)).routeId(ROUTE_ID_DEAD)
>>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
>>             .unmarshal(dfCaseRecord).to(MOCK_DEAD);
>>         from(endpointAMQ(QUEUE_INBOX)).routeId(ROUTE_ID_TEST_ROUTE)
>>
>> .onException(RuntimeException.class).handled(true).useOriginalMessage().to(endpointAMQ(QUEUE_DEAD)).end()
>>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
>>             .unmarshal(dfCaseRecord)
>>             .to(MOCK_BEFORE_TO_QUEUE)
>>             .marshal(dfCaseRecord)
>>             .to(endpointAMQ(QUEUE_OUTBOX))
>>             .unmarshal(dfCaseRecord)
>>             .to(MOCK_AFTER_TO_QUEUE);
>>       }
>>     };
>>   }
>>
>> Note that the transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED) simply looks
>> up the transaction policy by name as it is just a string key for our JNDI
>> registry. Our test case looks like the following.
>>
>>   @Test
>>   public void testRollbackAfterEnqueue() throws Exception {
>>     adviceRoute();
>>     startCamelContext();
>>     initMocks();
>>     final RecordList cases = casesA();
>>
>>     mockEnd.expectedMessageCount(2);
>>     mockEnd.expectedBodiesReceived(cases.get(1), cases.get(2));
>>     mockDead.expectedMessageCount(1);
>>     mockDead.expectedBodiesReceived(cases.get(0));
>>     mockBeforeToQueue.expectedMessageCount(3);
>>     mockBeforeToQueue.expectedBodiesReceivedInAnyOrder(cases);
>>     mockOutbox.expectedMessageCount(3);
>>
>> mockOutbox.expectedBodiesReceivedInAnyOrder(toListOfJsonStrings(mapper,
>> cases));
>>     mockAfterToQueue.expectedMessageCount(3);
>>     mockAfterToQueue.expectedBodiesReceivedInAnyOrder(cases);
>>     mockAfterToQueue.whenExchangeReceived(1, EXCEPTION_PROCESSOR);
>>
>>     template.sendBody(DIRECT_FEED_INBOX, cases);
>>
>>     mockBeforeToQueue.assertIsSatisfied();
>>     mockAfterToQueue.assertIsSatisfied();
>>     mockEnd.assertIsSatisfied();
>>     mockDead.assertIsSatisfied();
>>     mockOutbox.assertIsSatisfied();
>>   }
>>
>> In this route the goal is that if any exceptions are thrown even after
>> the message is enqueued, it should be rolled back, the message that
>> excepted should go to the DLQ and the messages in the outbox should be
>> rolled back but the message from the inbox should not be put back on the
>> queue. This is proving to be a bit of a juggling act.
>>
>> I tried putting markRollBackOnly() in the exception handler after the
>> to(dead) but that rolled back the dead letter queue and outbox and then
>> redelivered the inbox message. Removing the markRollbackOnly() means
>> that the message arrives at dead and is off the inbox but it doesn't get
>> removed from the outbox.
>>
>> So I am looking for the right combination of calls to accomplish what I
>> want to do. Any suggestions?
>>
>> Thanks in advance.
>>
>> *Robert Simmons Jr. MSc. - Lead Java Architect @ EA*
>> *Author of: Hardcore Java (2003) and Maintainable Java (2012)*
>> *LinkedIn: **http://www.linkedin.com/pub/robert-simmons/40/852/a39
>> <http://www.linkedin.com/pub/robert-simmons/40/852/a39>*
>>
>
>

Re: Transactions: Rollback Destination but Not Dead Letter Queue or Source

Posted by "kraythe ." <kr...@gmail.com>.
So I think there is a problem with the way rollback is implemented in
relation to Camel. As far as I can tell there is no way to get the
following working.

package com.ea.wwce.camel.test.utilities;

import com.ea.wwce.camel.utilities.data.RecordList;
import com.ea.wwce.camel.utilities.transactions.TxnHelper;
import org.apache.camel.ExchangePattern;
import org.apache.camel.builder.AdviceWithRouteBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.testng.annotations.Test;
import static com.ea.wwce.camel.test.utilities.TransactionTestTools.*;
import static
com.ea.wwce.camel.utilities.activemq.ActiveMQHelper.endpointAMQ;
import static
com.ea.wwce.camel.utilities.jackson.RecordSerialization.toListOfJsonStrings;
import static org.apache.camel.ExchangePattern.InOnly;

/** This test suite validates the transaction configuration in the test
suite. */
public class JMSOnlyTransactionTest extends AMQRouteTestSupport {
  private static final String QUEUE_DEAD = "dead";
  private static final String QUEUE_INBOX = "inbox";
  private static final String QUEUE_OUTBOX = "outbox";
  private static final String ROUTE_ID_FEED = "Feed";
  private static final String ROUTE_ID_TEST_ROUTE = "TestRoute";
  private static final String ROUTE_ID_RESULTS = "ResultsRoute";
  private static final String ROUTE_ID_DEAD = "DeadRoute";
  private static final String DIRECT_FEED_INBOX = "direct:feed_inbox";

  private static final String MOCK_END = "mock:end";
  private static final String MOCK_BEFORE_TO_QUEUE = "mock:before_to_queue";
  private static final String MOCK_AFTER_TO_QUEUE = "mock:after_to_queue";

  /** Mock endpoints. */
  private MockEndpoint mockEnd, mockDead, mockOutbox, mockBeforeToQueue,
mockAfterToQueue;

  /** Helper to initialize mocks in the test. */
  private void initMocks() {
    mockEnd = assertAndGetMockEndpoint(MOCK_END);
    mockDead = assertAndGetMockEndpoint(MOCK_DEAD);
    mockBeforeToQueue = assertAndGetMockEndpoint(MOCK_BEFORE_TO_QUEUE);
    mockAfterToQueue = assertAndGetMockEndpoint(MOCK_AFTER_TO_QUEUE);
    mockOutbox = assertAndGetMockEndpoint(mockEndpointAMQ(QUEUE_OUTBOX));
  }

  @Override
  protected RouteBuilder createRouteBuilder() {
    System.out.println("createRouteBuilder");
    return new RouteBuilder(this.context) {
      @Override
      public void configure() {
        getContext().setTracing(true);
        from(DIRECT_FEED_INBOX).routeId(ROUTE_ID_FEED)
            .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)

.split(body()).marshal(dfCaseRecord).to(endpointAMQ(QUEUE_INBOX));
        from(endpointAMQ(QUEUE_OUTBOX)).routeId(ROUTE_ID_RESULTS)
            .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
            .unmarshal(dfCaseRecord).to(MOCK_END);
        from(endpointAMQ(QUEUE_DEAD)).routeId(ROUTE_ID_DEAD)
            .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
            .unmarshal(dfCaseRecord).to(MOCK_DEAD);
        from(endpointAMQ(QUEUE_INBOX)).routeId(ROUTE_ID_TEST_ROUTE)

.onException(RuntimeException.class).handled(true).useOriginalMessage()
              .to(InOnly, endpointAMQ(QUEUE_DEAD)).end()
            .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRES_NEW)
            .unmarshal(dfCaseRecord)
            .to(MOCK_BEFORE_TO_QUEUE)
            .marshal(dfCaseRecord)
            .to(endpointAMQ(QUEUE_OUTBOX))
            .unmarshal(dfCaseRecord)
            .to(MOCK_AFTER_TO_QUEUE);
      }
    };
  }

  /** Advice the route, mocking ActiveMQ endpoints. */
  protected void adviceRoute() throws Exception {
    this.context.getRouteDefinition(ROUTE_ID_TEST_ROUTE).adviceWith(
        this.context, new AdviceWithRouteBuilder() {
          @Override
          public void configure() throws Exception {
            mockEndpoints("activemq:queue:*");
          }
        }
    );
  }

  @Test
  public void testNormalRouting() throws Exception {
    adviceRoute();
    startCamelContext();
    initMocks();
    final RecordList cases = casesA();

    mockEnd.expectedBodiesReceived(cases);
    mockBeforeToQueue.expectedBodiesReceivedInAnyOrder(cases);
    mockOutbox.expectedBodiesReceivedInAnyOrder(toListOfJsonStrings(mapper,
cases));
    mockAfterToQueue.expectedBodiesReceivedInAnyOrder(cases);

    template.sendBody(DIRECT_FEED_INBOX, cases);

    mockBeforeToQueue.assertIsSatisfied();
    mockAfterToQueue.assertIsSatisfied();
    mockEnd.assertIsSatisfied();
    mockDead.assertIsSatisfied();
    mockOutbox.assertIsSatisfied();
  }

  @Test
  public void testRollbackBeforeEnqueue() throws Exception {
    adviceRoute();
    startCamelContext();
    initMocks();
    final RecordList cases = casesA();

    mockEnd.expectedBodiesReceivedInAnyOrder(cases.get(1), cases.get(2));
    mockDead.expectedBodiesReceived(cases.get(0));
    mockBeforeToQueue.expectedBodiesReceivedInAnyOrder(cases);
    mockBeforeToQueue.whenExchangeReceived(1, EXCEPTION_PROCESSOR);
    mockOutbox.expectedBodiesReceivedInAnyOrder(toListOfJsonStrings(mapper,
cases.get(1), cases.get(2)));
    mockAfterToQueue.expectedBodiesReceivedInAnyOrder(cases.get(1),
cases.get(2));

    template.sendBody(DIRECT_FEED_INBOX, cases);

    mockBeforeToQueue.assertIsSatisfied();
    mockAfterToQueue.assertIsSatisfied();
    mockEnd.assertIsSatisfied();
    mockDead.assertIsSatisfied();
    mockOutbox.assertIsSatisfied();
  }

  @Test
  public void testRollbackAfterEnqueue() throws Exception {
    adviceRoute();
    startCamelContext();
    initMocks();
    final RecordList cases = casesA();

    mockEnd.expectedBodiesReceivedInAnyOrder(cases.get(1), cases.get(2));
    mockDead.expectedBodiesReceivedInAnyOrder(cases.get(0));
    mockBeforeToQueue.expectedBodiesReceivedInAnyOrder(cases);
    mockOutbox.expectedBodiesReceivedInAnyOrder(toListOfJsonStrings(mapper,
cases));
    mockAfterToQueue.expectedBodiesReceivedInAnyOrder(cases);
    mockAfterToQueue.whenExchangeReceived(1, EXCEPTION_PROCESSOR);

    template.sendBody(DIRECT_FEED_INBOX, cases);

    mockBeforeToQueue.assertIsSatisfied();
    mockAfterToQueue.assertIsSatisfied();
    mockDead.assertIsSatisfied();
    mockOutbox.assertIsSatisfied();
    mockEnd.assertIsSatisfied();
  }
}

I have tried dozens of combinations in the onException clause and nothing
works. Adding markRollbackOnly(), or rollback() only succeeds in rolling
back the dead letter channel as well. Making the handled(false) cause AMQ
to resubmit the transaction and rolls back the dead letter channel. I have
tried a dozen combinations so if anyone has one that works I would be
grateful.

*Robert Simmons Jr. MSc. - Lead Java Architect @ EA*
*Author of: Hardcore Java (2003) and Maintainable Java (2012)*
*LinkedIn: **http://www.linkedin.com/pub/robert-simmons/40/852/a39
<http://www.linkedin.com/pub/robert-simmons/40/852/a39>*


On Mon, Apr 14, 2014 at 1:10 PM, kraythe . <kr...@gmail.com> wrote:

> So, in the ongoing perfect transaction configuration we have an
> interesting use case: Consider the following route in a test:
>
>   @Override
>   protected RouteBuilder createRouteBuilder() {
>     System.out.println("createRouteBuilder");
>     return new RouteBuilder(this.context) {
>       @Override
>       public void configure() {
>         getContext().setTracing(true);
>         from(DIRECT_FEED_INBOX).routeId(ROUTE_ID_FEED)
>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
>
> .split(body()).marshal(dfCaseRecord).to(endpointAMQ(QUEUE_INBOX));
>         from(endpointAMQ(QUEUE_OUTBOX)).routeId(ROUTE_ID_RESULTS)
>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
>             .unmarshal(dfCaseRecord).to(MOCK_END);
>         from(endpointAMQ(QUEUE_DEAD)).routeId(ROUTE_ID_DEAD)
>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
>             .unmarshal(dfCaseRecord).to(MOCK_DEAD);
>         from(endpointAMQ(QUEUE_INBOX)).routeId(ROUTE_ID_TEST_ROUTE)
>
> .onException(RuntimeException.class).handled(true).useOriginalMessage().to(endpointAMQ(QUEUE_DEAD)).end()
>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
>             .unmarshal(dfCaseRecord)
>             .to(MOCK_BEFORE_TO_QUEUE)
>             .marshal(dfCaseRecord)
>             .to(endpointAMQ(QUEUE_OUTBOX))
>             .unmarshal(dfCaseRecord)
>             .to(MOCK_AFTER_TO_QUEUE);
>       }
>     };
>   }
>
> Note that the transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED) simply looks
> up the transaction policy by name as it is just a string key for our JNDI
> registry. Our test case looks like the following.
>
>   @Test
>   public void testRollbackAfterEnqueue() throws Exception {
>     adviceRoute();
>     startCamelContext();
>     initMocks();
>     final RecordList cases = casesA();
>
>     mockEnd.expectedMessageCount(2);
>     mockEnd.expectedBodiesReceived(cases.get(1), cases.get(2));
>     mockDead.expectedMessageCount(1);
>     mockDead.expectedBodiesReceived(cases.get(0));
>     mockBeforeToQueue.expectedMessageCount(3);
>     mockBeforeToQueue.expectedBodiesReceivedInAnyOrder(cases);
>     mockOutbox.expectedMessageCount(3);
>
> mockOutbox.expectedBodiesReceivedInAnyOrder(toListOfJsonStrings(mapper,
> cases));
>     mockAfterToQueue.expectedMessageCount(3);
>     mockAfterToQueue.expectedBodiesReceivedInAnyOrder(cases);
>     mockAfterToQueue.whenExchangeReceived(1, EXCEPTION_PROCESSOR);
>
>     template.sendBody(DIRECT_FEED_INBOX, cases);
>
>     mockBeforeToQueue.assertIsSatisfied();
>     mockAfterToQueue.assertIsSatisfied();
>     mockEnd.assertIsSatisfied();
>     mockDead.assertIsSatisfied();
>     mockOutbox.assertIsSatisfied();
>   }
>
> In this route the goal is that if any exceptions are thrown even after the
> message is enqueued, it should be rolled back, the message that excepted
> should go to the DLQ and the messages in the outbox should be rolled back
> but the message from the inbox should not be put back on the queue. This is
> proving to be a bit of a juggling act.
>
> I tried putting markRollBackOnly() in the exception handler after the
> to(dead) but that rolled back the dead letter queue and outbox and then
> redelivered the inbox message. Removing the markRollbackOnly() means that
> the message arrives at dead and is off the inbox but it doesn't get removed
> from the outbox.
>
> So I am looking for the right combination of calls to accomplish what I
> want to do. Any suggestions?
>
> Thanks in advance.
>
> *Robert Simmons Jr. MSc. - Lead Java Architect @ EA*
> *Author of: Hardcore Java (2003) and Maintainable Java (2012)*
> *LinkedIn: **http://www.linkedin.com/pub/robert-simmons/40/852/a39
> <http://www.linkedin.com/pub/robert-simmons/40/852/a39>*
>