Posted to issues@camel.apache.org by "Claus Ibsen (JIRA)" <ji...@apache.org> on 2018/08/16 13:33:00 UTC
[jira] [Reopened] (CAMEL-12732) Kafka manual commit to file repository doesn't work properly (using Spring boot)
[ https://issues.apache.org/jira/browse/CAMEL-12732?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Claus Ibsen reopened CAMEL-12732:
---------------------------------
> Kafka manual commit to file repository doesn't work properly (using Spring boot)
> --------------------------------------------------------------------------------
>
> Key: CAMEL-12732
> URL: https://issues.apache.org/jira/browse/CAMEL-12732
> Project: Camel
> Issue Type: Bug
> Components: camel-kafka
> Affects Versions: 2.22.0
> Environment: Spring boot
> kafka_2.11-1.1.0
> Reporter: michael elbaz
> Assignee: Claus Ibsen
> Priority: Major
> Fix For: 2.22.1, 2.23.0
>
>
> I'm trying to save the Kafka offset in a FileStateRepository. The offset is written correctly, but it is not read back when the route starts, so Camel re-reads the entire topic every time.
>
> {code:java}
> @Component
> public class Route extends RouteBuilder {
>
>     @Override
>     public void configure() throws Exception {
>         from(kafka())
>             .to("log:TEST?level=INFO")
>             .process(Route::commitKafka);
>     }
>
>     private String kafka() {
>         String kafkaEndpoint = "kafka:";
>         kafkaEndpoint += "topictest";
>         kafkaEndpoint += "?brokers=";
>         kafkaEndpoint += "localhost:9092";
>         kafkaEndpoint += "&groupId=";
>         kafkaEndpoint += "TEST";
>         kafkaEndpoint += "&autoOffsetReset=";
>         kafkaEndpoint += "earliest";
>         kafkaEndpoint += "&autoCommitEnable=";
>         kafkaEndpoint += false;
>         kafkaEndpoint += "&allowManualCommit=";
>         kafkaEndpoint += true;
>         kafkaEndpoint += "&offsetRepository=";
>         kafkaEndpoint += "#fileStore";
>         return kafkaEndpoint;
>     }
>
>     @Bean(name = "fileStore")
>     private FileStateRepository fileStateRepository() {
>         FileStateRepository fileStateRepository = FileStateRepository.fileStateRepository(new File("/kafka/offset_repo/repo.dat"));
>         // This will be empty
>         // System.out.println(fileStateRepository.getCache());
>         return fileStateRepository;
>     }
>
>     private static void commitKafka(Exchange exchange) {
>         KafkaManualCommit manual = exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
>         manual.commitSync();
>     }
> }
> {code}
>
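Editorial note: the symptom described above (offsets are written to the file, but the cache is empty when the route starts) can be illustrated with a minimal, self-contained sketch. This is not Camel's FileStateRepository and makes no claim about the cause of this issue. FileOffsetStore, OffsetStoreDemo, the key "topictest/0" and the key=value file layout are all hypothetical, chosen only to show that a file-backed store returns nothing until its load step has run.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a file-backed offset store; NOT Camel's FileStateRepository.
class FileOffsetStore {
    private final Path file;
    private final Map<String, String> cache = new LinkedHashMap<>();

    FileOffsetStore(Path file) {
        this.file = file;
    }

    // Loads previously persisted entries. Until this runs, the cache is empty.
    void start() throws IOException {
        cache.clear();
        if (!Files.exists(file)) {
            return;
        }
        for (String line : Files.readAllLines(file, StandardCharsets.UTF_8)) {
            int sep = line.indexOf('=');
            if (sep > 0) {
                cache.put(line.substring(0, sep), line.substring(sep + 1));
            }
        }
    }

    // Returns the cached value, or null if the key is unknown (or start() never ran).
    String getState(String key) {
        return cache.get(key);
    }

    // Updates the cache and rewrites the whole file as key=value lines.
    void setState(String key, String value) throws IOException {
        cache.put(key, value);
        List<String> lines = new ArrayList<>();
        for (Map.Entry<String, String> entry : cache.entrySet()) {
            lines.add(entry.getKey() + "=" + entry.getValue());
        }
        Files.write(file, lines, StandardCharsets.UTF_8);
    }
}

class OffsetStoreDemo {
    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("offset_repo", ".dat");

        // First "run": commit an offset for a hypothetical topic/partition key.
        FileOffsetStore firstRun = new FileOffsetStore(file);
        firstRun.start();
        firstRun.setState("topictest/0", "42");

        // Second "run": the file holds the offset, but an unstarted store cannot see it.
        FileOffsetStore secondRun = new FileOffsetStore(file);
        System.out.println("before start: " + secondRun.getState("topictest/0")); // prints "before start: null"
        secondRun.start();
        System.out.println("after start: " + secondRun.getState("topictest/0"));  // prints "after start: 42"
    }
}
```

In this sketch a consumer that asked the second store for its offset before start() would receive null and fall back to its reset policy (earliest, in the endpoint above), which matches the re-read behaviour reported here.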
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)