You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2022/04/14 18:54:55 UTC
[camel-spring-boot] branch main updated: CAMEL-17051 Respond to SeekPolicy changes in camel (#526)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-spring-boot.git
The following commit(s) were added to refs/heads/main by this push:
new 3eba7670f1b CAMEL-17051 Respond to SeekPolicy changes in camel (#526)
3eba7670f1b is described below
commit 3eba7670f1b9ace761baa70b0e9e4e5b4c13a3eb
Author: Tom Cunningham <tc...@redhat.com>
AuthorDate: Thu Apr 14 14:54:50 2022 -0400
CAMEL-17051 Respond to SeekPolicy changes in camel (#526)
---
.../kafka/springboot/KafkaComponentConfiguration.java | 10 +++++-----
.../camel/component/kafka/integration/KafkaConsumerFullIT.java | 5 +++--
2 files changed, 8 insertions(+), 7 deletions(-)
diff --git a/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java b/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
index 9d21589633a..1e5b2a5273e 100644
--- a/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
+++ b/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
@@ -24,6 +24,7 @@ import org.apache.camel.component.kafka.KafkaComponent;
import org.apache.camel.component.kafka.KafkaConfiguration;
import org.apache.camel.component.kafka.PollExceptionStrategy;
import org.apache.camel.component.kafka.PollOnError;
+import org.apache.camel.component.kafka.SeekPolicy;
import org.apache.camel.component.kafka.consumer.KafkaManualCommitFactory;
import org.apache.camel.component.kafka.serde.KafkaHeaderDeserializer;
import org.apache.camel.component.kafka.serde.KafkaHeaderSerializer;
@@ -278,10 +279,9 @@ public class KafkaComponentConfiguration
private Long pollTimeoutMs = 5000L;
/**
* Set if KafkaConsumer will read from beginning or end on startup:
- * beginning : read from beginning end : read from end This is replacing the
- * earlier property seekToBeginning
+ * SeekPolicy.BEGINNING: read from beginning. SeekPolicy.END: read from end.
*/
- private String seekTo;
+ private SeekPolicy seekTo;
/**
* The timeout used to detect failures when using Kafka's group management
* facilities.
@@ -1036,11 +1036,11 @@ public class KafkaComponentConfiguration
this.pollTimeoutMs = pollTimeoutMs;
}
- public String getSeekTo() {
+ public SeekPolicy getSeekTo() {
return seekTo;
}
- public void setSeekTo(String seekTo) {
+ public void setSeekTo(SeekPolicy seekTo) {
this.seekTo = seekTo;
}
diff --git a/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java
index ca96589127b..195dd4f524b 100644
--- a/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java
+++ b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java
@@ -24,6 +24,7 @@ import org.apache.camel.EndpointInject;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.kafka.KafkaConstants;
import org.apache.camel.component.kafka.KafkaEndpoint;
+import org.apache.camel.component.kafka.SeekPolicy;
import org.apache.camel.component.kafka.serde.DefaultKafkaHeaderDeserializer;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.spring.boot.CamelAutoConfiguration;
@@ -163,7 +164,7 @@ public class KafkaConsumerFullIT extends BaseEmbeddedKafkaTestSupport {
context.getRouteController().stopRoute("full-it");
KafkaEndpoint kafkaEndpoint = (KafkaEndpoint) context.getEndpoint(from);
- kafkaEndpoint.getConfiguration().setSeekTo("beginning");
+ kafkaEndpoint.getConfiguration().setSeekTo(SeekPolicy.BEGINNING);
context.getRouteController().startRoute("full-it");
@@ -191,7 +192,7 @@ public class KafkaConsumerFullIT extends BaseEmbeddedKafkaTestSupport {
context.getRouteController().stopRoute("full-it");
KafkaEndpoint kafkaEndpoint = (KafkaEndpoint) context.getEndpoint(from);
- kafkaEndpoint.getConfiguration().setSeekTo("end");
+ kafkaEndpoint.getConfiguration().setSeekTo(SeekPolicy.END);
context.getRouteController().startRoute("full-it");