Posted to issues@camel.apache.org by "Claus Ibsen (JIRA)" <ji...@apache.org> on 2018/08/16 13:33:00 UTC

[jira] [Reopened] (CAMEL-12732) Kafka manual commit to file repository doesn't work properly (using Spring boot)

     [ https://issues.apache.org/jira/browse/CAMEL-12732?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Claus Ibsen reopened CAMEL-12732:
---------------------------------

> Kafka manual commit to file repository doesn't work properly (using Spring boot)
> --------------------------------------------------------------------------------
>
>                 Key: CAMEL-12732
>                 URL: https://issues.apache.org/jira/browse/CAMEL-12732
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-kafka
>    Affects Versions: 2.22.0
>         Environment: Spring boot
> kafka_2.11-1.1.0
>            Reporter: michael elbaz
>            Assignee: Claus Ibsen
>            Priority: Major
>             Fix For: 2.22.1, 2.23.0
>
>
> I'm trying to save the Kafka offset into a FileStateRepository. The offset is written correctly, but it is not read back at route start, so Camel re-reads the whole topic every time.
>  
> {code:java}
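> import java.io.File;
> import org.apache.camel.Exchange;
> import org.apache.camel.builder.RouteBuilder;
> import org.apache.camel.component.kafka.KafkaConstants;
> import org.apache.camel.component.kafka.KafkaManualCommit;
> import org.apache.camel.impl.FileStateRepository;
> import org.springframework.context.annotation.Bean;
> import org.springframework.stereotype.Component;
>
> @Component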
> public class Route extends RouteBuilder {
>     @Override
>     public void configure() throws Exception {
>         from(kafka())
>                 .to("log:TEST?level=INFO")
>                 .process(Route::commitKafka);
>     }
>     private String kafka() {
>         String kafkaEndpoint = "kafka:";
>         kafkaEndpoint += "topictest";
>         kafkaEndpoint += "?brokers=";
>         kafkaEndpoint += "localhost:9092";
>         kafkaEndpoint += "&groupId=";
>         kafkaEndpoint += "TEST";
>         kafkaEndpoint += "&autoOffsetReset=";
>         kafkaEndpoint += "earliest";
>         kafkaEndpoint += "&autoCommitEnable=";
>         kafkaEndpoint += false;
>         kafkaEndpoint += "&allowManualCommit=";
>         kafkaEndpoint += true;
>         kafkaEndpoint += "&offsetRepository=";
>         kafkaEndpoint += "#fileStore";
>         return kafkaEndpoint;
>     }
>     @Bean(name = "fileStore")
>     private FileStateRepository fileStateRepository() {
>         FileStateRepository fileStateRepository = FileStateRepository.fileStateRepository(new File("/kafka/offset_repo/repo.dat"));
>         // This will be empty
>         // System.out.println(fileStateRepository.getCache());
>         return fileStateRepository;
>     }
>     private static void commitKafka(Exchange exchange) {
>         KafkaManualCommit manual = exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
>         manual.commitSync();
>     }
> }
> {code}
>  
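
The symptom described above (offsets written to the file but not picked up on the next start) can be pictured with a small stand-alone sketch. It assumes, which this report does not confirm, that a file-backed repository serves reads from an in-memory cache that is only filled when the repository is started. The class name OffsetStoreSketch, its methods, and the "topictest/0" key are hypothetical and chosen for illustration; this is not Camel's FileStateRepository.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for a file-backed offset store; not Camel's implementation. */
class OffsetStoreSketch {
    private final Path file;
    private final Map<String, String> cache = new LinkedHashMap<>();

    OffsetStoreSketch(Path file) {
        this.file = file;
    }

    /** Loads previously persisted "key=value" lines into the in-memory cache. */
    void start() throws IOException {
        cache.clear();
        if (!Files.exists(file)) {
            return;
        }
        for (String line : Files.readAllLines(file, StandardCharsets.UTF_8)) {
            int sep = line.indexOf('=');
            if (sep > 0) {
                cache.put(line.substring(0, sep), line.substring(sep + 1));
            }
        }
    }

    /** Updates the cache and rewrites the whole file with the cache contents. */
    void setState(String key, String value) throws IOException {
        cache.put(key, value);
        List<String> lines = new ArrayList<>();
        cache.forEach((k, v) -> lines.add(k + "=" + v));
        Files.write(file, lines, StandardCharsets.UTF_8);
    }

    /** Reads only from the cache, so it returns null until start() has loaded the file. */
    String getState(String key) {
        return cache.get(key);
    }
}
```

In this sketch, a second instance pointed at the same file returns null for a stored key until start() has been called on it. A consumer falling back to the earliest offset in that situation would match the behaviour reported here.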



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)