Posted to users@camel.apache.org by Juan López <ju...@gmail.com> on 2021/01/19 15:36:13 UTC

Unit testing a Kafka route with manual commit

Hello and thanks in advance!

I'm using Apache Camel with a Kafka consumer configured for manual commit.
I can't get a unit test to pass because the manual commit cannot be
performed while the test runs. (With automatic commit I have no problem.)

Is there a way to skip this commitSync call during the test?

I'm using Camel 2.24.2 with these dependencies:
camel-spring-boot-starter
camel-kafka
camel-test

ROUTE:

@Override
public void configure() {
    from("{{camel.routes.kafka.routeName}}")
            .process(exchange -> {
                // some stuff
                KafkaManualCommit manual = exchange.getIn()
                        .getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
                manual.commitSync();
            });
}
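
One possible way to make the commit step skippable is to guard it, so that it
only runs when the manual-commit header is actually present. This is a minimal
sketch, assuming the header is simply absent when the message does not come
from a real Kafka consumer. The `ManualCommit` interface, the `MANUAL_COMMIT`
key and the plain header map are hypothetical stand-ins for Camel's
`KafkaManualCommit`, `KafkaConstants.MANUAL_COMMIT` and the exchange headers.
They are not the Camel API.

```java
import java.util.HashMap;
import java.util.Map;

public class GuardedCommitSketch {

    // Hypothetical stand-in for Camel's KafkaManualCommit (not the real interface).
    public interface ManualCommit {
        void commitSync();
    }

    // Hypothetical header key; the real route reads KafkaConstants.MANUAL_COMMIT.
    public static final String MANUAL_COMMIT = "manualCommit";

    // Commits only when the header holds a ManualCommit; returns true if it committed.
    public static boolean commitIfPresent(Map<String, Object> headers) {
        Object header = headers.get(MANUAL_COMMIT);
        if (header instanceof ManualCommit) {
            ((ManualCommit) header).commitSync();
            return true;
        }
        // No header (for example in a unit test): skip the commit instead of failing.
        return false;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        System.out.println(commitIfPresent(headers)); // header absent, nothing committed

        int[] commits = {0};
        headers.put(MANUAL_COMMIT, (ManualCommit) () -> commits[0]++);
        System.out.println(commitIfPresent(headers)); // header present, commitSync() called
    }
}
```

In the route above, the equivalent change would be a null check on `manual`
before calling `commitSync()`. Alternatively, the test could set a stub object
as the header value, so the processor stays unchanged.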



TEST:

class RouterNameTest extends CamelTestSupport {

    public void setUp() throws Exception {
        super.setUp();
    }

    @BeforeEach
    void setup() throws Exception {
        context = new DefaultCamelContext();
        //properties component stuff
        context.addRoutes(createRouteBuilder());
        template = context.createProducerTemplate();
        mockEndpoints();
        context.start();
    }


    void mockEndpoints() throws Exception {
        AdviceWithRouteBuilder mockSolr = new AdviceWithRouteBuilder() {
            @Override
            public void configure() {
                interceptFrom("{{camel.routes.kafka.routeName}}")
                        .to("mock:direct:routeName");
            }
        };
        context.getRouteDefinitions().get(0).adviceWith(context, mockSolr);
    }

    @Override
    protected RouteBuilder createRouteBuilder() {
        return new RouterNameRoute();
    }

    @Test
    void manualKafkaConsumerCommitWithCamel() throws InterruptedException {
        MockEndpoint mockSolr = getMockEndpoint("mock:direct:routeName");

        template.requestBody("{{camel.routes.kafka.routeName}}", "");

        mockSolr.expectedMessageCount(1);
        mockSolr.expectedMessagesMatches(predicate);

        mockSolr.assertIsSatisfied();
    }
}

Thank you very much!