Posted to users@camel.apache.org by Sydney Henrard <sh...@smartwavesa.com> on 2022/09/03 12:33:18 UTC

Aggregator stops route consumption

Hello,

In my application I have a route that consumes from a JMS queue, aggregates on a header, and then processes the result. The issue is that consumption from the queue stops until the processing is done. The queue contains 25 million messages (600K unique messages based on the aggregation rule). Aggregation is configured to complete on timeout (30 min) with the UseLatestAggregationStrategy. Persistence is handled with LevelDB. Parallel processing is enabled.

What I see is 2-3 million messages consumed during the 30 min, after which consumption stops once the aggregator thread starts processing. Consumption resumes only after all exchanges have been processed. In my case the processing takes days, so the queue is not consumed at all during that time.

I tried to reproduce this with a simpler unit test. Analysing the log, I see the same kind of issue:

13:15:00.852 [main] INFO route1 - Received message 0 - 0

13:15:03.861 [main] INFO route1 - Received message 0 - 174288
13:15:03.876 [Camel (camel-1) thread #6 - Aggregator] INFO route1 - Process message 0 - 4

13:15:04.880 [main] INFO route1 - Received message 0 - 266292
13:15:04.987 [Camel (camel-1) thread #6 - Aggregator] INFO route1 - Process message 0 - 10

13:15:06.159 [Camel (camel-1) thread #8 - Aggregator] INFO route1 - Process message 0 - 16858
13:15:06.159 [main] INFO route1 - Received message 0 - 266293  <----- 1s wait with last message 266292

13:15:07.178 [main] INFO route1 - Received message 0 - 368107

13:15:08.065 [Camel (camel-1) thread #6 - Aggregator] INFO route1 - Process message 0 - 17882

13:15:23.241 [Camel (camel-1) thread #12 - Aggregator] INFO route1 - Process message 0 - 199001
13:15:23.241 [main] INFO route1 - Received message 0 - 368108  <----- 16s wait with last message 368107
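
The two stalls annotated above can be checked directly from the timestamps. The following is only a small sketch of that arithmetic in plain Java; the class and method names (LogGaps, gapMillis) are made up for illustration and are not part of the test or of Camel.

```java
import java.time.Duration;
import java.time.LocalTime;

public class LogGaps {

    // Milliseconds elapsed between two HH:mm:ss.SSS log timestamps
    // (assumes both timestamps fall on the same day).
    static long gapMillis(String earlier, String later) {
        return Duration.between(LocalTime.parse(earlier), LocalTime.parse(later)).toMillis();
    }

    public static void main(String[] args) {
        // Gap on the main thread between messages 266292 and 266293.
        System.out.println(gapMillis("13:15:04.880", "13:15:06.159"));
        // Gap on the main thread between messages 368107 and 368108.
        System.out.println(gapMillis("13:15:07.178", "13:15:23.241"));
    }
}
```

This gives roughly 1.3 s for the first stall and roughly 16.1 s for the second, which matches the "1s" and "16s" annotations.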

Is this something that can be mitigated? Am I missing something in the route?

public class AggregationTest extends CamelTestSupport {

    @Test
    public void testMock() throws Exception {
        for (int duplicate = 0; duplicate < 2; duplicate++) {
            for (int i = 0; i < 400000; i++) {
                template.sendBodyAndHeader("direct:start",
                        String.format("%d - %d", duplicate, i),
                        "JMSXGroupID", String.valueOf(i));
            }
        }
        NotifyBuilder notify = new NotifyBuilder(context)
                .from("mock:result")
                .create();

        boolean done = notify.matches(30, TimeUnit.SECONDS);
    }

    @Override
    protected RoutesBuilder createRouteBuilder() throws Exception {
        return new RouteBuilder() {
            @Override
            public void configure() {
                from("direct:start")
                        .log("Received message ${body}")
                        .aggregate(header("JMSXGroupID"), new UseLatestAggregationStrategy())
                        .completionTimeout(3000)
                        .parallelProcessing()
                        .log("Process message ${body}")
                        .to("mock:result");
            }
        };
    }
}

Thanks
Sydney

Re: Aggregator stops route consumption

Posted by Claus Ibsen <cl...@gmail.com>.
Hi

This sounds like a poor design, using LevelDB as temporary storage.

Instead, I would look at a design where you aggregate in smaller batches
and send the results back to the JMS broker, and have a second route
consume from there to continue routing.
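
A rough sketch of what such a two-route layout might look like is below. It is an illustration only, not a tested configuration: the queue names (inbound, aggregated), the bean name (slowProcessor), the completion values and the number of concurrent consumers are all placeholders, and the import for UseLatestAggregationStrategy assumes a Camel version where that class lives in org.apache.camel.processor.aggregate.

```java
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;

public class BatchedAggregationRoutes extends RouteBuilder {

    @Override
    public void configure() {
        // Route 1: consume from the inbound queue, keep only the latest
        // message per group within a short window, and hand the result back
        // to the broker instead of processing it in this route.
        // (Queue names and completion values are placeholders.)
        from("jms:queue:inbound")
            .aggregate(header("JMSXGroupID"), new UseLatestAggregationStrategy())
                .completionSize(100)       // complete a group after 100 messages...
                .completionTimeout(5000)   // ...or after 5 s without a new message for it
            .to("jms:queue:aggregated");

        // Route 2: the slow processing runs on its own consumers, so it no
        // longer shares a route with the inbound consumer.
        // ("slowProcessor" is a hypothetical bean name.)
        from("jms:queue:aggregated?concurrentConsumers=5")
            .log("Process message ${body}")
            .to("bean:slowProcessor");
    }
}
```

The trade-off is that a shorter window removes fewer duplicates: messages for the same group that arrive in different windows are each forwarded to the second queue and processed separately.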

What Camel version do you use?




-- 
Claus Ibsen
-----------------
http://davsclaus.com @davsclaus
Camel in Action 2: https://www.manning.com/ibsen2