You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@camel.apache.org by or...@apache.org on 2021/12/06 20:19:44 UTC

[camel] branch main updated: CAMEL-17285: manual commit factories should be registered

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 3548018  CAMEL-17285: manual commit factories should be registered
3548018 is described below

commit 354801864032d55cafd5ad6923fd1c56b468f3ef
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Mon Dec 6 19:29:07 2021 +0100

    CAMEL-17285: manual commit factories should be registered
---
 .../kafka/integration/KafkaConsumerAsyncManualCommitIT.java   | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAsyncManualCommitIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAsyncManualCommitIT.java
index 3e0ce83..9cffce8 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAsyncManualCommitIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAsyncManualCommitIT.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.kafka.integration;
 import java.util.Collections;
 import java.util.Properties;
 
+import org.apache.camel.BindToRegistry;
 import org.apache.camel.Endpoint;
 import org.apache.camel.EndpointInject;
 import org.apache.camel.Exchange;
@@ -26,8 +27,8 @@ import org.apache.camel.builder.AggregationStrategies;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.kafka.DefaultKafkaManualAsyncCommitFactory;
 import org.apache.camel.component.kafka.KafkaConstants;
-import org.apache.camel.component.kafka.KafkaEndpoint;
 import org.apache.camel.component.kafka.KafkaManualCommit;
+import org.apache.camel.component.kafka.KafkaManualCommitFactory;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.junit.jupiter.api.AfterEach;
@@ -42,7 +43,7 @@ public class KafkaConsumerAsyncManualCommitIT extends BaseEmbeddedKafkaTestSuppo
 
     @EndpointInject("kafka:" + TOPIC
                     + "?groupId=group1&sessionTimeoutMs=30000&autoCommitEnable=false"
-                    + "&allowManualCommit=true&autoOffsetReset=earliest")
+                    + "&allowManualCommit=true&autoOffsetReset=earliest&kafkaManualCommitFactory=#testFactory")
     private Endpoint from;
 
     @EndpointInject("mock:result")
@@ -51,6 +52,9 @@ public class KafkaConsumerAsyncManualCommitIT extends BaseEmbeddedKafkaTestSuppo
     @EndpointInject("mock:resultBar")
     private MockEndpoint toBar;
 
+    @BindToRegistry("testFactory")
+    private KafkaManualCommitFactory manualCommitFactory = new DefaultKafkaManualAsyncCommitFactory();
+
     private org.apache.kafka.clients.producer.KafkaProducer<String, String> producer;
 
     @BeforeEach
@@ -70,14 +74,13 @@ public class KafkaConsumerAsyncManualCommitIT extends BaseEmbeddedKafkaTestSuppo
 
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
-        ((KafkaEndpoint) from).getComponent().setKafkaManualCommitFactory(new DefaultKafkaManualAsyncCommitFactory());
         return new RouteBuilder() {
 
             @Override
             public void configure() {
                 from(from).routeId("foo").to("direct:aggregate");
                 // With sync manual commit, this would throw a concurrent modification exception
-                // It can be usesd in aggregator with completion timeout/interval for instance
+                // It can be used in aggregator with completion timeout/interval for instance
                 // WARN: records from one partition must be processed by one unique thread
                 from("direct:aggregate").routeId("aggregate").to(to)
                         .aggregate()