You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@camel.apache.org by Artur Jablonski <aj...@ravenpack.com> on 2017/03/31 15:08:14 UTC

Curious routing case

Hello.

I wonder if someone could push me in the right direction trying to express
quite curious case in Camel route.

Imagine there's a stream of messages some of which can be processed in
parallel and some have to be processed serially. You can group the messages
that require serial processing together by looking at the message body. You
don't know upfront how many groups can occur in the stream.

The way I thought about doing this is having a route for each message
group. Since I don't know upfront how many and what groups there will be
then I would need to create routes dynamically. If a message comes
belonging to a group that doesn't have it's handling route, then i could
create it (is that even possible??) Then if there's no messages coming for
a given group in some time I could remove the route for the group to
cleanup (is that possible?)

New to Camel

Thx!
Artur

Re: Curious routing case

Posted by Henrique Viecili <vi...@gmail.com>.
Hi Artur,

As your group set is unbounded, you may need to look at creating
(in-memory) queues and routes at runtime and attaching them to a camel
context (or spawning their own context). You can use some camel trickery
with DefaultCamelContext + RouteDefinition, it's ugly but it should work.

Regards,
Henrique



Henrique Viecili

On 12 April 2017 at 01:06, Claus Ibsen <cl...@gmail.com> wrote:

> If you use ActiveMQ you can use its message grouping support
> http://activemq.apache.org/message-groups.html
>
> And Apache Kafka has topics where you can use partition ids to group
> by A, B, C and then have parallel processing of each group.
>
> Not sure SQS has such advanced features.
>
> Its a bit brittle to have to write java code or split / aggregate eip
> patterns in camel to do this, and if you want to have some level of
> transaction guarantees. All EIP patterns in camel are transient and
> dont persist their state etc.
>
> in other words try to use a better messaging system.
>
>
> On Tue, Apr 11, 2017 at 3:31 PM, Artur Jablonski
> <aj...@ravenpack.com> wrote:
> > I guess one thing that come to my mind is to hide all this parallel stuff
> > inside a processor, that would just spit out on the other end the result
> of
> > processing all those messages. It would handle grouping and serializing
> and
> > stuff I guess that would reduce the complexity of the route with a cost
> of
> > complexity of the processor. I have no better ideas anyway, so I will
> give
> > it a go
> >
> > On Tue, Apr 11, 2017 at 10:45 AM, Artur Jablonski <
> ajablonski@ravenpack.com>
> > wrote:
> >
> >> Hello,
> >>
> >> I don't think this route definition is fitting my use case, though I
> >> learnt a thing or two about the interesting patterns linked. Thanks!
> >>
> >> Ok, so let me try to clarify the use case.
> >>
> >>
> >> 1. The stream is infinite, it's not a batch job. The messages keep on
> >> coming from SQS 'all the time'
> >>
> >> 2. More important thing is about parallel processing.
> >>
> >> Let A1 denote a message 1 from group A, B2 message 2 from group B, etc.
> >>
> >> Let's say this is the order in which the messages happen to appear
> >>  in the route from SQS
> >>
> >> A1, A2, B1, C1, B2, A3, C2, B3
> >>
> >> Now what I am trying to achieve is grouping the messages that have to be
> >> processed sequentially (order doesn't matter as long as no two messages
> >> from the same group are processed at the same time).
> >> So I am trying to somehow get these streams
> >>
> >> A1, A2, A3
> >>
> >> B1, B2, B3
> >>
> >> C1, C2
> >>
> >>
> >> So, A1 B1 and C1 can be processed in parallel because they are from
> >> different groups, but the messages within groups need to be processed
> one
> >> by one.
> >>
> >> In my example, there are 3 groups, but there can be many and I don't
> know
> >> what they are in advance. The processing logic between the groups is
> >> similar and is a function of the group so I can get a processor for
> group A
> >> from a method call getProcessor(A), B getProcessor(B), etc.
> >>
> >> I am stuck at how to do that in Camel, because since I don't know the
> >> groups in advance, I would need to create processing routes dynamically.
> >>
> >> Say the system starts, and A1 arrives, there can't be any processor for
> >> group A yet, since it's the first message from the group and I need to
> >> somehow dynamically add processing capability of the group A to the
> route
> >> and then perhaps if the messages from group A stop arriving for some
> time,
> >> that processor could be removed.
> >>
> >> How to add the parallel part between the group messages is also blurry
> to
> >> me. One way of doing this I was thinking was to do a multicast to all
> the
> >> dynamically created processing routes for groups and stick a filter
> before
> >> so that only messages from particular group can go through. From
> multicast
> >> page:
> >>
> >> from("direct:a").multicast().parallelProcessing().to("direct:x",
> >> "direct:y", "direct:z");
> >>
> >> But here the x,y,z endpoints are hardcoded. I could write up some custom
> >> multicast I suppose to search the routes in CamelContext...... not sure.
> >>
> >> Thanks
> >> Artur
> >>
> >>
> >>
> >>
> >>
> >> On Mon, Apr 3, 2017 at 1:36 PM, Artur Jablonski <
> ajablonski@ravenpack.com>
> >> wrote:
> >>
> >>> Hi Zoran,
> >>>
> >>> Thank you for such detailed response. This looks very promising. i will
> >>> need to get my head around the aggregator pattern.
> >>> For this week I will be busy with other tasks, but I will get back to
> it
> >>> as soon as I can to see if I can get Camel work for the use case.
> >>>
> >>> Cheerio
> >>> Artur
> >>>
> >>> On Mon, Apr 3, 2017 at 11:09 AM, Zoran Regvart <zo...@regvart.com>
> wrote:
> >>>
> >>>> Hi Artur,
> >>>> I was thinking that the order of the messages would be important as
> >>>> you need to process them sequentially.
> >>>>
> >>>> So I think you could use the dynamic message routing[1] with
> >>>> aggregator[2], something like:
> >>>>
> >>>>     from("aws-sqs:...")
> >>>>         .process("#preProcess")
> >>>>         .toD("direct:${header.nextRoute}");
> >>>>
> >>>>     from("direct:parallel")...;
> >>>>     from("direct:sequential").aggregate(simple("${header.group}"
> >>>> )).completion..;
> >>>>
> >>>> So from yout SQS queue you would use a processor to pre-process
> >>>> message whose responsibility would be to set the (custom) `nextRoute`
> >>>> and (custom) `group` headers. `nextRoute` would be `parallel` or
> >>>> `sequential`, and if `sequential` the messages would be aggregated
> >>>> using the `group` header.
> >>>>
> >>>> You would want to define your own custom aggregation strategy or use
> >>>> the completion* options that are available to you. There also might be
> >>>> need to use seda[3] to fine tune any parallel processing. You might
> >>>> throw in there a data format unmarshaller[4] instead of the
> >>>> `preProcess` processor and use something like `${body.xyz} == foo` in
> >>>> the `toD` expression.
> >>>>
> >>>> And I would guess that you need to examine transactions or persistence
> >>>> at some point also in case your aggregation step runs for a long time
> >>>> or if your use case is sensitive to message loss if interrupted --
> >>>> which would undoubtedly lead you back to using queues to separate
> >>>> those two ways of processing,
> >>>>
> >>>> HTH,
> >>>>
> >>>> zoran
> >>>>
> >>>> [1] https://camel.apache.org/message-endpoint.html
> >>>> [2] https://camel.apache.org/aggregator2.html
> >>>> [3] https://camel.apache.org/seda.html
> >>>> [4] https://camel.apache.org/data-format.html
> >>>>
> >>>> On Sat, Apr 1, 2017 at 1:09 PM, Artur Jablonski
> >>>> <aj...@ravenpack.com> wrote:
> >>>> > Hey Zoran.
> >>>> >
> >>>> > I read again the patterns you mentioned. In my use case the order of
> >>>> > processing within a group doesn't matter as long as two messages
> from
> >>>> the
> >>>> > same group are never processed in parallel. So i guess resenquencer
> is
> >>>> out
> >>>> > of the picture unless I didn't get the intention.
> >>>> >
> >>>> > So what we are left with is the content based router. Sure. The
> message
> >>>> > comes, i can see what group it belongs two... And what next? Perhaps
> >>>> it's
> >>>> > the very first message from that group so I would need to trigger
> >>>> creating
> >>>> > route/processor for that group somehow, perhaps messages from this
> >>>> group
> >>>> > were processed before in which case the processor for the group
> should
> >>>> > already exist...
> >>>> >
> >>>> >
> >>>> >
> >>>> >
> >>>> > On 31 Mar 2017 7:58 p.m., "Zoran Regvart" <zo...@regvart.com>
> wrote:
> >>>> >
> >>>> >> Hi Artur,
> >>>> >> have a look at Camel EIP page[1], what you describe sounds to me
> like
> >>>> >> Resequencer and Content based router patterns,
> >>>> >>
> >>>> >> zoran
> >>>> >>
> >>>> >> [1] https://camel.apache.org/eip.html
> >>>> >>
> >>>> >> On Fri, Mar 31, 2017 at 5:08 PM, Artur Jablonski
> >>>> >> <aj...@ravenpack.com> wrote:
> >>>> >> > Hello.
> >>>> >> >
> >>>> >> > I wonder if someone could push me in the right direction trying
> to
> >>>> >> express
> >>>> >> > quite curious case in Camel route.
> >>>> >> >
> >>>> >> > Imagine there's a stream of messages some of which can be
> processed
> >>>> in
> >>>> >> > parallel and some have to be processed serially. You can group
> the
> >>>> >> messages
> >>>> >> > that require serial processing together by looking at the message
> >>>> body.
> >>>> >> You
> >>>> >> > don't know upfront how many groups can occur in the stream.
> >>>> >> >
> >>>> >> > The way I thought about doing this is having a route for each
> >>>> message
> >>>> >> > group. Since I don't know upfront how many and what groups there
> >>>> will be
> >>>> >> > then I would need to create routes dynamically. If a message
> comes
> >>>> >> > belonging to a group that doesn't have it's handling route, then
> i
> >>>> could
> >>>> >> > create it (is that even possible??) Then if there's no messages
> >>>> coming
> >>>> >> for
> >>>> >> > a given group in some time I could remove the route for the
> group to
> >>>> >> > cleanup (is that possible?)
> >>>> >> >
> >>>> >> > New to Camel
> >>>> >> >
> >>>> >> > Thx!
> >>>> >> > Artur
> >>>> >>
> >>>> >>
> >>>> >>
> >>>> >> --
> >>>> >> Zoran Regvart
> >>>> >>
> >>>>
> >>>>
> >>>>
> >>>> --
> >>>> Zoran Regvart
> >>>>
> >>>
> >>>
> >>
>
>
>
> --
> Claus Ibsen
> -----------------
> http://davsclaus.com @davsclaus
> Camel in Action 2: https://www.manning.com/ibsen2
>

Re: Curious routing case

Posted by Claus Ibsen <cl...@gmail.com>.
If you use ActiveMQ you can use its message grouping support
http://activemq.apache.org/message-groups.html

And Apache Kafka has topics where you can use partition ids to group
by A, B, C and then have parallel processing of each group.

Not sure SQS has such advanced features.

Its a bit brittle to have to write java code or split / aggregate eip
patterns in camel to do this, and if you want to have some level of
transaction guarantees. All EIP patterns in camel are transient and
dont persist their state etc.

in other words try to use a better messaging system.


On Tue, Apr 11, 2017 at 3:31 PM, Artur Jablonski
<aj...@ravenpack.com> wrote:
> I guess one thing that come to my mind is to hide all this parallel stuff
> inside a processor, that would just spit out on the other end the result of
> processing all those messages. It would handle grouping and serializing and
> stuff I guess that would reduce the complexity of the route with a cost of
> complexity of the processor. I have no better ideas anyway, so I will give
> it a go
>
> On Tue, Apr 11, 2017 at 10:45 AM, Artur Jablonski <aj...@ravenpack.com>
> wrote:
>
>> Hello,
>>
>> I don't think this route definition is fitting my use case, though I
>> learnt a thing or two about the interesting patterns linked. Thanks!
>>
>> Ok, so let me try to clarify the use case.
>>
>>
>> 1. The stream is infinite, it's not a batch job. The messages keep on
>> coming from SQS 'all the time'
>>
>> 2. More important thing is about parallel processing.
>>
>> Let A1 denote a message 1 from group A, B2 message 2 from group B, etc.
>>
>> Let's say this is the order in which the messages happen to appear
>>  in the route from SQS
>>
>> A1, A2, B1, C1, B2, A3, C2, B3
>>
>> Now what I am trying to achieve is grouping the messages that have to be
>> processed sequentially (order doesn't matter as long as no two messages
>> from the same group are processed at the same time).
>> So I am trying to somehow get these streams
>>
>> A1, A2, A3
>>
>> B1, B2, B3
>>
>> C1, C2
>>
>>
>> So, A1 B1 and C1 can be processed in parallel because they are from
>> different groups, but the messages within groups need to be processed one
>> by one.
>>
>> In my example, there are 3 groups, but there can be many and I don't know
>> what they are in advance. The processing logic between the groups is
>> similar and is a function of the group so I can get a processor for group A
>> from a method call getProcessor(A), B getProcessor(B), etc.
>>
>> I am stuck at how to do that in Camel, because since I don't know the
>> groups in advance, I would need to create processing routes dynamically.
>>
>> Say the system starts, and A1 arrives, there can't be any processor for
>> group A yet, since it's the first message from the group and I need to
>> somehow dynamically add processing capability of the group A to the route
>> and then perhaps if the messages from group A stop arriving for some time,
>> that processor could be removed.
>>
>> How to add the parallel part between the group messages is also blurry to
>> me. One way of doing this I was thinking was to do a multicast to all the
>> dynamically created processing routes for groups and stick a filter before
>> so that only messages from particular group can go through. From multicast
>> page:
>>
>> from("direct:a").multicast().parallelProcessing().to("direct:x",
>> "direct:y", "direct:z");
>>
>> But here the x,y,z endpoints are hardcoded. I could write up some custom
>> multicast I suppose to search the routes in CamelContext...... not sure.
>>
>> Thanks
>> Artur
>>
>>
>>
>>
>>
>> On Mon, Apr 3, 2017 at 1:36 PM, Artur Jablonski <aj...@ravenpack.com>
>> wrote:
>>
>>> Hi Zoran,
>>>
>>> Thank you for such detailed response. This looks very promising. i will
>>> need to get my head around the aggregator pattern.
>>> For this week I will be busy with other tasks, but I will get back to it
>>> as soon as I can to see if I can get Camel work for the use case.
>>>
>>> Cheerio
>>> Artur
>>>
>>> On Mon, Apr 3, 2017 at 11:09 AM, Zoran Regvart <zo...@regvart.com> wrote:
>>>
>>>> Hi Artur,
>>>> I was thinking that the order of the messages would be important as
>>>> you need to process them sequentially.
>>>>
>>>> So I think you could use the dynamic message routing[1] with
>>>> aggregator[2], something like:
>>>>
>>>>     from("aws-sqs:...")
>>>>         .process("#preProcess")
>>>>         .toD("direct:${header.nextRoute}");
>>>>
>>>>     from("direct:parallel")...;
>>>>     from("direct:sequential").aggregate(simple("${header.group}"
>>>> )).completion..;
>>>>
>>>> So from yout SQS queue you would use a processor to pre-process
>>>> message whose responsibility would be to set the (custom) `nextRoute`
>>>> and (custom) `group` headers. `nextRoute` would be `parallel` or
>>>> `sequential`, and if `sequential` the messages would be aggregated
>>>> using the `group` header.
>>>>
>>>> You would want to define your own custom aggregation strategy or use
>>>> the completion* options that are available to you. There also might be
>>>> need to use seda[3] to fine tune any parallel processing. You might
>>>> throw in there a data format unmarshaller[4] instead of the
>>>> `preProcess` processor and use something like `${body.xyz} == foo` in
>>>> the `toD` expression.
>>>>
>>>> And I would guess that you need to examine transactions or persistence
>>>> at some point also in case your aggregation step runs for a long time
>>>> or if your use case is sensitive to message loss if interrupted --
>>>> which would undoubtedly lead you back to using queues to separate
>>>> those two ways of processing,
>>>>
>>>> HTH,
>>>>
>>>> zoran
>>>>
>>>> [1] https://camel.apache.org/message-endpoint.html
>>>> [2] https://camel.apache.org/aggregator2.html
>>>> [3] https://camel.apache.org/seda.html
>>>> [4] https://camel.apache.org/data-format.html
>>>>
>>>> On Sat, Apr 1, 2017 at 1:09 PM, Artur Jablonski
>>>> <aj...@ravenpack.com> wrote:
>>>> > Hey Zoran.
>>>> >
>>>> > I read again the patterns you mentioned. In my use case the order of
>>>> > processing within a group doesn't matter as long as two messages from
>>>> the
>>>> > same group are never processed in parallel. So i guess resenquencer is
>>>> out
>>>> > of the picture unless I didn't get the intention.
>>>> >
>>>> > So what we are left with is the content based router. Sure. The message
>>>> > comes, i can see what group it belongs two... And what next? Perhaps
>>>> it's
>>>> > the very first message from that group so I would need to trigger
>>>> creating
>>>> > route/processor for that group somehow, perhaps messages from this
>>>> group
>>>> > were processed before in which case the processor for the group should
>>>> > already exist...
>>>> >
>>>> >
>>>> >
>>>> >
>>>> > On 31 Mar 2017 7:58 p.m., "Zoran Regvart" <zo...@regvart.com> wrote:
>>>> >
>>>> >> Hi Artur,
>>>> >> have a look at Camel EIP page[1], what you describe sounds to me like
>>>> >> Resequencer and Content based router patterns,
>>>> >>
>>>> >> zoran
>>>> >>
>>>> >> [1] https://camel.apache.org/eip.html
>>>> >>
>>>> >> On Fri, Mar 31, 2017 at 5:08 PM, Artur Jablonski
>>>> >> <aj...@ravenpack.com> wrote:
>>>> >> > Hello.
>>>> >> >
>>>> >> > I wonder if someone could push me in the right direction trying to
>>>> >> express
>>>> >> > quite curious case in Camel route.
>>>> >> >
>>>> >> > Imagine there's a stream of messages some of which can be processed
>>>> in
>>>> >> > parallel and some have to be processed serially. You can group the
>>>> >> messages
>>>> >> > that require serial processing together by looking at the message
>>>> body.
>>>> >> You
>>>> >> > don't know upfront how many groups can occur in the stream.
>>>> >> >
>>>> >> > The way I thought about doing this is having a route for each
>>>> message
>>>> >> > group. Since I don't know upfront how many and what groups there
>>>> will be
>>>> >> > then I would need to create routes dynamically. If a message comes
>>>> >> > belonging to a group that doesn't have it's handling route, then i
>>>> could
>>>> >> > create it (is that even possible??) Then if there's no messages
>>>> coming
>>>> >> for
>>>> >> > a given group in some time I could remove the route for the group to
>>>> >> > cleanup (is that possible?)
>>>> >> >
>>>> >> > New to Camel
>>>> >> >
>>>> >> > Thx!
>>>> >> > Artur
>>>> >>
>>>> >>
>>>> >>
>>>> >> --
>>>> >> Zoran Regvart
>>>> >>
>>>>
>>>>
>>>>
>>>> --
>>>> Zoran Regvart
>>>>
>>>
>>>
>>



-- 
Claus Ibsen
-----------------
http://davsclaus.com @davsclaus
Camel in Action 2: https://www.manning.com/ibsen2

Re: Curious routing case

Posted by Artur Jablonski <aj...@ravenpack.com>.
I guess one thing that come to my mind is to hide all this parallel stuff
inside a processor, that would just spit out on the other end the result of
processing all those messages. It would handle grouping and serializing and
stuff I guess that would reduce the complexity of the route with a cost of
complexity of the processor. I have no better ideas anyway, so I will give
it a go

On Tue, Apr 11, 2017 at 10:45 AM, Artur Jablonski <aj...@ravenpack.com>
wrote:

> Hello,
>
> I don't think this route definition is fitting my use case, though I
> learnt a thing or two about the interesting patterns linked. Thanks!
>
> Ok, so let me try to clarify the use case.
>
>
> 1. The stream is infinite, it's not a batch job. The messages keep on
> coming from SQS 'all the time'
>
> 2. More important thing is about parallel processing.
>
> Let A1 denote a message 1 from group A, B2 message 2 from group B, etc.
>
> Let's say this is the order in which the messages happen to appear
>  in the route from SQS
>
> A1, A2, B1, C1, B2, A3, C2, B3
>
> Now what I am trying to achieve is grouping the messages that have to be
> processed sequentially (order doesn't matter as long as no two messages
> from the same group are processed at the same time).
> So I am trying to somehow get these streams
>
> A1, A2, A3
>
> B1, B2, B3
>
> C1, C2
>
>
> So, A1 B1 and C1 can be processed in parallel because they are from
> different groups, but the messages within groups need to be processed one
> by one.
>
> In my example, there are 3 groups, but there can be many and I don't know
> what they are in advance. The processing logic between the groups is
> similar and is a function of the group so I can get a processor for group A
> from a method call getProcessor(A), B getProcessor(B), etc.
>
> I am stuck at how to do that in Camel, because since I don't know the
> groups in advance, I would need to create processing routes dynamically.
>
> Say the system starts, and A1 arrives, there can't be any processor for
> group A yet, since it's the first message from the group and I need to
> somehow dynamically add processing capability of the group A to the route
> and then perhaps if the messages from group A stop arriving for some time,
> that processor could be removed.
>
> How to add the parallel part between the group messages is also blurry to
> me. One way of doing this I was thinking was to do a multicast to all the
> dynamically created processing routes for groups and stick a filter before
> so that only messages from particular group can go through. From multicast
> page:
>
> from("direct:a").multicast().parallelProcessing().to("direct:x",
> "direct:y", "direct:z");
>
> But here the x,y,z endpoints are hardcoded. I could write up some custom
> multicast I suppose to search the routes in CamelContext...... not sure.
>
> Thanks
> Artur
>
>
>
>
>
> On Mon, Apr 3, 2017 at 1:36 PM, Artur Jablonski <aj...@ravenpack.com>
> wrote:
>
>> Hi Zoran,
>>
>> Thank you for such detailed response. This looks very promising. i will
>> need to get my head around the aggregator pattern.
>> For this week I will be busy with other tasks, but I will get back to it
>> as soon as I can to see if I can get Camel work for the use case.
>>
>> Cheerio
>> Artur
>>
>> On Mon, Apr 3, 2017 at 11:09 AM, Zoran Regvart <zo...@regvart.com> wrote:
>>
>>> Hi Artur,
>>> I was thinking that the order of the messages would be important as
>>> you need to process them sequentially.
>>>
>>> So I think you could use the dynamic message routing[1] with
>>> aggregator[2], something like:
>>>
>>>     from("aws-sqs:...")
>>>         .process("#preProcess")
>>>         .toD("direct:${header.nextRoute}");
>>>
>>>     from("direct:parallel")...;
>>>     from("direct:sequential").aggregate(simple("${header.group}"
>>> )).completion..;
>>>
>>> So from yout SQS queue you would use a processor to pre-process
>>> message whose responsibility would be to set the (custom) `nextRoute`
>>> and (custom) `group` headers. `nextRoute` would be `parallel` or
>>> `sequential`, and if `sequential` the messages would be aggregated
>>> using the `group` header.
>>>
>>> You would want to define your own custom aggregation strategy or use
>>> the completion* options that are available to you. There also might be
>>> need to use seda[3] to fine tune any parallel processing. You might
>>> throw in there a data format unmarshaller[4] instead of the
>>> `preProcess` processor and use something like `${body.xyz} == foo` in
>>> the `toD` expression.
>>>
>>> And I would guess that you need to examine transactions or persistence
>>> at some point also in case your aggregation step runs for a long time
>>> or if your use case is sensitive to message loss if interrupted --
>>> which would undoubtedly lead you back to using queues to separate
>>> those two ways of processing,
>>>
>>> HTH,
>>>
>>> zoran
>>>
>>> [1] https://camel.apache.org/message-endpoint.html
>>> [2] https://camel.apache.org/aggregator2.html
>>> [3] https://camel.apache.org/seda.html
>>> [4] https://camel.apache.org/data-format.html
>>>
>>> On Sat, Apr 1, 2017 at 1:09 PM, Artur Jablonski
>>> <aj...@ravenpack.com> wrote:
>>> > Hey Zoran.
>>> >
>>> > I read again the patterns you mentioned. In my use case the order of
>>> > processing within a group doesn't matter as long as two messages from
>>> the
>>> > same group are never processed in parallel. So i guess resenquencer is
>>> out
>>> > of the picture unless I didn't get the intention.
>>> >
>>> > So what we are left with is the content based router. Sure. The message
>>> > comes, i can see what group it belongs two... And what next? Perhaps
>>> it's
>>> > the very first message from that group so I would need to trigger
>>> creating
>>> > route/processor for that group somehow, perhaps messages from this
>>> group
>>> > were processed before in which case the processor for the group should
>>> > already exist...
>>> >
>>> >
>>> >
>>> >
>>> > On 31 Mar 2017 7:58 p.m., "Zoran Regvart" <zo...@regvart.com> wrote:
>>> >
>>> >> Hi Artur,
>>> >> have a look at Camel EIP page[1], what you describe sounds to me like
>>> >> Resequencer and Content based router patterns,
>>> >>
>>> >> zoran
>>> >>
>>> >> [1] https://camel.apache.org/eip.html
>>> >>
>>> >> On Fri, Mar 31, 2017 at 5:08 PM, Artur Jablonski
>>> >> <aj...@ravenpack.com> wrote:
>>> >> > Hello.
>>> >> >
>>> >> > I wonder if someone could push me in the right direction trying to
>>> >> express
>>> >> > quite curious case in Camel route.
>>> >> >
>>> >> > Imagine there's a stream of messages some of which can be processed
>>> in
>>> >> > parallel and some have to be processed serially. You can group the
>>> >> messages
>>> >> > that require serial processing together by looking at the message
>>> body.
>>> >> You
>>> >> > don't know upfront how many groups can occur in the stream.
>>> >> >
>>> >> > The way I thought about doing this is having a route for each
>>> message
>>> >> > group. Since I don't know upfront how many and what groups there
>>> will be
>>> >> > then I would need to create routes dynamically. If a message comes
>>> >> > belonging to a group that doesn't have it's handling route, then i
>>> could
>>> >> > create it (is that even possible??) Then if there's no messages
>>> coming
>>> >> for
>>> >> > a given group in some time I could remove the route for the group to
>>> >> > cleanup (is that possible?)
>>> >> >
>>> >> > New to Camel
>>> >> >
>>> >> > Thx!
>>> >> > Artur
>>> >>
>>> >>
>>> >>
>>> >> --
>>> >> Zoran Regvart
>>> >>
>>>
>>>
>>>
>>> --
>>> Zoran Regvart
>>>
>>
>>
>

Re: Curious routing case

Posted by Artur Jablonski <aj...@ravenpack.com>.
Hello,

I don't think this route definition is fitting my use case, though I learnt
a thing or two about the interesting patterns linked. Thanks!

Ok, so let me try to clarify the use case.


1. The stream is infinite, it's not a batch job. The messages keep on
coming from SQS 'all the time'

2. More important thing is about parallel processing.

Let A1 denote a message 1 from group A, B2 message 2 from group B, etc.

Let's say this is the order in which the messages happen to appear
 in the route from SQS

A1, A2, B1, C1, B2, A3, C2, B3

Now what I am trying to achieve is grouping the messages that have to be
processed sequentially (order doesn't matter as long as no two messages
from the same group are processed at the same time).
So I am trying to somehow get these streams

A1, A2, A3

B1, B2, B3

C1, C2


So, A1 B1 and C1 can be processed in parallel because they are from
different groups, but the messages within groups need to be processed one
by one.

In my example, there are 3 groups, but there can be many and I don't know
what they are in advance. The processing logic between the groups is
similar and is a function of the group so I can get a processor for group A
from a method call getProcessor(A), B getProcessor(B), etc.

I am stuck at how to do that in Camel, because since I don't know the
groups in advance, I would need to create processing routes dynamically.

Say the system starts, and A1 arrives, there can't be any processor for
group A yet, since it's the first message from the group and I need to
somehow dynamically add processing capability of the group A to the route
and then perhaps if the messages from group A stop arriving for some time,
that processor could be removed.

How to add the parallel part between the group messages is also blurry to
me. One way of doing this I was thinking was to do a multicast to all the
dynamically created processing routes for groups and stick a filter before
so that only messages from particular group can go through. From multicast
page:

from("direct:a").multicast().parallelProcessing().to("direct:x",
"direct:y", "direct:z");

But here the x,y,z endpoints are hardcoded. I could write up some custom
multicast I suppose to search the routes in CamelContext...... not sure.

Thanks
Artur





On Mon, Apr 3, 2017 at 1:36 PM, Artur Jablonski <aj...@ravenpack.com>
wrote:

> Hi Zoran,
>
> Thank you for such detailed response. This looks very promising. i will
> need to get my head around the aggregator pattern.
> For this week I will be busy with other tasks, but I will get back to it
> as soon as I can to see if I can get Camel work for the use case.
>
> Cheerio
> Artur
>
> On Mon, Apr 3, 2017 at 11:09 AM, Zoran Regvart <zo...@regvart.com> wrote:
>
>> Hi Artur,
>> I was thinking that the order of the messages would be important as
>> you need to process them sequentially.
>>
>> So I think you could use the dynamic message routing[1] with
>> aggregator[2], something like:
>>
>>     from("aws-sqs:...")
>>         .process("#preProcess")
>>         .toD("direct:${header.nextRoute}");
>>
>>     from("direct:parallel")...;
>>     from("direct:sequential").aggregate(simple("${header.group}"
>> )).completion..;
>>
>> So from yout SQS queue you would use a processor to pre-process
>> message whose responsibility would be to set the (custom) `nextRoute`
>> and (custom) `group` headers. `nextRoute` would be `parallel` or
>> `sequential`, and if `sequential` the messages would be aggregated
>> using the `group` header.
>>
>> You would want to define your own custom aggregation strategy or use
>> the completion* options that are available to you. There also might be
>> need to use seda[3] to fine tune any parallel processing. You might
>> throw in there a data format unmarshaller[4] instead of the
>> `preProcess` processor and use something like `${body.xyz} == foo` in
>> the `toD` expression.
>>
>> And I would guess that you need to examine transactions or persistence
>> at some point also in case your aggregation step runs for a long time
>> or if your use case is sensitive to message loss if interrupted --
>> which would undoubtedly lead you back to using queues to separate
>> those two ways of processing,
>>
>> HTH,
>>
>> zoran
>>
>> [1] https://camel.apache.org/message-endpoint.html
>> [2] https://camel.apache.org/aggregator2.html
>> [3] https://camel.apache.org/seda.html
>> [4] https://camel.apache.org/data-format.html
>>
>> On Sat, Apr 1, 2017 at 1:09 PM, Artur Jablonski
>> <aj...@ravenpack.com> wrote:
>> > Hey Zoran.
>> >
>> > I read again the patterns you mentioned. In my use case the order of
>> > processing within a group doesn't matter as long as two messages from
>> the
>> > same group are never processed in parallel. So i guess resenquencer is
>> out
>> > of the picture unless I didn't get the intention.
>> >
>> > So what we are left with is the content based router. Sure. The message
>> > comes, i can see what group it belongs two... And what next? Perhaps
>> it's
>> > the very first message from that group so I would need to trigger
>> creating
>> > route/processor for that group somehow, perhaps messages from this group
>> > were processed before in which case the processor for the group should
>> > already exist...
>> >
>> >
>> >
>> >
>> > On 31 Mar 2017 7:58 p.m., "Zoran Regvart" <zo...@regvart.com> wrote:
>> >
>> >> Hi Artur,
>> >> have a look at Camel EIP page[1], what you describe sounds to me like
>> >> Resequencer and Content based router patterns,
>> >>
>> >> zoran
>> >>
>> >> [1] https://camel.apache.org/eip.html
>> >>
>> >> On Fri, Mar 31, 2017 at 5:08 PM, Artur Jablonski
>> >> <aj...@ravenpack.com> wrote:
>> >> > Hello.
>> >> >
>> >> > I wonder if someone could push me in the right direction trying to
>> >> express
>> >> > quite curious case in Camel route.
>> >> >
>> >> > Imagine there's a stream of messages some of which can be processed
>> in
>> >> > parallel and some have to be processed serially. You can group the
>> >> messages
>> >> > that require serial processing together by looking at the message
>> body.
>> >> You
>> >> > don't know upfront how many groups can occur in the stream.
>> >> >
>> >> > The way I thought about doing this is having a route for each message
>> >> > group. Since I don't know upfront how many and what groups there
>> will be
>> >> > then I would need to create routes dynamically. If a message comes
>> >> > belonging to a group that doesn't have it's handling route, then i
>> could
>> >> > create it (is that even possible??) Then if there's no messages
>> coming
>> >> for
>> >> > a given group in some time I could remove the route for the group to
>> >> > cleanup (is that possible?)
>> >> >
>> >> > New to Camel
>> >> >
>> >> > Thx!
>> >> > Artur
>> >>
>> >>
>> >>
>> >> --
>> >> Zoran Regvart
>> >>
>>
>>
>>
>> --
>> Zoran Regvart
>>
>
>

Re: Curious routing case

Posted by Artur Jablonski <aj...@ravenpack.com>.
Hi Zoran,

Thank you for such detailed response. This looks very promising. i will
need to get my head around the aggregator pattern.
For this week I will be busy with other tasks, but I will get back to it as
soon as I can to see if I can get Camel work for the use case.

Cheerio
Artur

On Mon, Apr 3, 2017 at 11:09 AM, Zoran Regvart <zo...@regvart.com> wrote:

> Hi Artur,
> I was thinking that the order of the messages would be important as
> you need to process them sequentially.
>
> So I think you could use the dynamic message routing[1] with
> aggregator[2], something like:
>
>     from("aws-sqs:...")
>         .process("#preProcess")
>         .toD("direct:${header.nextRoute}");
>
>     from("direct:parallel")...;
>     from("direct:sequential").aggregate(simple("${header.
> group}")).completion..;
>
> So from yout SQS queue you would use a processor to pre-process
> message whose responsibility would be to set the (custom) `nextRoute`
> and (custom) `group` headers. `nextRoute` would be `parallel` or
> `sequential`, and if `sequential` the messages would be aggregated
> using the `group` header.
>
> You would want to define your own custom aggregation strategy or use
> the completion* options that are available to you. There also might be
> need to use seda[3] to fine tune any parallel processing. You might
> throw in there a data format unmarshaller[4] instead of the
> `preProcess` processor and use something like `${body.xyz} == foo` in
> the `toD` expression.
>
> And I would guess that you need to examine transactions or persistence
> at some point also in case your aggregation step runs for a long time
> or if your use case is sensitive to message loss if interrupted --
> which would undoubtedly lead you back to using queues to separate
> those two ways of processing,
>
> HTH,
>
> zoran
>
> [1] https://camel.apache.org/message-endpoint.html
> [2] https://camel.apache.org/aggregator2.html
> [3] https://camel.apache.org/seda.html
> [4] https://camel.apache.org/data-format.html
>
> On Sat, Apr 1, 2017 at 1:09 PM, Artur Jablonski
> <aj...@ravenpack.com> wrote:
> > Hey Zoran.
> >
> > I read again the patterns you mentioned. In my use case the order of
> > processing within a group doesn't matter as long as two messages from the
> > same group are never processed in parallel. So i guess resenquencer is
> out
> > of the picture unless I didn't get the intention.
> >
> > So what we are left with is the content based router. Sure. The message
> > comes, i can see what group it belongs two... And what next? Perhaps it's
> > the very first message from that group so I would need to trigger
> creating
> > route/processor for that group somehow, perhaps messages from this group
> > were processed before in which case the processor for the group should
> > already exist...
> >
> >
> >
> >
> > On 31 Mar 2017 7:58 p.m., "Zoran Regvart" <zo...@regvart.com> wrote:
> >
> >> Hi Artur,
> >> have a look at Camel EIP page[1], what you describe sounds to me like
> >> Resequencer and Content based router patterns,
> >>
> >> zoran
> >>
> >> [1] https://camel.apache.org/eip.html
> >>
> >> On Fri, Mar 31, 2017 at 5:08 PM, Artur Jablonski
> >> <aj...@ravenpack.com> wrote:
> >> > Hello.
> >> >
> >> > I wonder if someone could push me in the right direction trying to
> >> express
> >> > quite curious case in Camel route.
> >> >
> >> > Imagine there's a stream of messages some of which can be processed in
> >> > parallel and some have to be processed serially. You can group the
> >> messages
> >> > that require serial processing together by looking at the message
> body.
> >> You
> >> > don't know upfront how many groups can occur in the stream.
> >> >
> >> > The way I thought about doing this is having a route for each message
> >> > group. Since I don't know upfront how many and what groups there will
> be
> >> > then I would need to create routes dynamically. If a message comes
> >> > belonging to a group that doesn't have it's handling route, then i
> could
> >> > create it (is that even possible??) Then if there's no messages coming
> >> for
> >> > a given group in some time I could remove the route for the group to
> >> > cleanup (is that possible?)
> >> >
> >> > New to Camel
> >> >
> >> > Thx!
> >> > Artur
> >>
> >>
> >>
> >> --
> >> Zoran Regvart
> >>
>
>
>
> --
> Zoran Regvart
>

Re: Curious routing case

Posted by Zoran Regvart <zo...@regvart.com>.
Hi Artur,
I was thinking that the order of the messages would be important as
you need to process them sequentially.

So I think you could use the dynamic message routing[1] with
aggregator[2], something like:

    from("aws-sqs:...")
        .process("#preProcess")
        .toD("direct:${header.nextRoute}");

    from("direct:parallel")...;
    from("direct:sequential").aggregate(simple("${header.group}")).completion..;

So from yout SQS queue you would use a processor to pre-process
message whose responsibility would be to set the (custom) `nextRoute`
and (custom) `group` headers. `nextRoute` would be `parallel` or
`sequential`, and if `sequential` the messages would be aggregated
using the `group` header.

You would want to define your own custom aggregation strategy or use
the completion* options that are available to you. There also might be
need to use seda[3] to fine tune any parallel processing. You might
throw in there a data format unmarshaller[4] instead of the
`preProcess` processor and use something like `${body.xyz} == foo` in
the `toD` expression.

And I would guess that you need to examine transactions or persistence
at some point also in case your aggregation step runs for a long time
or if your use case is sensitive to message loss if interrupted --
which would undoubtedly lead you back to using queues to separate
those two ways of processing,

HTH,

zoran

[1] https://camel.apache.org/message-endpoint.html
[2] https://camel.apache.org/aggregator2.html
[3] https://camel.apache.org/seda.html
[4] https://camel.apache.org/data-format.html

On Sat, Apr 1, 2017 at 1:09 PM, Artur Jablonski
<aj...@ravenpack.com> wrote:
> Hey Zoran.
>
> I read again the patterns you mentioned. In my use case the order of
> processing within a group doesn't matter as long as two messages from the
> same group are never processed in parallel. So i guess resenquencer is out
> of the picture unless I didn't get the intention.
>
> So what we are left with is the content based router. Sure. The message
> comes, i can see what group it belongs two... And what next? Perhaps it's
> the very first message from that group so I would need to trigger creating
> route/processor for that group somehow, perhaps messages from this group
> were processed before in which case the processor for the group should
> already exist...
>
>
>
>
> On 31 Mar 2017 7:58 p.m., "Zoran Regvart" <zo...@regvart.com> wrote:
>
>> Hi Artur,
>> have a look at Camel EIP page[1], what you describe sounds to me like
>> Resequencer and Content based router patterns,
>>
>> zoran
>>
>> [1] https://camel.apache.org/eip.html
>>
>> On Fri, Mar 31, 2017 at 5:08 PM, Artur Jablonski
>> <aj...@ravenpack.com> wrote:
>> > Hello.
>> >
>> > I wonder if someone could push me in the right direction trying to
>> express
>> > quite curious case in Camel route.
>> >
>> > Imagine there's a stream of messages some of which can be processed in
>> > parallel and some have to be processed serially. You can group the
>> messages
>> > that require serial processing together by looking at the message body.
>> You
>> > don't know upfront how many groups can occur in the stream.
>> >
>> > The way I thought about doing this is having a route for each message
>> > group. Since I don't know upfront how many and what groups there will be
>> > then I would need to create routes dynamically. If a message comes
>> > belonging to a group that doesn't have it's handling route, then i could
>> > create it (is that even possible??) Then if there's no messages coming
>> for
>> > a given group in some time I could remove the route for the group to
>> > cleanup (is that possible?)
>> >
>> > New to Camel
>> >
>> > Thx!
>> > Artur
>>
>>
>>
>> --
>> Zoran Regvart
>>



-- 
Zoran Regvart

Re: Curious routing case

Posted by Artur Jablonski <aj...@ravenpack.com>.
Hey Zoran.

I read again the patterns you mentioned. In my use case the order of
processing within a group doesn't matter as long as two messages from the
same group are never processed in parallel. So i guess resenquencer is out
of the picture unless I didn't get the intention.

So what we are left with is the content based router. Sure. The message
comes, i can see what group it belongs two... And what next? Perhaps it's
the very first message from that group so I would need to trigger creating
route/processor for that group somehow, perhaps messages from this group
were processed before in which case the processor for the group should
already exist...




On 31 Mar 2017 7:58 p.m., "Zoran Regvart" <zo...@regvart.com> wrote:

> Hi Artur,
> have a look at Camel EIP page[1], what you describe sounds to me like
> Resequencer and Content based router patterns,
>
> zoran
>
> [1] https://camel.apache.org/eip.html
>
> On Fri, Mar 31, 2017 at 5:08 PM, Artur Jablonski
> <aj...@ravenpack.com> wrote:
> > Hello.
> >
> > I wonder if someone could push me in the right direction trying to
> express
> > quite curious case in Camel route.
> >
> > Imagine there's a stream of messages some of which can be processed in
> > parallel and some have to be processed serially. You can group the
> messages
> > that require serial processing together by looking at the message body.
> You
> > don't know upfront how many groups can occur in the stream.
> >
> > The way I thought about doing this is having a route for each message
> > group. Since I don't know upfront how many and what groups there will be
> > then I would need to create routes dynamically. If a message comes
> > belonging to a group that doesn't have it's handling route, then i could
> > create it (is that even possible??) Then if there's no messages coming
> for
> > a given group in some time I could remove the route for the group to
> > cleanup (is that possible?)
> >
> > New to Camel
> >
> > Thx!
> > Artur
>
>
>
> --
> Zoran Regvart
>

Re: Curious routing case

Posted by Zoran Regvart <zo...@regvart.com>.
Hi Artur,
have a look at Camel EIP page[1], what you describe sounds to me like
Resequencer and Content based router patterns,

zoran

[1] https://camel.apache.org/eip.html

On Fri, Mar 31, 2017 at 5:08 PM, Artur Jablonski
<aj...@ravenpack.com> wrote:
> Hello.
>
> I wonder if someone could push me in the right direction trying to express
> quite curious case in Camel route.
>
> Imagine there's a stream of messages some of which can be processed in
> parallel and some have to be processed serially. You can group the messages
> that require serial processing together by looking at the message body. You
> don't know upfront how many groups can occur in the stream.
>
> The way I thought about doing this is having a route for each message
> group. Since I don't know upfront how many and what groups there will be
> then I would need to create routes dynamically. If a message comes
> belonging to a group that doesn't have it's handling route, then i could
> create it (is that even possible??) Then if there's no messages coming for
> a given group in some time I could remove the route for the group to
> cleanup (is that possible?)
>
> New to Camel
>
> Thx!
> Artur



-- 
Zoran Regvart

Re: Curious routing case

Posted by Artur Jablonski <aj...@ravenpack.com>.
Hmmm. I am getting messages from Amazon sqs and can't change it. Let's say
I want to see if I can do it in Camel without putting another messaging
system in between.

Interesting feature of amq though.
Thx!

On 31 Mar 2017 5:15 p.m., "Quinn Stevenson" <qu...@pronoia-solutions.com>
wrote:

I’d probably use ActiveMQ Message Groups for this
http://activemq.apache.org/message-groups.html <http://activemq.apache.org/
message-groups.html>

> On Mar 31, 2017, at 9:08 AM, Artur Jablonski <aj...@ravenpack.com>
wrote:
>
> Hello.
>
> I wonder if someone could push me in the right direction trying to express
> quite curious case in Camel route.
>
> Imagine there's a stream of messages some of which can be processed in
> parallel and some have to be processed serially. You can group the
messages
> that require serial processing together by looking at the message body.
You
> don't know upfront how many groups can occur in the stream.
>
> The way I thought about doing this is having a route for each message
> group. Since I don't know upfront how many and what groups there will be
> then I would need to create routes dynamically. If a message comes
> belonging to a group that doesn't have it's handling route, then i could
> create it (is that even possible??) Then if there's no messages coming for
> a given group in some time I could remove the route for the group to
> cleanup (is that possible?)
>
> New to Camel
>
> Thx!
> Artur

Re: Curious routing case

Posted by Quinn Stevenson <qu...@pronoia-solutions.com>.
I’d probably use ActiveMQ Message Groups for this http://activemq.apache.org/message-groups.html <http://activemq.apache.org/message-groups.html>

> On Mar 31, 2017, at 9:08 AM, Artur Jablonski <aj...@ravenpack.com> wrote:
> 
> Hello.
> 
> I wonder if someone could push me in the right direction trying to express
> quite curious case in Camel route.
> 
> Imagine there's a stream of messages some of which can be processed in
> parallel and some have to be processed serially. You can group the messages
> that require serial processing together by looking at the message body. You
> don't know upfront how many groups can occur in the stream.
> 
> The way I thought about doing this is having a route for each message
> group. Since I don't know upfront how many and what groups there will be
> then I would need to create routes dynamically. If a message comes
> belonging to a group that doesn't have it's handling route, then i could
> create it (is that even possible??) Then if there's no messages coming for
> a given group in some time I could remove the route for the group to
> cleanup (is that possible?)
> 
> New to Camel
> 
> Thx!
> Artur