You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2021/12/06 20:19:44 UTC
[camel] branch main updated: CAMEL-17285: manual commit factories should be registered
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 3548018 CAMEL-17285: manual commit factories should be registered
3548018 is described below
commit 354801864032d55cafd5ad6923fd1c56b468f3ef
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Mon Dec 6 19:29:07 2021 +0100
CAMEL-17285: manual commit factories should be registered
---
.../kafka/integration/KafkaConsumerAsyncManualCommitIT.java | 11 +++++++----
1 file changed, 7 insertions(+), 4 deletions(-)
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAsyncManualCommitIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAsyncManualCommitIT.java
index 3e0ce83..9cffce8 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAsyncManualCommitIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAsyncManualCommitIT.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.kafka.integration;
import java.util.Collections;
import java.util.Properties;
+import org.apache.camel.BindToRegistry;
import org.apache.camel.Endpoint;
import org.apache.camel.EndpointInject;
import org.apache.camel.Exchange;
@@ -26,8 +27,8 @@ import org.apache.camel.builder.AggregationStrategies;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.kafka.DefaultKafkaManualAsyncCommitFactory;
import org.apache.camel.component.kafka.KafkaConstants;
-import org.apache.camel.component.kafka.KafkaEndpoint;
import org.apache.camel.component.kafka.KafkaManualCommit;
+import org.apache.camel.component.kafka.KafkaManualCommitFactory;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.jupiter.api.AfterEach;
@@ -42,7 +43,7 @@ public class KafkaConsumerAsyncManualCommitIT extends BaseEmbeddedKafkaTestSuppo
@EndpointInject("kafka:" + TOPIC
+ "?groupId=group1&sessionTimeoutMs=30000&autoCommitEnable=false"
- + "&allowManualCommit=true&autoOffsetReset=earliest")
+ + "&allowManualCommit=true&autoOffsetReset=earliest&kafkaManualCommitFactory=#testFactory")
private Endpoint from;
@EndpointInject("mock:result")
@@ -51,6 +52,9 @@ public class KafkaConsumerAsyncManualCommitIT extends BaseEmbeddedKafkaTestSuppo
@EndpointInject("mock:resultBar")
private MockEndpoint toBar;
+ @BindToRegistry("testFactory")
+ private KafkaManualCommitFactory manualCommitFactory = new DefaultKafkaManualAsyncCommitFactory();
+
private org.apache.kafka.clients.producer.KafkaProducer<String, String> producer;
@BeforeEach
@@ -70,14 +74,13 @@ public class KafkaConsumerAsyncManualCommitIT extends BaseEmbeddedKafkaTestSuppo
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
- ((KafkaEndpoint) from).getComponent().setKafkaManualCommitFactory(new DefaultKafkaManualAsyncCommitFactory());
return new RouteBuilder() {
@Override
public void configure() {
from(from).routeId("foo").to("direct:aggregate");
// With sync manual commit, this would throw a concurrent modification exception
- // It can be usesd in aggregator with completion timeout/interval for instance
+ // It can be used in aggregator with completion timeout/interval for instance
// WARN: records from one partition must be processed by one unique thread
from("direct:aggregate").routeId("aggregate").to(to)
.aggregate()