You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text version.
Posted to commits@camel.apache.org by da...@apache.org on 2022/04/14 18:54:55 UTC

[camel-spring-boot] branch main updated: CAMEL-17051 Respond to SeekPolicy changes in camel (#526)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-spring-boot.git


The following commit(s) were added to refs/heads/main by this push:
     new 3eba7670f1b CAMEL-17051 Respond to SeekPolicy changes in camel (#526)
3eba7670f1b is described below

commit 3eba7670f1b9ace761baa70b0e9e4e5b4c13a3eb
Author: Tom Cunningham <tc...@redhat.com>
AuthorDate: Thu Apr 14 14:54:50 2022 -0400

    CAMEL-17051 Respond to SeekPolicy changes in camel (#526)
---
 .../kafka/springboot/KafkaComponentConfiguration.java          | 10 +++++-----
 .../camel/component/kafka/integration/KafkaConsumerFullIT.java |  5 +++--
 2 files changed, 8 insertions(+), 7 deletions(-)

diff --git a/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java b/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
index 9d21589633a..1e5b2a5273e 100644
--- a/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
+++ b/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/KafkaComponentConfiguration.java
@@ -24,6 +24,7 @@ import org.apache.camel.component.kafka.KafkaComponent;
 import org.apache.camel.component.kafka.KafkaConfiguration;
 import org.apache.camel.component.kafka.PollExceptionStrategy;
 import org.apache.camel.component.kafka.PollOnError;
+import org.apache.camel.component.kafka.SeekPolicy;
 import org.apache.camel.component.kafka.consumer.KafkaManualCommitFactory;
 import org.apache.camel.component.kafka.serde.KafkaHeaderDeserializer;
 import org.apache.camel.component.kafka.serde.KafkaHeaderSerializer;
@@ -278,10 +279,9 @@ public class KafkaComponentConfiguration
     private Long pollTimeoutMs = 5000L;
     /**
      * Set if KafkaConsumer will read from beginning or end on startup:
-     * beginning : read from beginning end : read from end This is replacing the
-     * earlier property seekToBeginning
+     * SeekPolicy.BEGINNING: read from beginning. SeekPolicy.END: read from end.
      */
-    private String seekTo;
+    private SeekPolicy seekTo;
     /**
      * The timeout used to detect failures when using Kafka's group management
      * facilities.
@@ -1036,11 +1036,11 @@ public class KafkaComponentConfiguration
         this.pollTimeoutMs = pollTimeoutMs;
     }
 
-    public String getSeekTo() {
+    public SeekPolicy getSeekTo() {
         return seekTo;
     }
 
-    public void setSeekTo(String seekTo) {
+    public void setSeekTo(SeekPolicy seekTo) {
         this.seekTo = seekTo;
     }
 
diff --git a/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java
index ca96589127b..195dd4f524b 100644
--- a/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java
+++ b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java
@@ -24,6 +24,7 @@ import org.apache.camel.EndpointInject;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.kafka.KafkaConstants;
 import org.apache.camel.component.kafka.KafkaEndpoint;
+import org.apache.camel.component.kafka.SeekPolicy;
 import org.apache.camel.component.kafka.serde.DefaultKafkaHeaderDeserializer;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.spring.boot.CamelAutoConfiguration;
@@ -163,7 +164,7 @@ public class KafkaConsumerFullIT extends BaseEmbeddedKafkaTestSupport {
         context.getRouteController().stopRoute("full-it");
 
         KafkaEndpoint kafkaEndpoint = (KafkaEndpoint) context.getEndpoint(from);
-        kafkaEndpoint.getConfiguration().setSeekTo("beginning");
+        kafkaEndpoint.getConfiguration().setSeekTo(SeekPolicy.BEGINNING);
 
         context.getRouteController().startRoute("full-it");
 
@@ -191,7 +192,7 @@ public class KafkaConsumerFullIT extends BaseEmbeddedKafkaTestSupport {
         context.getRouteController().stopRoute("full-it");
 
         KafkaEndpoint kafkaEndpoint = (KafkaEndpoint) context.getEndpoint(from);
-        kafkaEndpoint.getConfiguration().setSeekTo("end");
+        kafkaEndpoint.getConfiguration().setSeekTo(SeekPolicy.END);
 
         context.getRouteController().startRoute("full-it");