Posted to users@camel.apache.org by Craig Washington <Cr...@aexp.com.INVALID> on 2016/11/03 22:32:40 UTC

Split, aggregate, then wait for completionTimeout before continuing

Hello,
I have a simple use case where I'd like to do the following:
* split a message and process each part
* aggregate parts into groups of size N for group-processing (w/timeout to ensure no parts are lost)
* continue route ONLY after all aggregated parts have completed

The simplified route and output are as follows:
---

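import org.apache.camel.RoutesBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.processor.aggregate.GroupedMessageAggregationStrategy;
import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;

public class CamelSplitAggregateWaitForCompletion extends CamelTestSupport {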
    @Test
    public void test() throws Exception {
        template.sendBody("direct:start", "AAA,BBB,CCC,DDD,EEE");
        Thread.sleep(3000);
    }

    @Override
    protected RoutesBuilder createRouteBuilder() throws Exception {
        return new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                from("direct:start")
                    .split(body().tokenize(","), new UseLatestAggregationStrategy())
                    .streaming()
                        //process each msg part
                        .log("processing the msg: ${body}")
                        //group by size
                        .aggregate(constant(true), new GroupedMessageAggregationStrategy())
                        .completionSize(2).completionTimeout(2000)
                            //save grouped messages in batches
                            .log("saving msg group ${body}")
                            .to("mock:result")
                        //end the aggregate processing
                        .end()
                    //end the split processing
                    .end()
                    .log("*** COMPLETED split+aggregate processing");

                    //...do some more stuff here ONLY after all parts are complete...
            }
        };
    }
}

---
//output
15:28:52.072 INFO  route1 - processing the msg: AAA
15:28:52.074 INFO  route1 - processing the msg: BBB
15:28:52.075 INFO  route1 - saving msg group List<Exchange>(2 elements)
15:28:52.076 INFO  route1 - processing the msg: CCC
15:28:52.077 INFO  route1 - processing the msg: DDD
15:28:52.077 INFO  route1 - saving msg group List<Exchange>(2 elements)
15:28:52.078 INFO  route1 - processing the msg: EEE
15:28:52.079 INFO  route1 - *** COMPLETED split+aggregate processing
15:28:55.064 INFO  route1 - saving msg group List<Exchange>(1 elements)
---

Ideally, the "COMPLETED" line should print last (after the final group is released by the timeout).
It seems simple enough, but I haven't found a way to get it working. None of the completionTimeout examples I've found, either in the source or in CIA (Camel in Action), address the route timing after the split.

(I'm actually trying to process a large file with streaming and parallelProcessing, so completionFromBatchConsumer() wouldn't work, though I think that part is irrelevant.)
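A plain-Java model of the ordering asked for here (no Camel involved) is a grouper that completes a group either when it holds N parts or when it is told the current part is the last one, rather than waiting on a timer. The `last` flag plays the role of the `CamelSplitComplete` exchange property that Camel's splitter sets on each split exchange (true only on the last one); the class and method names are hypothetical. This is a sketch of the desired ordering under the assumption that parts arrive one at a time on a single thread, so it deliberately leaves out the parallelProcessing case.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper (not a Camel class): completes a group when it holds
// `size` parts OR when the caller says the current part is the last one.
class LastPartAwareGrouper<T> {
    private final int size;
    private final Consumer<List<T>> onGroup; // stand-in for "save msg group"
    private List<T> current = new ArrayList<>();

    LastPartAwareGrouper(int size, Consumer<List<T>> onGroup) {
        this.size = size;
        this.onGroup = onGroup;
    }

    // `last` plays the role of the splitter's CamelSplitComplete property.
    void add(T part, boolean last) {
        current.add(part);
        if (current.size() == size || last) {
            onGroup.accept(current);
            current = new ArrayList<>();
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        LastPartAwareGrouper<String> grouper =
                new LastPartAwareGrouper<>(2, g -> log.add("saving msg group " + g));
        String[] parts = "AAA,BBB,CCC,DDD,EEE".split(",");
        for (int i = 0; i < parts.length; i++) {
            log.add("processing the msg: " + parts[i]);
            grouper.add(parts[i], i == parts.length - 1);
        }
        // Same thread, strictly after the last (partial) group was saved.
        log.add("*** COMPLETED split+aggregate processing");
        log.forEach(System.out::println);
    }
}
```

Running `main` prints the five "processing" lines interleaved with three "saving msg group" lines, including `[EEE]`, and only then the COMPLETED line, because the last group is saved on the calling thread before the loop ends.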

Using Camel 2.17.3

Thanks



American Express made the following annotations
******************************************************************************
"This message and any attachments are solely for the intended recipient and may contain confidential or privileged information. If you are not the intended recipient, any disclosure, copying, use, or distribution of the information included in this message and any attachments is prohibited. If you have received this communication in error, please notify us by reply e-mail and immediately and permanently delete this message and any attachments. Thank you."

American Express added the following comment: This e-mail and any attachments it contains are intended solely for the named recipient and may contain confidential and privileged information. If you are not the intended recipient, any disclosure, duplication, use, or distribution of the e-mail or any attachment is prohibited. If you have received this communication in error, please notify us by e-mail and immediately destroy the e-mail and its attachments. Thank you.

******************************************************************************

Re: Split, aggregate, then wait for completionTimeout before continuing

Posted by Craig Washington <Cr...@aexp.com.INVALID>.
@Claus
I could be wrong, but the composed message processor (splitter only) doesn’t appear to solve it.
I’m not actually interested in the final composed response. I only care that the message gets split and processed in batches.

My actual route will split a huge file (with streaming and parallel processing), process each part, and save all parts to MongoDB in batches before finally invoking other routes.
The splitter-only approach would essentially aggregate the entire file in memory, which won’t work.
My original sample route already follows the splitter+aggregator approach, but as stated, the timing after the split’s end() doesn’t work.
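The constraint described here (never hold the whole file in memory, save in batches, and only then continue) can be sketched in plain Java: read lines one at a time, hand off each full batch, and flush the remainder when end of input is reached instead of waiting on a timeout. `saveBatch` is a hypothetical stand-in for the MongoDB batch insert, and the sketch is single-threaded, so it does not cover parallel processing.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class StreamingBatchSaver {
    // Reads lines one at a time and passes them to saveBatch in groups of
    // batchSize; only the current batch is held in memory. Closes the source.
    // Returns the number of batches saved.
    static int saveInBatches(Reader source, int batchSize, Consumer<List<String>> saveBatch)
            throws IOException {
        int batches = 0;
        List<String> batch = new ArrayList<>(batchSize);
        try (BufferedReader reader = new BufferedReader(source)) {
            String line;
            while ((line = reader.readLine()) != null) {
                batch.add(line);
                if (batch.size() == batchSize) {
                    saveBatch.accept(batch);
                    batches++;
                    batch = new ArrayList<>(batchSize);
                }
            }
        }
        // End of input is known here, so the partial batch is flushed now,
        // before the caller continues; no timeout is involved.
        if (!batch.isEmpty()) {
            saveBatch.accept(batch);
            batches++;
        }
        return batches;
    }

    public static void main(String[] args) throws IOException {
        int n = saveInBatches(new StringReader("AAA\nBBB\nCCC\nDDD\nEEE"), 2,
                b -> System.out.println("saving batch " + b));
        System.out.println("*** COMPLETED, batches saved: " + n);
    }
}
```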

On 11/4/16, 2:37 AM, "Claus Ibsen" <cl...@gmail.com> wrote:

    See the composed message processor EIP with the splitter only
    http://camel.apache.org/composed-message-processor.html
    
    
    -- 
    Claus Ibsen
    -----------------
    http://davsclaus.com @davsclaus
    Camel in Action 2: https://www.manning.com/ibsen2
    




Re: Split, aggregate, then wait for completionTimeout before continuing

Posted by Brad Johnson <br...@mediadriver.com>.
When I run into this kind of edge-case problem, I usually start thinking
about Java objects. Suppose you have an object with an increment method that
you invoke as each part comes in, plus a List inside it that you add your
aggregated objects to. When the list length and the incremented count are
the same, your bean can iterate over the list and fire the items onto the
next route with a producer template. That way you can be sure that all
aggregation is done.

I'm on my cell phone; otherwise I could do a better job explaining.
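A minimal sketch of the bean described above, assuming plain Java with a `Consumer` standing in for the producer-template call to the next route. One assumption is added that is not in the message: a `splitFinished()` signal, because otherwise the counts can match early (two parts counted, the first group of two saved, third part not yet seen) and the bean would fire too soon. All names are hypothetical; in a route, `partArrived()` would be called for each split part, `groupSaved(...)` from the aggregator's output, and `splitFinished()` after the split ends.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical bean, not a Camel API. Methods are synchronized because
// aggregator completions may run on a different thread than the splitter.
class CompletionTracker<T> {
    private final Consumer<List<T>> next;  // stand-in for ProducerTemplate.sendBody(...)
    private final List<T> saved = new ArrayList<>();
    private int expected;                  // incremented once per incoming part
    private boolean splitDone;             // our addition: set once the splitter is done
    private boolean fired;

    CompletionTracker(Consumer<List<T>> next) {
        this.next = next;
    }

    // Call for every part as it comes out of the splitter.
    synchronized void partArrived() {
        expected++;
    }

    // Call from the aggregator's output with each saved group.
    synchronized void groupSaved(List<T> group) {
        saved.addAll(group);
        fireIfComplete();
    }

    // Call once after the split ends.
    synchronized void splitFinished() {
        splitDone = true;
        fireIfComplete();
    }

    // Forwards everything exactly once, when every counted part has been saved.
    private void fireIfComplete() {
        if (splitDone && !fired && saved.size() == expected) {
            fired = true;
            next.accept(new ArrayList<>(saved));
        }
    }
}
```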


Re: Split, aggregate, then wait for completionTimeout before continuing

Posted by Craig Washington <Cr...@aexp.com.INVALID>.
That’s correct
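The diagnosis confirmed here (the splitter returns while the aggregator's timeout fires later on another thread) can be reproduced without Camel. In this hypothetical simulation a scheduled executor stands in for the aggregator's background timeout checker, and the 200 ms timeout and event strings are made up. It also illustrates one alternative to the fixed `delay()` mentioned below: blocking on a completion signal (a `CountDownLatch` here) returns as soon as the last group is saved and fails loudly if it never is.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class TimeoutFlushRace {
    // Returns the order in which things happened. The timer thread plays the
    // aggregator's timeout checker; the calling thread plays the splitter.
    static List<String> run(boolean waitForFlush) throws InterruptedException {
        List<String> events = new CopyOnWriteArrayList<>();
        CountDownLatch lastGroupSaved = new CountDownLatch(1);
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        try {
            // Groups completed by size are saved on the caller's thread.
            events.add("saving msg group [AAA, BBB]");
            events.add("saving msg group [CCC, DDD]");
            // EEE is left over: only the (made-up) 200 ms timeout completes its group.
            timer.schedule(() -> {
                events.add("saving msg group [EEE]");
                lastGroupSaved.countDown();
            }, 200, TimeUnit.MILLISECONDS);

            // Block on a completion signal instead of a fixed delay.
            if (waitForFlush && !lastGroupSaved.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("last group was never saved");
            }
            events.add("*** COMPLETED");

            // In the no-wait case, let the timer finish so the late save is recorded.
            lastGroupSaved.await(5, TimeUnit.SECONDS);
        } finally {
            timer.shutdownNow();
        }
        return events;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("without waiting: " + run(false));
        System.out.println("with waiting:    " + run(true));
    }
}
```

In the first run the `[EEE]` save should come after COMPLETED, mirroring the original log; in the second, COMPLETED should come last.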

On 11/4/16, 4:13 PM, "Brad Johnson" <br...@mediadriver.com> wrote:

    If I'm grokking the code right, it appears that the splitter is exiting while
    your aggregator is still stuck waiting for the timeout.
    
    On Nov 4, 2016 3:49 PM, "Craig Washington"
    <Cr...@aexp.com.invalid> wrote:
    
    > Thanks Brad.
    > Just now taking a look at the resequencer. In my case the final message
    > and ordering aren’t important, only the timing of the split/group/completion.
    >
    > The resequencer may work. Likewise, I suppose I could get away with using
    > delay() with some sufficiently large value after the split, but there just
    > seems to be a better way that I’m missing.
    >
    > On 11/4/16, 8:54 AM, "Brad Johnson" <br...@mediadriver.com> wrote:
    >
    >     @Craig,
    >
    >     This may or may not be appropriate to your situation, but I recently
    >     ran into a case where I was processing in parallel and making external
    >     REST calls to another company, so you couldn't count on uniform
    >     transaction speed, and an exception absolutely bogged a thread down.
    >     One example of the speed differential was the footer in the input
    >     file: it had to be written out last, but it required no processing, so
    >     it zipped through.
    >
    >     What I ended up doing was adding, right after my splitter, a header
    >     with an incremented value so I'd know what order the messages came in.
    >     Then I'd drop each one into a SEDA queue with multiple consumer
    >     threads. Right before writing to the output file, I put in a
    >     resequencer with a timeout. That timeout was long enough to ensure
    >     that messages on the resequence queue would get shuffled into the
    >     correct order before being output.
    >
    >     The only thing I wish were different about the resequencer is a reset
    >     of an item's timeout when another item comes in with a lower number.
    >     If the footer was number 20 and number 18 came in 4 seconds later, I'd
    >     prefer number 20's timeout to be reset, to help ensure that
    >     differences in order are naturally taken care of.
    >
    >     Anyway, your aggregation strategy could assign an order number, and
    >     before writing each message out you'd drop it on the resequencer
    >     queue, so if one with a lower number comes in after it, they get
    >     output in their natural order. Someday, if I get some time, I'd like
    >     to modify the resequencer to have that behavior, if not by default,
    >     then at least settable via a flag. I can't really think of a case
    >     where I wouldn't want the timer reset when messages get reordered.
    >     After all, the purpose is to ensure the ordering is correct
    >     independent of the timing.
    >
    >     Like I said, that may or may not help in your situation. I don't know
    >     if I've used the composed message processor EIP that Claus mentions,
    >     so I can't really comment.
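The resequencing step described above can be reduced to a small ordered buffer: each item carries the sequence number assigned right after the splitter, and items are released only when the next expected number has arrived, so a fast footer waits for the slower lines. This is a simplified, hypothetical stand-in for Camel's resequencer, not its implementation: it has no timeout at all (so it sidesteps the timeout-reset behavior wished for above), and it assumes sequence numbers are contiguous from a known starting value, with no gaps and no null items.

```java
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.function.Consumer;

// Hypothetical gap-free resequencer: no timeout, sequence numbers assumed
// contiguous from `firstSequence`, items must be non-null.
class SimpleResequencer<T> {
    private final Map<Long, T> pending = new TreeMap<>(); // early arrivals, keyed by sequence number
    private final Consumer<T> out;                        // stand-in for "write to the output file"
    private long nextExpected;

    SimpleResequencer(long firstSequence, Consumer<T> out) {
        this.nextExpected = firstSequence;
        this.out = out;
    }

    synchronized void accept(long seq, T item) {
        Objects.requireNonNull(item, "item");
        if (seq < nextExpected || pending.containsKey(seq)) {
            throw new IllegalArgumentException("duplicate or late sequence number " + seq);
        }
        pending.put(seq, item);
        // Emit every item that is now contiguous with what was already written.
        T ready;
        while ((ready = pending.remove(nextExpected)) != null) {
            out.accept(ready);
            nextExpected++;
        }
    }

    synchronized int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        SimpleResequencer<String> r = new SimpleResequencer<>(1, System.out::println);
        r.accept(3, "footer"); // needed no processing, so it arrives first; held back
        r.accept(1, "line 1"); // prints "line 1"
        r.accept(2, "line 2"); // prints "line 2", then "footer"
    }
}
```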
    >
    >     On Fri, Nov 4, 2016 at 1:37 AM, Claus Ibsen <cl...@gmail.com>
    > wrote:
    >
    >     > See the composed message processor EIP with the splitter only
    >     > http://camel.apache.org/composed-message-processor.html
    >     >
    >     >
    >     > On Thu, Nov 3, 2016 at 11:32 PM, Craig Washington
    >     > <Cr...@aexp.com.invalid> wrote:
    >     > > Hello,
    >     > > I have a simple use case where I'd like to do the following:
    >     > > * split a message and process each part
    >     > > * aggregate parts into groups of size N for group-processing
    > (w/timeout
    >     > to ensure no parts are lost)
    >     > > * continue route ONLY after all aggregated parts have completed
    >     > >
    >     > > The simplified route and output are as follows:
    >     > > ---
    >     > >
    >     > > public class CamelSplitAggregateWaitForCompletion extends
    >     > CamelTestSupport {
    >     > >     @Test
    >     > >     public void test() throws Exception {
    >     > >         template.sendBody("direct:start", "AAA,BBB,CCC,DDD,EEE");
    >     > >         Thread.sleep(3000);
    >     > >     }
    >     > >
    >     > >     @Override
    >     > >     protected RoutesBuilder createRouteBuilder() throws Exception {
    >     > >         return new RouteBuilder() {
    >     > >             @Override
    >     > >             public void configure() throws Exception {
    >     > >                 from("direct:start")
    >     > >                     .split(body().tokenize(","), new
    >     > UseLatestAggregationStrategy())
    >     > >                     .streaming()
    >     > >                         //process each msg part
    >     > >                         .log("processing the msg: ${body}")
    >     > >                         //group by size
    >     > >                         .aggregate(constant(true), new
    >     > GroupedMessageAggregationStrategy())
    >     > >                         .completionSize(2).completionTimeout(2000)
    >     > >                             //save grouped messages in batches
    >     > >                             .log("saving msg group ${body}")
    >     > >                             .to("mock:result")
    >     > >                         //end the aggregate processing
    >     > >                         .end()
    >     > >                     //end the split processing
    >     > >                     .end()
    >     > >                     .log("*** COMPLETED split+aggregate
    > processing");
    >     > >
    >     > >                     //...do some more stuff here ONLY after all
    > parts
    >     > are complete...
    >     > >             }
    >     > >         };
    >     > >     }
    >     > > }
    >     > >
    >     > > ---
    >     > > //output
    >     > > 15:28:52.072 INFO  route1 - processing the msg: AAA
    >     > > 15:28:52.074 INFO  route1 - processing the msg: BBB
    >     > > 15:28:52.075 INFO  route1 - saving msg group List<Exchange>(2
    > elements)
    >     > > 15:28:52.076 INFO  route1 - processing the msg: CCC
    >     > > 15:28:52.077 INFO  route1 - processing the msg: DDD
    >     > > 15:28:52.077 INFO  route1 - saving msg group List<Exchange>(2
    > elements)
    >     > > 15:28:52.078 INFO  route1 - processing the msg: EEE
    >     > > 15:28:52.079 INFO  route1 - *** COMPLETED split+aggregate
    > processing
    >     > > 15:28:55.064 INFO  route1 - saving msg group List<Exchange>(1
    > elements)
    >     > > ---
    >     > >
    >     > > Ideally, the "COMPLETED" line should print last (after the final
    >     > aggregated group from timeout).
    >     > > Seems simple enough though I haven't found a way to get this
    > working.
    >     > Neither of the completionTimeout examples I've found in source nor
    > CIA
    >     > focus on the route timing after the split.
    >     > >
    >     > > (I'm actually trying to process a large file with streaming and
    >     > parallelProcessing so completionFromBatchConsumer() wouldn't work,
    > though I
    >     > think this part is irrelevant)
    >     > >
    >     > > Using Camel 2.17.3
    >     > >
    >     > > Thanks

Re: Split, aggregate, then wait for completionTimeout before continuing

Posted by Brad Johnson <br...@mediadriver.com>.
It appears that the splitter is exiting while your aggregator is still
stuck waiting for the timeout, if I'm grokking the code right.
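To make that diagnosis concrete, here is a minimal plain-Java model of the route (it does not use Camel; the class and method names are made up for illustration). Full groups of two are flushed inline by the loop standing in for the splitter, while the leftover part is only flushed later by a timer standing in for completionTimeout, so "COMPLETED" is logged before the last group, matching the order in the original log:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * Plain-Java model (no Camel) of the route in the question: full groups are
 * flushed inline, but a partial group is only flushed later by a timer,
 * mimicking completionTimeout.
 */
public class TimeoutFlushDemo {

    public static List<String> run() {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        List<String> group = new ArrayList<>();

        for (String part : "AAA,BBB,CCC,DDD,EEE".split(",")) {
            log.add("processing " + part);
            group.add(part);
            if (group.size() == 2) {               // like completionSize(2): flushed right away
                log.add("saving " + group);
                group = new ArrayList<>();
            }
        }
        if (!group.isEmpty()) {                     // leftover part: only the timer flushes it
            List<String> leftover = group;
            timer.schedule(() -> log.add("saving " + leftover), 200, TimeUnit.MILLISECONDS);
        }
        log.add("COMPLETED");                       // the "splitter" has already finished here

        timer.shutdown();                           // already-scheduled delayed tasks still run
        try {
            timer.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The timer-driven group for EEE shows up after COMPLETED, which is the same shape as the problem in the original post.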


Re: Split, aggregate, then wait for completionTimeout before continuing

Posted by Craig Washington <Cr...@aexp.com.INVALID>.
Thanks Brad.
Just now taking a look at the resequencer. In my case the final message and ordering
aren’t important, only the timing of the split/group/completion.

The resequencer may work. Likewise, I suppose I could get away with using delay() with
some sufficiently large value after the split, but there just seems to be a better way that
I’m missing.
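For comparison with a fixed delay(), here is a minimal plain-Java sketch of waiting on an explicit completion signal instead (no Camel involved; the class and method names are hypothetical). Each batch is saved on a worker thread, and the caller blocks on CompletableFuture.allOf(...) until every save has finished, so "COMPLETED" is always last no matter how long the saves take:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Plain-Java sketch (not Camel): batches are saved asynchronously, and the
 * caller waits for every save to finish instead of sleeping for a guessed delay.
 */
public class WaitForBatchesDemo {

    public static List<String> run(List<String> parts, int batchSize) {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<CompletableFuture<Void>> pending = new ArrayList<>();

        for (int i = 0; i < parts.size(); i += batchSize) {
            List<String> batch = parts.subList(i, Math.min(i + batchSize, parts.size()));
            // Each batch is "saved" on a worker thread, like an aggregator's output.
            pending.add(CompletableFuture.runAsync(() -> log.add("saved " + batch), pool));
        }
        // Block until every batch has been saved; the order of the saves may vary.
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        log.add("COMPLETED");

        pool.shutdown();
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        run(List.of("AAA", "BBB", "CCC", "DDD", "EEE"), 2).forEach(System.out::println);
    }
}
```

The trade-off is bookkeeping: the caller must know which pieces of work it is waiting for, whereas a delay() needs none but only works if the guessed value is always long enough.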


Re: Split, aggregate, then wait for completionTimeout before continuing

Posted by Brad Johnson <br...@mediadriver.com>.
@Craig,

This may or may not be appropriate to your situation, but I recently ran
into a case where I was processing in parallel and making external REST
calls to another company, so I couldn't count on uniform transaction speed,
and an exception absolutely bogged a thread down. One example of the speed
differential was the footer of the input file: it had to be written out
last, but it required no processing, so it zipped through.

What I ended up doing was adding a header with an incremented value right
after my splitter, so I'd know what order the messages came in. Then I'd
drop each one into a SEDA queue with multiple consumer threads. Right before
writing to the output file, I put in a resequencer with a timeout. That
timeout was long enough to ensure that messages on the resequencer queue
would get shuffled into the correct order before being output.
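A stripped-down version of the sequence-number-plus-resequencer idea, as a plain-Java sketch (the class and method names are made up; this is not Camel's resequencer). Each message carries the number stamped right after the split, and a message is only released once every lower number has been released. Camel's stream resequencer also uses a timeout for gaps that never fill; this sketch deliberately leaves that out:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/**
 * Simplified stream resequencer (plain Java, no timeout): messages carry the
 * sequence number stamped right after the split and are released only when
 * every lower number has already been released.
 */
public class SimpleResequencer {

    /** A payload tagged with its original position. */
    record Numbered(long seq, String body) { }

    private final PriorityQueue<Numbered> buffer =
            new PriorityQueue<>((a, b) -> Long.compare(a.seq(), b.seq()));
    private long nextExpected;

    public SimpleResequencer(long firstSeq) {
        this.nextExpected = firstSeq;
    }

    /** Accepts one (possibly out-of-order) arrival and returns what can now be emitted in order. */
    public List<String> offer(long seq, String body) {
        buffer.add(new Numbered(seq, body));
        List<String> ready = new ArrayList<>();
        while (!buffer.isEmpty() && buffer.peek().seq() == nextExpected) {
            ready.add(buffer.poll().body());
            nextExpected++;
        }
        return ready;
    }

    public static void main(String[] args) {
        SimpleResequencer r = new SimpleResequencer(1);
        System.out.println(r.offer(3, "footer"));  // []  (1 and 2 still missing)
        System.out.println(r.offer(1, "line-1"));  // [line-1]
        System.out.println(r.offer(2, "line-2"));  // [line-2, footer]
    }
}
```

The fast footer is held back until the slower lines ahead of it have gone out, which is the behaviour described for the output file.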

The only thing I wish were different about the resequencer is that an
item's timeout would be reset when another item with a lower number came
in. If the footer was number 20 and number 18 came in 4 seconds later, I'd
prefer number 20's timeout to be reset, so that differences in order are
naturally taken care of.
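The timeout-reset behaviour described here could look roughly like the sketch below. It is a hypothetical policy, not an existing Camel resequencer option, and every name in it is invented; time is passed in explicitly so the example is deterministic:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical resequencer policy (not a Camel feature): a held message is
 * forced out once its timeout expires, but whenever a lower-numbered message
 * arrives, the timeouts of all higher-numbered held messages restart.
 */
public class ResettingResequencer {

    private record Held(String body, long deadline) { }

    private final TreeMap<Long, Held> held = new TreeMap<>();
    private final long timeoutMillis;
    private long nextExpected;

    public ResettingResequencer(long firstSeq, long timeoutMillis) {
        this.nextExpected = firstSeq;
        this.timeoutMillis = timeoutMillis;
    }

    /** Buffers a message; a lower-numbered arrival restarts the clock for everything after it. */
    public void offer(long seq, String body, long now) {
        for (Long later : new ArrayList<>(held.tailMap(seq, false).keySet())) {
            held.put(later, new Held(held.get(later).body(), now + timeoutMillis));
        }
        held.put(seq, new Held(body, now + timeoutMillis));
    }

    /** Releases messages that are next in sequence, or whose timeout has expired. */
    public List<String> poll(long now) {
        List<String> out = new ArrayList<>();
        while (!held.isEmpty()) {
            Map.Entry<Long, Held> first = held.firstEntry();
            boolean inOrder = first.getKey() == nextExpected;
            boolean expired = now >= first.getValue().deadline();
            if (!inOrder && !expired) {
                break; // still waiting for the gap before it to fill
            }
            held.pollFirstEntry();
            out.add(first.getValue().body());
            // Move past this message; on expiry, any missing lower numbers are given up on.
            nextExpected = Math.max(nextExpected, first.getKey() + 1);
        }
        return out;
    }

    public static void main(String[] args) {
        ResettingResequencer r = new ResettingResequencer(18, 5_000);
        r.offer(20, "footer", 0);           // due at 5_000 unless reset
        r.offer(18, "row-18", 4_000);       // lower number arrives: footer now due at 9_000
        System.out.println(r.poll(4_000));  // [row-18]
        System.out.println(r.poll(6_000));  // [] (would have been [footer] without the reset)
        r.offer(19, "row-19", 7_000);       // restarts the footer again, now due at 12_000
        System.out.println(r.poll(7_000));  // [row-19, footer]
    }
}
```

With the reset, the footer (20) is still held at 6000 ms and comes out after row 19; with a plain per-item timeout it would have been forced out at 5000 ms, ahead of 19.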

Anyway, your aggregation strategy could assign an order number, and before
writing a group out you drop it on the resequencer queue, so if another one
with a lower number comes in after it, they get output in their natural
order. Someday, if I get some time, I'd like to modify the resequencer to
have that behavior, if not by default, then at least settable via a flag. I
can't really think of a case where I wouldn't want the timer reset when
messages get resequenced. After all, the purpose is to ensure the ordering
is correct independent of the timing.

Like I said, that may or may not help in your situation. I don't know if
I've used the composed message processor EIP that Claus mentions, so I
can't really comment.


Re: Split, aggregate, then wait for completionTimeout before continuing

Posted by Claus Ibsen <cl...@gmail.com>.
See the composed message processor EIP with the splitter only
http://camel.apache.org/composed-message-processor.html


On Thu, Nov 3, 2016 at 11:32 PM, Craig Washington
<Cr...@aexp.com.invalid> wrote:
> Hello,
> I have a simple use case where I'd like to do the following:
> * split a message and process each part
> * aggregate parts into groups of size N for group-processing (w/timeout to ensure no parts are lost)
> * continue route ONLY after all aggregated parts have completed
>
> The simplified route and output are as follows:
> ---
>
> public class CamelSplitAggregateWaitForCompletion extends CamelTestSupport {
>     @Test
>     public void test() throws Exception {
>         template.sendBody("direct:start", "AAA,BBB,CCC,DDD,EEE");
>         Thread.sleep(3000);
>     }
>
>     @Override
>     protected RoutesBuilder createRouteBuilder() throws Exception {
>         return new RouteBuilder() {
>             @Override
>             public void configure() throws Exception {
>                 from("direct:start")
>                     .split(body().tokenize(","), new UseLatestAggregationStrategy())
>                     .streaming()
>                         //process each msg part
>                         .log("processing the msg: ${body}")
>                         //group by size
>                         .aggregate(constant(true), new GroupedMessageAggregationStrategy())
>                         .completionSize(2).completionTimeout(2000)
>                             //save grouped messages in batches
>                             .log("saving msg group ${body}")
>                             .to("mock:result")
>                         //end the aggregate processing
>                         .end()
>                     //end the split processing
>                     .end()
>                     .log("*** COMPLETED split+aggregate processing");
>
>                     //...do some more stuff here ONLY after all parts are complete...
>             }
>         };
>     }
> }
>
> ---
> //output
> 15:28:52.072 INFO  route1 - processing the msg: AAA
> 15:28:52.074 INFO  route1 - processing the msg: BBB
> 15:28:52.075 INFO  route1 - saving msg group List<Exchange>(2 elements)
> 15:28:52.076 INFO  route1 - processing the msg: CCC
> 15:28:52.077 INFO  route1 - processing the msg: DDD
> 15:28:52.077 INFO  route1 - saving msg group List<Exchange>(2 elements)
> 15:28:52.078 INFO  route1 - processing the msg: EEE
> 15:28:52.079 INFO  route1 - *** COMPLETED split+aggregate processing
> 15:28:55.064 INFO  route1 - saving msg group List<Exchange>(1 elements)
> ---
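The ordering in the log above can be reproduced with a small plain-Java model. This is not Camel code. It assumes the aggregator behaves as follows: groups completed by size are saved immediately on the calling thread, while the leftover partial group is released later by a background timer. Under that assumption the caller reaches the "COMPLETED" line before the timer fires, which matches the output. Class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical plain-JDK model of the observed ordering; not Camel's implementation.
final class TimeoutOrderingModel {

    static List<String> run(List<String> parts, int groupSize, long timeoutMs) {
        // Thread-safe list, because the timer thread also appends to it
        List<String> log = new CopyOnWriteArrayList<>();
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        List<String> group = new ArrayList<>();
        for (String part : parts) {
            log.add("processing the msg: " + part);
            group.add(part);
            if (group.size() == groupSize) {
                // Size completion: the group is saved right away on the caller thread
                log.add("saving msg group " + group);
                group = new ArrayList<>();
            }
        }
        if (!group.isEmpty()) {
            // Timeout completion: the partial group is handed to the timer thread
            List<String> leftover = group;
            timer.schedule(() -> { log.add("saving msg group " + leftover); },
                    timeoutMs, TimeUnit.MILLISECONDS);
        }
        // The caller does not wait for the timer, so this line is logged first
        log.add("*** COMPLETED split+aggregate processing");

        // Like the Thread.sleep(3000) in the test: give the timer time to fire.
        // Delayed tasks that are already scheduled still run after shutdown() by default.
        timer.shutdown();
        try {
            timer.awaitTermination(timeoutMs + 1000, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log;
    }
}
```

Calling `TimeoutOrderingModel.run(Arrays.asList("AAA", "BBB", "CCC", "DDD", "EEE"), 2, 200)` yields the same sequence as the log above. The "COMPLETED" entry comes second to last, and the one-element group comes last.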
>
> Ideally, the "COMPLETED" line should print last (after the final aggregated group from timeout).
> It seems simple enough, but I haven't found a way to get it working. Neither the completionTimeout examples I've found in the source nor those in CIA (Camel in Action) address the route's timing after the split.
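The desired ordering is what you get when the end of input itself counts as a completion condition. In that case the last partial group is flushed synchronously before the route moves on. The following plain-Java sketch shows only that sequence; it is not a Camel configuration, and its names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the desired ordering: flush the leftover group at end of input.
final class FlushAtEndModel {

    static List<String> run(List<String> parts, int groupSize) {
        List<String> log = new ArrayList<>();
        List<String> group = new ArrayList<>();
        for (String part : parts) {
            log.add("processing the msg: " + part);
            group.add(part);
            if (group.size() == groupSize) {
                // Size completion, same as before
                log.add("saving msg group " + group);
                group = new ArrayList<>();
            }
        }
        // End of input acts as a completion condition, so no timeout is needed here
        if (!group.isEmpty()) {
            log.add("saving msg group " + group);
        }
        // Only reached after every group, including the partial one, has been saved
        log.add("*** COMPLETED split+aggregate processing");
        return log;
    }
}
```

In the streaming case, the hard part is knowing where the input ends. The splitter marks its last part with the `CamelSplitComplete` exchange property. This sketch does not show whether that property is enough to drive the aggregator's completion, especially once parallelProcessing can reorder parts.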
>
> (I'm actually trying to process a large file with streaming and parallelProcessing, so completionFromBatchConsumer() wouldn't work, though I think this part is irrelevant.)
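With parallelProcessing, groups are saved on worker threads. Continuing only after all of them have finished therefore needs an explicit barrier over every in-flight group, including the final partial one. Below is a minimal plain-JDK sketch of that idea. It assumes a fixed thread pool and uses `CompletableFuture.allOf` as the barrier. The names are hypothetical, and this is not how Camel implements parallel splitting.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical model: save groups in parallel, then wait for all of them before continuing.
final class ParallelGroupsModel {

    static List<String> run(List<String> parts, int groupSize, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        List<CompletableFuture<String>> pending = new ArrayList<>();
        try {
            List<String> group = new ArrayList<>();
            for (String part : parts) {
                group.add(part);
                if (group.size() == groupSize) {
                    pending.add(saveAsync(pool, group));
                    group = new ArrayList<>(); // the submitted list is never mutated again
                }
            }
            if (!group.isEmpty()) {
                pending.add(saveAsync(pool, group)); // final partial group
            }
            // Barrier: nothing below runs until every group has been saved
            CompletableFuture.allOf(pending.toArray(new CompletableFuture<?>[0])).join();

            // Collect results in submission order, then "continue the route"
            List<String> log = new ArrayList<>();
            for (CompletableFuture<String> f : pending) {
                log.add(f.join());
            }
            log.add("*** COMPLETED split+aggregate processing");
            return log;
        } finally {
            pool.shutdown();
        }
    }

    // Stand-in for the real group processing (e.g. saving a batch)
    private static CompletableFuture<String> saveAsync(ExecutorService pool, List<String> group) {
        return CompletableFuture.supplyAsync(() -> "saving msg group " + group, pool);
    }
}
```

The design choice is to keep a handle on every submitted group rather than rely on a timer. The final partial group is then just one more future to wait on.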
>
> Using Camel 2.17.3
>
> Thanks
>



-- 
Claus Ibsen
-----------------
http://davsclaus.com @davsclaus
Camel in Action 2: https://www.manning.com/ibsen2