You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@camel.apache.org by "michael elbaz (JIRA)" <ji...@apache.org> on 2018/08/15 10:46:00 UTC

[jira] [Created] (CAMEL-12732) Kafka manual commit to file repository doesn't work

michael elbaz created CAMEL-12732:
-------------------------------------

             Summary: Kafka manual commit to file repository doesn't work
                 Key: CAMEL-12732
                 URL: https://issues.apache.org/jira/browse/CAMEL-12732
             Project: Camel
          Issue Type: Bug
          Components: camel-kafka
    Affects Versions: 2.22.0
         Environment: Spring boot

kafka_2.11-1.1.0
            Reporter: michael elbaz


I'm trying to save the Kafka offset into a FileStateRepository. The offset is written to the file correctly, but it is not read back when the route starts, so Camel re-reads the whole topic every time.

 
{code:java}
/**
 * Route that consumes from a Kafka endpoint, logs every exchange at INFO level and then
 * commits the Kafka offset manually. The endpoint URI references the {@code fileStore} bean
 * as its offset repository.
 */
@Component
public class Route extends RouteBuilder {

    /**
     * Defines the route: Kafka endpoint, then the {@code log:TEST} endpoint, then the
     * manual-commit processor.
     */
    @Override
    public void configure() throws Exception {
        final String sourceUri = kafka();

        from(sourceUri)
                .to("log:TEST?level=INFO")
                .process(Route::commitKafka);
    }

    /**
     * Assembles the Kafka endpoint URI: topic {@code topictest}, broker {@code localhost:9092},
     * group {@code TEST}, auto-commit disabled, manual commit allowed, and the
     * {@code #fileStore} bean reference as offset repository.
     *
     * @return the complete endpoint URI
     */
    private String kafka() {
        return "kafka:" + "topictest"
                + "?brokers=" + "localhost:9092"
                + "&groupId=" + "TEST"
                + "&autoOffsetReset=" + "earliest"
                + "&autoCommitEnable=" + false
                + "&allowManualCommit=" + true
                + "&offsetRepository=" + "#fileStore";
    }

    /**
     * Declares the {@code fileStore} bean, a file-backed state repository stored at
     * {@code /kafka/offset_repo/repo.dat}.
     *
     * @return the repository created by the {@code FileStateRepository.fileStateRepository} factory
     */
    @Bean(name = "fileStore")
    private FileStateRepository fileStateRepository() {
        // Reporter's observation: printing getCache() of the repository at this point
        // shows it empty.
        return FileStateRepository.fileStateRepository(new File("/kafka/offset_repo/repo.dat"));
    }

    /**
     * Reads the manual-commit handle from the {@code KafkaConstants.MANUAL_COMMIT} header of the
     * incoming message and calls {@code commitSync()} on it.
     *
     * @param exchange the exchange whose incoming message carries the manual-commit header
     */
    private static void commitKafka(Exchange exchange) {
        exchange.getIn()
                .getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class)
                .commitSync();
    }
}

{code}
 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)