Posted to users@camel.apache.org by Hervé BARRAULT <he...@gmail.com> on 2012/02/29 09:45:57 UTC

Mixed Messages and Resequencer

Hi,

I would like to order messages in a route.
I think the resequencer is the right EIP for this, but I have some
difficulty seeing how to apply it in my case.

Classical behavior:
input: M[1] M[3] M[2] M[4]; the output will be M[1] M[2] M[3] M[4].

In my case I will have the following input:

M[A,1] M[B,1] M[B,2] M[A,4] M[A,5] M[B,4] M[A,3] M[B,3] M[A,2] M[B,5]

I would like to have at the end:
M[A,1] M[A,2] M[A,3] M[A,4] M[A,5]
M[B,1] M[B,2] M[B,3] M[B,4] M[B,5]

We can define one route which separates "A" and "B" messages [basic message
router].

Then two routes (one for "A", one for "B") which do the resequencing [basic
resequencer].

If A and B are defined before runtime, it's simple.
If A and B are not fixed but come from a dynamic list, I can't see how to
implement this.

Is there a way to get this behavior?

Thanks for your answers.

Regards

Re: Mixed Messages and Resequencer

Posted by Hervé BARRAULT <he...@gmail.com>.
Hi, what is the best way to create a MultiStreamResequencer?

I would like to:
    Replace the ResequencerEngine<Exchange> engine with a map (indexed by
stream) [likewise for Delivery delivery].
    Add an Expression to choose the right resequencer engine.
    Add a way to close a resequencer at the end of the stream.

It could be seen as managing "a JMSXGroupID" in the resequencer.
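
The idea of one resequencing engine per stream, selected by a key and closed at the end of the stream, can be sketched in plain Java. This is only an illustration of the data structure, not Camel code: the class name MultiStreamResequencer is taken from the question, while offer, close, StreamState and the firstSequence parameter are hypothetical names, and the sketch has no timeout or capacity handling.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch: one in-memory reordering buffer per stream key. Not Camel API. */
class MultiStreamResequencer<K, T> {

    /** State of one stream: the next sequence number to emit and the held-back messages. */
    private static final class StreamState<T> {
        long nextExpected;
        final TreeMap<Long, T> pending = new TreeMap<>();

        StreamState(long first) {
            nextExpected = first;
        }
    }

    private final Map<K, StreamState<T>> streams = new HashMap<>();
    private final long firstSequence;

    MultiStreamResequencer(long firstSequence) {
        this.firstSequence = firstSequence;
    }

    /** Accepts a message and returns the messages of that stream that are now deliverable, in order. */
    List<T> offer(K streamKey, long sequence, T message) {
        // The per-stream state is created lazily, so the set of streams can be dynamic.
        StreamState<T> state = streams.computeIfAbsent(streamKey, k -> new StreamState<>(firstSequence));
        List<T> ready = new ArrayList<>();
        if (sequence < state.nextExpected) {
            // Late or duplicate number: passed through immediately, not buffered.
            ready.add(message);
            return ready;
        }
        state.pending.put(sequence, message);
        // Drain while the head of the buffer is exactly the next expected number.
        while (!state.pending.isEmpty() && state.pending.firstKey() == state.nextExpected) {
            ready.add(state.pending.pollFirstEntry().getValue());
            state.nextExpected++;
        }
        return ready;
    }

    /** Ends a stream: returns whatever is still buffered (in sequence order) and drops its state. */
    List<T> close(K streamKey) {
        StreamState<T> state = streams.remove(streamKey);
        return state == null ? new ArrayList<>() : new ArrayList<>(state.pending.values());
    }
}
```

With this structure a gap in stream "A" only holds back "A" messages; "B" messages keep flowing. The sketch is not thread-safe and, like the resequencer itself, keeps everything in memory.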

Regards
Hervé

Re: Mixed Messages and Resequencer

Posted by Hervé BARRAULT <he...@gmail.com>.
Hi,
I am thinking of using Camel like this:

Define a standard route (which uses a "dynamic" content-based router).

from("cxf:myEndpoint").process(myProcessor).to(header("target").toString());

header("target").toString() => seda:<<streamType>>

And a template which I would instantiate for each new <<stream type>>, such
as

from("seda:<<streamType>>").stream().capacity(5000).timeout(500L).process(myProcessor2);

Is this a good way of using Camel for resequencing multiple streams?

I know that I can lose messages until I handle this in myProcessor2.
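
The "instantiate the template for each new stream type" step needs some bookkeeping so that each route is created only once. A minimal plain-Java sketch of that bookkeeping follows. StreamRouteRegistry, endpointFor and routeFactory are hypothetical names; the factory callback stands in for whatever actually creates the route (in a Camel application that could be a call such as CamelContext.addRoutes, which is an assumption about how it would be wired, not something shown in this thread).

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Hypothetical helper: resolves the endpoint for a stream type and creates its route once. */
class StreamRouteRegistry {
    private final Set<String> known = ConcurrentHashMap.newKeySet();
    private final Consumer<String> routeFactory;

    /**
     * @param routeFactory called once per new endpoint URI; a real application
     *                     would add the per-stream resequencing route here.
     */
    StreamRouteRegistry(Consumer<String> routeFactory) {
        this.routeFactory = routeFactory;
    }

    /** Returns "seda:<streamType>", invoking the factory the first time a type is seen. */
    String endpointFor(String streamType) {
        String uri = "seda:" + streamType;
        if (known.add(uri)) { // add() returns true only for the first caller
            routeFactory.accept(uri);
        }
        return uri;
    }
}
```

A processor could call endpointFor(streamType) and store the result in the "target" header. Note that with concurrent callers a second thread may receive the URI before the factory call of the first thread has finished.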

Regards
Hervé



Re: Mixed Messages and Resequencer

Posted by Hervé BARRAULT <he...@gmail.com>.
Hi,

Thanks for the answer.
The idea was to use a stream resequencer. In the main case messages will be in
the right order, but sometimes I could have an inversion, so with a stream
resequencer I will not block these messages.

But I would like to resequence n different streams, as there is no logical
order between A and B and ...

So I would define n stream resequencers [to avoid blocking the resequencing of
all streams if one has a gap].

But I don't see whether the resequencer is suited to this particular case.

I could perhaps implement a complex comparator, but I don't think it would be
able to handle n independent streams correctly.

Regards
Hervé


Re: Mixed Messages and Resequencer

Posted by Claus Ibsen <cl...@gmail.com>.
Hi

The resequencer EIP works in 2 modes
- a) batch
- b) streaming

Ad a)
It uses a java.util.Comparator to determine in which order the
messages should be.

You can configure such a comparator using something like
org.apache.camel.util.ExpressionComparator, which shows you how
to grab the [A,1] from the message, and then you can write custom Java
code to compare that. So you can code that [A,1] comes before
[B,1] etc.

Then you ought to be able to resequence a mix of [A,1] and [B,1] etc.

The batch mode will emit messages in batches after either a timeout
or a certain size has been hit.
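
As a rough illustration of the batch-mode ordering described above, here is a plain java.util.Comparator that puts [A,1] before [B,1] by comparing the stream name first and the sequence number second. StreamKey and BatchOrdering are hypothetical names for this sketch; extracting the key from an Exchange and plugging the comparator into the resequencer are not shown.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical message key: a stream name plus a sequence number, as in M[A,1]. */
final class StreamKey {
    final String stream;
    final long sequence;

    StreamKey(String stream, long sequence) {
        this.stream = stream;
        this.sequence = sequence;
    }

    @Override
    public String toString() {
        return "M[" + stream + "," + sequence + "]";
    }
}

class BatchOrdering {
    /** Orders by stream first, then by sequence number, so [A,5] sorts before [B,1]. */
    static final Comparator<StreamKey> BY_STREAM_THEN_SEQUENCE =
            Comparator.comparing((StreamKey k) -> k.stream)
                      .thenComparingLong(k -> k.sequence);

    /** Sorts a copy of one collected batch with the comparator above. */
    static List<StreamKey> sortBatch(List<StreamKey> batch) {
        List<StreamKey> sorted = new ArrayList<>(batch);
        sorted.sort(BY_STREAM_THEN_SEQUENCE);
        return sorted;
    }
}
```

Applied to the ten mixed messages from the original question, sortBatch returns all "A" messages in sequence order followed by all "B" messages. This only holds within one batch; messages that arrive after the batch was emitted end up in the next one.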

Ad b)
The streaming mode runs as a continuous stream, but it requires that
there are *no* gaps. And by default it only offers a Long-based
comparator.
You will have to implement your custom comparator, which is an
org.apache.camel.processor.resequencer.ExpressionResultComparator

And then you need to implement logic to tell which is the prev / next
expected message, so the resequencer knows whether it got all the messages,
so there are no gaps, and can then emit the messages.
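
The "prev / next expected message" logic could look roughly like the following for keys of the form [stream, number]. To keep the sketch self-contained it uses a local stand-in interface, GapAwareComparator, rather than the Camel interface named above; the method names predecessor and successor are modeled on my recollection of Camel's sequence comparator and should be checked against the actual API.

```java
import java.util.Comparator;

/** Hypothetical element: stream name plus sequence number. */
final class StreamElement {
    final String stream;
    final long sequence;

    StreamElement(String stream, long sequence) {
        this.stream = stream;
        this.sequence = sequence;
    }
}

/** Illustrative stand-in (not Camel's interface): ordering plus direct-neighbour checks. */
interface GapAwareComparator<E> extends Comparator<E> {
    boolean predecessor(E o1, E o2);

    boolean successor(E o1, E o2);
}

class StreamElementComparator implements GapAwareComparator<StreamElement> {
    /** Orders by stream name, then by sequence number. */
    @Override
    public int compare(StreamElement o1, StreamElement o2) {
        int byStream = o1.stream.compareTo(o2.stream);
        return byStream != 0 ? byStream : Long.compare(o1.sequence, o2.sequence);
    }

    /** True if o1 comes immediately before o2 within the same stream. */
    @Override
    public boolean predecessor(StreamElement o1, StreamElement o2) {
        return o1.stream.equals(o2.stream) && o1.sequence == o2.sequence - 1;
    }

    /** True if o1 comes immediately after o2 within the same stream. */
    @Override
    public boolean successor(StreamElement o1, StreamElement o2) {
        return o1.stream.equals(o2.stream) && o1.sequence == o2.sequence + 1;
    }
}
```

With this definition [B,1] is never the successor of [A,5], so a single streaming resequencer fed with several streams would likely see every switch between streams as a gap.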



Mind that the resequencer is purely memory based; it does not offer a
persistent store and cannot remember its state. So if you restart or
the server crashes, the messages will be lost.








-- 
Claus Ibsen
-----------------
FuseSource
Email: cibsen@fusesource.com
Web: http://fusesource.com
Twitter: davsclaus, fusenews
Blog: http://davsclaus.blogspot.com/
Author of Camel in Action: http://www.manning.com/ibsen/