You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@camel.apache.org by hakim hejam <ha...@gmail.com> on 2022/10/20 17:02:51 UTC

Apache Camel deadlock on heavy load!

Hello,
We notice under some circumstances of heavy load and request (more the 1000
requests at the same time), threads used by Camel are still in a waiting
state and block the caller thread. This behavior occurs when using
multicast and split, but only when we have configured a timeout on our
routes.
We reproduce it in a simple test as below, we use the 3.16.0 version with
openjdk 11
could you help with that?
Thank you

import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.FluentProducerTemplate;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.impl.engine.DefaultFluentProducerTemplate;
import org.apache.camel.processor.aggregate.StringAggregationStrategy;
import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;

import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;

public class CamelTester {

    private static final AtomicLong START_TIME = new AtomicLong();

    public static void main(String[] args) throws Exception {
        CamelContext camelContext = startCamel();
        FluentProducerTemplate fluentProducerTemplate =
DefaultFluentProducerTemplate
                .on(camelContext)
                .withTemplateCustomizer(
                        template -> {
                            template.setMaximumCacheSize(100);
                        }
                );

        IntStream.range(1, 2000).forEach(i -> {
            new Thread(() -> {
                fluentProducerTemplate
                        .withBody("message-" + i)
                        .to("direct:start")
                        .send();
            }).start();
        });

        Thread.sleep(24 * 60 * 60 * 1000);
    }

    private static CamelContext startCamel() throws Exception {
        log("start Camel Context...");
        CamelContext camelContext = new DefaultCamelContext();
        camelContext.addRoutes(createBasicRoute(camelContext));
        camelContext.start();
        return camelContext;
    }

    static RouteBuilder createBasicRoute(CamelContext camelContext) {
        return new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                from("direct:start")
                        .multicast().parallelProcessing()
                            .aggregationStrategy(new
StringAggregationStrategy())
                            .timeout(10000)
                            .to("direct:a")
                            .to("direct:b")
                            .to("direct:c")
                            .to("direct:def")
                        .end()
                        .to("file:c:/tmp/outbox?charset=iso-8859-1");

                from("direct:def").routeId("def")
                        .split(body().tokenize("message-")).parallelProcessing()
                            .aggregationStrategy(new
StringAggregationStrategy())
                            .timeout(10000)
                            .to("direct:d")
                            .to("direct:e")
                            .to("direct:f")
                        .end();

                from("direct:a").routeId("a")
                        .process(new CustomProcessor("route-A"));
                from("direct:b").routeId("b")
                        .process(new CustomProcessor("route-B"));
                from("direct:c").routeId("c")
                        .process(new CustomProcessor("route-C"));
                from("direct:d").routeId("d")
                        .process(new CustomProcessor("route-D"));
                from("direct:e").routeId("e")
                        .process(new CustomProcessor("route-E"));
                from("direct:f").routeId("f")
                        .process(new CustomProcessor("route-F"));

            }
        };
    }
    public static void log(String msg) {
        long now = System.currentTimeMillis();
        START_TIME.compareAndSet(0, now);
        long elapsed = now - START_TIME.get();
        String name = Thread.currentThread().getName();
        System.out.format("%2$-4s %1$-26s    %3$s\n", name, elapsed, msg);
    }

    static private class CustomProcessor implements Processor {
        String routeId;

        public CustomProcessor(String route) {
            this.routeId = route;
        }

        @Override
        public void process(Exchange exchange) throws Exception {
            log("Start processing for the route " + routeId + " " +
exchange.getMessage().getBody(String.class));
            int waitingTime = ThreadLocalRandom.current().nextInt(1, 2);
            Thread.sleep(waitingTime * 1000);
            String message = exchange.getMessage().getBody(String.class);
            message += "-" + routeId + "-";
            exchange.getMessage().setBody(message, String.class);
            log("End processing for the route " + routeId + " after "
+ waitingTime + "s " + message);
        }
    }

    static private class CostumAggregationStrategy extends
UseLatestAggregationStrategy {
        public void timeout(Exchange oldExchange, int index, int
total, long timeout) {
            System.out.println("timeout...");
        }
    }
}

Re: Apache Camel deadlock on heavy load!

Posted by Zheng Feng <zf...@redhat.com>.
Hi,

Is it possible to check with the latest 3.19.0 release to see if it works?

On Fri, Oct 21, 2022 at 1:35 AM hakim hejam <ha...@gmail.com> wrote:

> Hello,
> We notice under some circumstances of heavy load and request (more the 1000
> requests at the same time), threads used by Camel are still in a waiting
> state and block the caller thread. This behavior occurs when using
> multicast and split, but only when we have configured a timeout on our
> routes.
> We reproduce it in a simple test as below, we use the 3.16.0 version with
> openjdk 11
> could you help with that?
> Thank you
>
> import org.apache.camel.CamelContext;
> import org.apache.camel.Exchange;
> import org.apache.camel.FluentProducerTemplate;
> import org.apache.camel.Processor;
> import org.apache.camel.builder.RouteBuilder;
> import org.apache.camel.impl.DefaultCamelContext;
> import org.apache.camel.impl.engine.DefaultFluentProducerTemplate;
> import org.apache.camel.processor.aggregate.StringAggregationStrategy;
> import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
>
> import java.util.concurrent.ThreadLocalRandom;
> import java.util.concurrent.atomic.AtomicLong;
> import java.util.stream.IntStream;
>
> public class CamelTester {
>
>     private static final AtomicLong START_TIME = new AtomicLong();
>
>     public static void main(String[] args) throws Exception {
>         CamelContext camelContext = startCamel();
>         FluentProducerTemplate fluentProducerTemplate =
> DefaultFluentProducerTemplate
>                 .on(camelContext)
>                 .withTemplateCustomizer(
>                         template -> {
>                             template.setMaximumCacheSize(100);
>                         }
>                 );
>
>         IntStream.range(1, 2000).forEach(i -> {
>             new Thread(() -> {
>                 fluentProducerTemplate
>                         .withBody("message-" + i)
>                         .to("direct:start")
>                         .send();
>             }).start();
>         });
>
>         Thread.sleep(24 * 60 * 60 * 1000);
>     }
>
>     private static CamelContext startCamel() throws Exception {
>         log("start Camel Context...");
>         CamelContext camelContext = new DefaultCamelContext();
>         camelContext.addRoutes(createBasicRoute(camelContext));
>         camelContext.start();
>         return camelContext;
>     }
>
>     static RouteBuilder createBasicRoute(CamelContext camelContext) {
>         return new RouteBuilder() {
>             @Override
>             public void configure() throws Exception {
>                 from("direct:start")
>                         .multicast().parallelProcessing()
>                             .aggregationStrategy(new
> StringAggregationStrategy())
>                             .timeout(10000)
>                             .to("direct:a")
>                             .to("direct:b")
>                             .to("direct:c")
>                             .to("direct:def")
>                         .end()
>                         .to("file:c:/tmp/outbox?charset=iso-8859-1");
>
>                 from("direct:def").routeId("def")
>
> .split(body().tokenize("message-")).parallelProcessing()
>                             .aggregationStrategy(new
> StringAggregationStrategy())
>                             .timeout(10000)
>                             .to("direct:d")
>                             .to("direct:e")
>                             .to("direct:f")
>                         .end();
>
>                 from("direct:a").routeId("a")
>                         .process(new CustomProcessor("route-A"));
>                 from("direct:b").routeId("b")
>                         .process(new CustomProcessor("route-B"));
>                 from("direct:c").routeId("c")
>                         .process(new CustomProcessor("route-C"));
>                 from("direct:d").routeId("d")
>                         .process(new CustomProcessor("route-D"));
>                 from("direct:e").routeId("e")
>                         .process(new CustomProcessor("route-E"));
>                 from("direct:f").routeId("f")
>                         .process(new CustomProcessor("route-F"));
>
>             }
>         };
>     }
>     public static void log(String msg) {
>         long now = System.currentTimeMillis();
>         START_TIME.compareAndSet(0, now);
>         long elapsed = now - START_TIME.get();
>         String name = Thread.currentThread().getName();
>         System.out.format("%2$-4s %1$-26s    %3$s\n", name, elapsed, msg);
>     }
>
>     static private class CustomProcessor implements Processor {
>         String routeId;
>
>         public CustomProcessor(String route) {
>             this.routeId = route;
>         }
>
>         @Override
>         public void process(Exchange exchange) throws Exception {
>             log("Start processing for the route " + routeId + " " +
> exchange.getMessage().getBody(String.class));
>             int waitingTime = ThreadLocalRandom.current().nextInt(1, 2);
>             Thread.sleep(waitingTime * 1000);
>             String message = exchange.getMessage().getBody(String.class);
>             message += "-" + routeId + "-";
>             exchange.getMessage().setBody(message, String.class);
>             log("End processing for the route " + routeId + " after "
> + waitingTime + "s " + message);
>         }
>     }
>
>     static private class CostumAggregationStrategy extends
> UseLatestAggregationStrategy {
>         public void timeout(Exchange oldExchange, int index, int
> total, long timeout) {
>             System.out.println("timeout...");
>         }
>     }
> }
>