Posted to users@camel.apache.org by d1x <zd...@gmail.com> on 2016/02/10 11:18:15 UTC

SJMS transaction

Hello,

Is there a way to ensure transactions between the consumer and producer of two
JMS queues using the SJMS component?

What I'm testing is this simple case:
1. Prepare a large number of JMS messages in the broker (e.g. 1000).
2. Have a Camel route from the input queue to the output queue.
3. Start the context (which starts consuming messages) and kill the Java
process at any time.

What I would expect is that the sum of the messages in the input queue and the
output queue is 1000. What actually happens is that the sum is higher (somewhere
between 1000 and 1005, as I use 5 parallel consumers). This is because when I
kill the process after a message has already been sent to the output queue, the
message remains there even though the transaction (acknowledgement) of the
message from the input queue did not succeed (because of the kill).
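
To make the arithmetic concrete, here is a minimal, self-contained sketch of that failure window in plain Java. It is a toy in-memory model, not SJMS or broker code: the class name CrashWindowDemo, the method simulate, and the assumption that every consumer is killed after sending but before acknowledging (the worst case) are made up purely for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;

// Toy model (hypothetical names): two in-memory queues stand in for the broker.
class CrashWindowDemo {

    /** Returns {messagesLeftInInput, messagesInOutput} after a simulated kill. */
    static int[] simulate(int total, int consumers, int forwardedBeforeKill) {
        Deque<Integer> in = new ArrayDeque<>();
        Deque<Integer> out = new ArrayDeque<>();
        for (int i = 0; i < total; i++) {
            in.add(i);
        }

        // Fully forwarded messages: sent to output AND acknowledged on input.
        for (int i = 0; i < forwardedBeforeKill; i++) {
            out.add(in.poll());
        }

        // Worst case at the moment of the kill: every consumer has already
        // sent its current message to output, but none has acknowledged it,
        // so the broker keeps each of those messages in input for redelivery.
        int inFlight = Math.min(consumers, in.size());
        Iterator<Integer> it = in.iterator();
        for (int c = 0; c < inFlight; c++) {
            out.add(it.next()); // copied to output, NOT removed from input
        }
        return new int[] { in.size(), out.size() };
    }

    public static void main(String[] args) {
        int[] r = simulate(1000, 5, 400);
        // prints "input=600 output=405 sum=1005"
        System.out.println("input=" + r[0] + " output=" + r[1] + " sum=" + (r[0] + r[1]));
    }
}
```

In this model the surplus is exactly the number of consumers caught between send and acknowledge, which is why the observed sum lands anywhere from 1000 to 1005 with 5 consumers.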

I tried hard to find out how to configure Apache Camel properly. From what I
understood from the documentation, I cannot use the Camel Transaction
Processor. But there are still options like acknowledgementMode (although its
values do not seem to be documented at all, not even in the source code).

So is there a way to configure the SJMS component to behave fully
transactionally between consumer and producer? Presumably they would have to use
the same JMS session, or some other way of synchronization?

Here is the simple code I use for testing:

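// Imports assume Camel 2.x (camel-sjms) and the ActiveMQ client library.
import javax.jms.ConnectionFactory;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.sjms.SjmsComponent;
import org.apache.camel.component.sjms.jms.ConnectionFactoryResource;
import org.apache.camel.impl.DefaultCamelContext;

public class SjmsTransaction {

    public static void main(String[] args) throws Exception {
        RouteBuilder rb = new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                // Consume from the input queue with 5 concurrent consumers.
                from("sjms:queue:test-in?transacted=true&acknowledgementMode=DUPS_OK_ACKNOWLEDGE&consumerCount=5")
//                        .setExchangePattern(ExchangePattern.InOut)
                        .process(new Processor() {
                            @Override
                            public void process(Exchange exchange) throws Exception {
                                System.out.println("Processed message " + exchange.getExchangeId());
                            }
                        })
                        // Forward each message to the output queue.
                        .to("sjms:queue:test-out?transacted=true");
            }
        };

        CamelContext context = new DefaultCamelContext();
        addJmsComponent(context);
        context.addRoutes(rb);

        System.out.println("=====> Starting context");
        context.start();
        // Now the context will run and consume messages. When I stop the
        // application by force at any time, I expect this to be true:
        //   <#submittedMessages> == <#messagesInInput> + <#messagesInOutput>
        // What happens is that there are extra messages that were not
        // acknowledged (from input) but are already in output.
    }

    // Registers the SJMS component backed by a pool of 5 connections.
    private static void addJmsComponent(CamelContext context) {
        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        ConnectionFactoryResource connResource = new ConnectionFactoryResource(5, factory);
        SjmsComponent comp = new SjmsComponent();
        comp.setConnectionResource(connResource);
        context.addComponent("sjms", comp);
    }
}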

Thank you very much for any answer.



--
View this message in context: http://camel.465427.n5.nabble.com/SJMS-transaction-tp5777522.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Re: SJMS transaction

Posted by d1x <zd...@gmail.com>.
Any ideas or estimates?
The bug is still untouched :-/




Re: SJMS transaction

Posted by d1x <zd...@gmail.com>.
I'm not sure whether it is a bug, but we need to find out whether there is a
solution. So I filed an issue in the Apache Camel JIRA:
https://issues.apache.org/jira/browse/CAMEL-9606

Let's see whether anybody can say more on this topic.




Re: SJMS transaction

Posted by d1x <zd...@gmail.com>.
Yes, you are right.
As far as I know, JMS transactions can be done on a single JMS session (with
once-only delivery).

So the problem is that I cannot find a way to configure SJMS to do the JMS
transaction (use the same JMS session) for the IN-OUT scenario described. And I
must admit it is kind of weird, because JMS transactions are one of the most
important requirements.
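
As an illustration of what a single transacted session is expected to provide, here is a toy in-memory model in plain Java. Broker, Session, and run are hypothetical names, not the JMS or SJMS API. The model only mimics the JMS behavior that receives and sends made on one transacted session take effect together on commit and are undone on rollback; it assumes that a killed client is treated by the broker like a rollback.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model (hypothetical names), not real JMS classes.
class TransactedSessionModel {

    static final class Broker {
        final Deque<String> in = new ArrayDeque<>();
        final Deque<String> out = new ArrayDeque<>();
    }

    /** Stand-in for one transacted session shared by consumer and producer. */
    static final class Session {
        private final Broker broker;
        private final List<String> received = new ArrayList<>();     // consumed, uncommitted
        private final List<String> pendingSends = new ArrayList<>(); // sent, uncommitted

        Session(Broker broker) {
            this.broker = broker;
        }

        String receive() {
            String m = broker.in.poll();
            if (m != null) {
                received.add(m);
            }
            return m;
        }

        void send(String m) {
            if (m != null) {
                pendingSends.add(m); // not visible in the output queue until commit
            }
        }

        void commit() {
            broker.out.addAll(pendingSends); // sends become visible...
            received.clear();                // ...and receives become final, together
            pendingSends.clear();
        }

        void rollback() {
            // Put consumed messages back in their original order, drop the sends.
            for (int i = received.size() - 1; i >= 0; i--) {
                broker.in.addFirst(received.get(i));
            }
            received.clear();
            pendingSends.clear();
        }
    }

    /** Returns {messagesLeftInInput, messagesInOutput}. */
    static int[] run(int total, int committed) {
        Broker broker = new Broker();
        for (int i = 0; i < total; i++) {
            broker.in.add("msg-" + i);
        }
        Session session = new Session(broker);
        for (int i = 0; i < committed; i++) {
            session.send(session.receive());
            session.commit();
        }
        // The client dies after receive + send but before commit.
        session.send(session.receive());
        session.rollback();
        return new int[] { broker.in.size(), broker.out.size() };
    }

    public static void main(String[] args) {
        int[] r = run(1000, 400);
        // prints "input=600 output=400 sum=1000"
        System.out.println("input=" + r[0] + " output=" + r[1] + " sum=" + (r[0] + r[1]));
    }
}
```

With the real javax.jms API, the equivalent would be a session created with connection.createSession(true, Session.SESSION_TRANSACTED), with both the MessageConsumer and the MessageProducer created from that same session and session.commit() called after each send. Whether SJMS can be configured to share a session this way is exactly the open question here.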




Re: SJMS transaction

Posted by Quinn Stevenson <qu...@pronoia-solutions.com>.
It sounds like SJMS may be replaying a transaction.  Either that or the producer and consumer are not using the same JMS Session.  Since you’re not losing messages, I’d guess it’s one of those two scenarios.

I’m not sure if JMS Session Transactions guarantee once-and-only-once delivery - a JMS expert would have to answer that one.


> On Feb 11, 2016, at 1:01 AM, d1x <zd...@gmail.com> wrote:
> 
> Thank you for your answer. I use only one broker, so I don't need the kind
> of XA transaction you pointed out.
> Anyway, setting transacted=true on both the consumer and the producer does not
> solve the issue: there are always more than 1000 messages in the input and
> output queues combined, even though there were 1000 messages in the input
> queue at the beginning.
> 
> E.g.
> from("sjms:queue:test-in?transacted=true&consumerCount=5")
>                     .process(systemOut("Processing"))
>                     .to("sjms:queue:test-out?transacted=true")
> 
> You can try it yourself with that simple one-class code...


Re: SJMS transaction

Posted by d1x <zd...@gmail.com>.
Thank you for your answer. I use only one broker, so I don't need the kind
of XA transaction you pointed out.
Anyway, setting transacted=true on both the consumer and the producer does not
solve the issue: there are always more than 1000 messages in the input and
output queues combined, even though there were 1000 messages in the input
queue at the beginning.

E.g.
from("sjms:queue:test-in?transacted=true&consumerCount=5")
                     .process(systemOut("Processing"))
                     .to("sjms:queue:test-out?transacted=true")

You can try it yourself with that simple one-class code...





Re: SJMS transaction

Posted by Quinn Stevenson <qu...@pronoia-solutions.com>.
I don’t think you want DUPS_OK_ACKNOWLEDGE and transacted both set.

As long as both queues are in the same ActiveMQ broker, all you should need is the “transacted=true” URI option for both the consumer and producer.

If you have multiple ActiveMQ brokers involved, you’ll need to use camel-jms - camel-sjms doesn’t support XA.

