You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by Suraj Ramesh <su...@gmail.com> on 2017/11/02 18:02:04 UTC

Kafka- Requesting for help

Hi,

I would like to request help with this issue.

I should not commit the offset when an exception occurs while processing a
message.
I am using the approach below to manually commit offsets. Can you please help
me retrieve the uncommitted offsets so that I can re-process them at a later
point in time?


import org.apache.kafka.clients.consumer.ConsumerRecord; import
org.springframework.kafka.annotation.KafkaListener; import
org.springframework.kafka.listener.AcknowledgingMessageListener; import
org.springframework.kafka.support.Acknowledgment; public class
ConsumerKafka implements AcknowledgingMessageListener<String, String>{
@Override @KafkaListener(id = "consumer", topics = {"${kafka.topic}"} )
public void onMessage(ConsumerRecord<String, String> data, Acknowledgment
acknowledgment) { // TODO Auto-generated method stub try{
System.out.println("Read Record is : " + data.value());
System.out.println("Offset is : " + data.offset());
System.out.println("Topic is : " + data.topic());
System.out.println("Partition is : " + data.partition());
acknowledgment.acknowledge(); }catch (Exception e ){
System.out.println("Push the messaged to Error Stream : " + e); } } }

If an exception occurs, the catch block doesn't commit the offset.

Kafka Config.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.retry.backoff.FixedBackOffPolicy;
import org.springframework.retry.policy.AlwaysRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

import com.learnbootkafka.consumer.ConsumerKafka;

@Configuration
@EnableKafka
public class KafkaConfig {

 @Autowired
  Environment env;

 /**
  * Consumer Config Starts
  */
 @Bean
 KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String,
String>> kafkaListenerContainerFactory() {
  ConcurrentKafkaListenerContainerFactory<String, String> factory = new
ConcurrentKafkaListenerContainerFactory<>();
  factory.setConsumerFactory(consumerFactory());
  factory.getContainerProperties().setPollTimeout(3000);
  factory.getContainerProperties().setAckMode(AbstractMessageListenerContain
er.AckMode.MANUAL);
   return factory;
}

 /** Consumer factory backed by the property map from {@link #consumerConfigs()}. */
 @Bean
 public ConsumerFactory<String, String> consumerFactory() {
     return new DefaultKafkaConsumerFactory<>(consumerConfigs());
 }

 /**
  * Raw Kafka consumer properties.
  *
  * <p>{@code enable.auto.commit} is forced to {@code false} because the
  * container runs in MANUAL ack mode: offsets must only ever be committed via
  * {@code Acknowledgment.acknowledge()}, never by the client's background
  * auto-commit timer.
  *
  * @return the consumer configuration map passed to the consumer factory
  */
 @Bean
 public Map<String, Object> consumerConfigs() {
     Map<String, Object> propsMap = new HashMap<>();
     propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getProperty("kafka.broker"));
     // FIX: previously read from the "enable.auto.commit" property. If that
     // property were "true", the client would silently auto-commit offsets and
     // the "skip commit on processing error" logic in the listener would stop
     // working. Manual ack mode requires auto-commit to be disabled.
     propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
     // auto.commit.interval.ms removed: it is ignored once auto-commit is off.
     propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
     propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
     propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, env.getProperty("group.id"));
     propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
             env.getProperty("kafka.auto.offset.reset"));
     return propsMap;
 }

 /** Registers the manually-acknowledging Kafka listener as a bean. */
 @Bean
 public ConsumerKafka listener() {
     return new ConsumerKafka();
 }


Would be thankful for getting help.

Thank You,
Suraj PR