Posted to users@camel.apache.org by edvicif <ed...@gmail.com> on 2012/02/21 12:41:34 UTC

Synchronizing event with asynchronous route

Hi

I have a synchronization issue that I wasn't able to solve with the core
Camel components. I was wondering whether someone has a better solution.

I had a synchronous route:
<when>
  <condition>daily close message</condition>
  <to uri="direct:doAggregation"/>
</when>
<otherwise>
  <to uri="direct:storeMessages"/>
</otherwise>

Storing messages happens all day. The order of the stored messages doesn't
matter, but the position of the close message does. We shouldn't begin to
process it until all messages are stored, and we shouldn't store new messages
until the aggregation has happened.

The service began to receive a large inflow, so we decided to make storing
parallel.

The route reads from a durable source, so we weren't able to consume
asynchronously. This is what I came up with:

<when>
  <condition>daily close message</condition>
  <to uri="direct:doAggregation"/>
</when>
<otherwise>
  <to uri="seda:storeMessages"/>
</otherwise>

But now I have a race condition. When the daily report arrives, there could be
messages still being processed. Earlier this problem didn't exist, as the same
thread was responsible for both scenarios. So the report should wait until all
messages have been consumed.

I've implemented a bean for synchronizing. It has one method where workers
register themselves and another where they unregister. In a third method, the
aggregator checks whether there are registered workers and, if there are, waits
until all workers have unregistered. (Maybe a processor would be better, but
I'm not really confident in the solution.)
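
For illustration, a bean of this kind could look roughly like the sketch below. The method names are the ones used in the route snippets in this post; the class body, the counter, and the use of wait/notify are assumptions, not the actual implementation.

```java
/**
 * Hypothetical sketch of the synchronizing bean described above.
 * Only the method names come from the post; everything else is assumed.
 * Declare the class public in its own file when registering it as a bean.
 */
final class WorkerSynch {
    private final Object lock = new Object();
    private int outstanding = 0; // registered work items that have not finished yet

    /** Called before a message is handed to the asynchronous store route. */
    public void registerWork() {
        synchronized (lock) {
            outstanding++;
        }
    }

    /** Called when a store operation has finished, successfully or not. */
    public void unRegisterWork() {
        synchronized (lock) {
            if (outstanding > 0) {
                outstanding--;
            }
            if (outstanding == 0) {
                lock.notifyAll(); // wake up a waiting aggregator
            }
        }
    }

    /** Blocks the calling thread until no work is registered. */
    public void waitForOutStandingWork() throws InterruptedException {
        synchronized (lock) {
            while (outstanding > 0) { // loop guards against spurious wake-ups
                lock.wait();
            }
        }
    }

    /** Current number of registered work items, mainly for diagnostics. */
    public int getOutstanding() {
        synchronized (lock) {
            return outstanding;
        }
    }
}
```

Because the close message is handled on the same consumer thread that registers new work, no new work can be registered while that thread is blocked in waitForOutStandingWork, which is what this sketch relies on.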

So now the solution looks like this:

<when>
  <condition>daily close message</condition>
  <bean ref="workerSynch" method="waitForOutStandingWork"/>
  <to uri="direct:doAggregation"/>
</when>
<otherwise>
    <doTry>
      <bean ref="workerSynch" method="registerWork"/>
    </doTry>
    <to uri="seda:myWork"/>
</otherwise>

<from uri="seda:myWork"/>
<doTry>
  <to uri="direct:storeMessages"/>
  <doFinally>
    <bean ref="workerSynch" method="unRegisterWork"/>
  </doFinally>
</doTry>

But I'm not satisfied with this. I'm worried that the synchronization is
spread across route definitions and that someone will break it later.

I'm not 100% sure that this is bulletproof. For example, what happens if <to
uri="seda:myWork"/> throws an exception? Then the work is never unregistered.
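
One way to close that gap, sketched here in plain Java rather than in the route, is to undo the registration when the hand-off itself fails. The class SafeHandOff and its methods are hypothetical, and a java.util.concurrent queue stands in for the seda:myWork queue.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper: a work item stays counted only if the hand-off succeeded. */
final class SafeHandOff<T> {
    private final BlockingQueue<T> queue; // stands in for the seda:myWork queue
    private final AtomicInteger outstanding = new AtomicInteger();

    SafeHandOff(BlockingQueue<T> queue) {
        this.queue = queue;
    }

    /** Registers the item, then enqueues it; rolls the registration back on failure. */
    void submit(T item) {
        outstanding.incrementAndGet(); // corresponds to registerWork
        boolean queued = false;
        try {
            queue.add(item); // throws IllegalStateException when the queue is full
            queued = true;
        } finally {
            if (!queued) {
                // undo the registration, otherwise the aggregator would wait forever
                outstanding.decrementAndGet();
            }
        }
    }

    /** Called by the worker once an item has been stored or has failed. */
    void done() {
        outstanding.decrementAndGet(); // corresponds to unRegisterWork
    }

    int outstanding() {
        return outstanding.get();
    }
}
```

In the route, the same idea would mean putting both the registration and the send to seda:myWork inside one guarded block and unregistering when the send fails.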

I think the real solution is to treat this as a new requirement. Previously we
said: process each message one by one. Now we have a new requirement: process
messages unless it is a close-down message. Then the close-down message should
go through some sort of business logic where it can verify itself and act
accordingly.

But I'd like to ask: is there a way to solve this at the functional level,
such as not processing until the other route's work queue is empty, instead of
implementing new functionality?

--
View this message in context: http://camel.465427.n5.nabble.com/Synchronizing-event-with-asynchronous-route-tp5501946p5501946.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Re: Synchronizing event with asynchronous route

Posted by edvicif <ed...@gmail.com>.
You mean DefaultInflightRepository, which is used during shutdown?
http://camel.apache.org/maven/camel-2.7.0/camel-core/apidocs/org/apache/camel/impl/DefaultInflightRepository.html
I'll take a look.


Re: Synchronizing event with asynchronous route

Posted by Claus Ibsen <cl...@gmail.com>.
Camel has an in-flight registry which you can query for the number of
in-progress messages.
Maybe that can help you avoid using sync locks.
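
A minimal sketch of how such a query could replace the lock, assuming the count is polled until it drains. The class InflightWaiter and its parameters are hypothetical; the counter is passed in as a plain IntSupplier so the sketch stays independent of Camel.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.IntSupplier;

/** Hypothetical helper that polls a pending-work counter until it drains. */
final class InflightWaiter {
    private InflightWaiter() {
    }

    /**
     * Polls the counter until it reports at most `allowed` pending items
     * or the timeout expires.
     *
     * @return true if the counter drained in time, false on timeout
     */
    static boolean awaitDrained(IntSupplier pending, int allowed,
                                long timeoutMillis, long pollMillis)
            throws InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (pending.getAsInt() > allowed) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // gave up, work is still pending
            }
            Thread.sleep(pollMillis);
        }
        return true;
    }
}
```

Inside a processor the counter could be something like `() -> exchange.getContext().getInflightRepository().size()`. Two assumptions to verify against the Camel version in use: the close message itself probably counts as in-flight, so `allowed` would be 1 rather than 0, and messages still waiting in the seda queue may not be counted until a consumer picks them up, so the queue size may need to be added to the count.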






-- 
Claus Ibsen
-----------------
FuseSource
Email: cibsen@fusesource.com
Web: http://fusesource.com
Twitter: davsclaus, fusenews
Blog: http://davsclaus.blogspot.com/
Author of Camel in Action: http://www.manning.com/ibsen/