Posted to users@kafka.apache.org by Sachin Nikumbh <sa...@yahoo.com.INVALID> on 2017/05/08 03:55:33 UTC

Unable to consume messages

Hi all,
I am relatively new to Kafka, and my initial attempts at consuming messages are failing. My topic has 3 partitions and I am setting "auto.offset.reset" to "earliest". The call to poll() hangs. Here's my code:
***********************************************************************************
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.util.*;

public class TestConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "mytest5");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest");
        props.put("max.poll.records", "50");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("mpsFleet4"), new RebalanceHandler(consumer));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(120000);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }
}

class RebalanceHandler implements ConsumerRebalanceListener {
    Consumer<String, String> consumer;

    RebalanceHandler(Consumer<String, String> consumer) {
        this.consumer = consumer;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        System.out.println("Partitions revoked");
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        System.out.println("Partitions assigned");
        //consumer.seekToBeginning(partitions);
    }
}
***********************************************************************************

When I use ConsumerGroupCommand to check the offsets, I see the following:

***********************************************************************************
GROUP    TOPIC      PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG      OWNER
mytest5  mpsFleet4  0          unknown         397184          unknown  consumer-1_/172.28.11.138
mytest5  mpsFleet4  1          unknown         650207          unknown  consumer-1_/172.28.11.138
mytest5  mpsFleet4  2          unknown         451783          unknown  consumer-1_/172.28.11.138
***********************************************************************************

If I set "enable.auto.commit" to "true", the call to poll() still hangs, but all the values for CURRENT-OFFSET are the same as LOG-END-OFFSET.
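As a minimal sketch of how the columns in that output appear to relate, the snippet below models the two cases described here. It assumes that LAG is LOG-END-OFFSET minus the committed CURRENT-OFFSET, and that "unknown" means the group has no committed offset for the partition. OffsetLag is a hypothetical helper written for illustration, not a Kafka class, and it does not talk to a broker.

```java
import java.util.OptionalLong;

// Hypothetical helper (not part of Kafka): models how the LAG column
// relates to CURRENT-OFFSET and LOG-END-OFFSET under the assumptions above.
class OffsetLag {

    // An empty 'committed' stands for a partition with no committed offset,
    // which the tool output shows as "unknown".
    static OptionalLong lag(OptionalLong committed, long logEndOffset) {
        if (!committed.isPresent()) {
            return OptionalLong.empty();
        }
        return OptionalLong.of(logEndOffset - committed.getAsLong());
    }

    // Renders a value the way the table does: a number, or "unknown".
    static String show(OptionalLong value) {
        return value.isPresent() ? Long.toString(value.getAsLong()) : "unknown";
    }

    public static void main(String[] args) {
        // LOG-END-OFFSET values for partitions 0..2, taken from the table.
        long[] logEnd = {397184L, 650207L, 451783L};
        for (int p = 0; p < logEnd.length; p++) {
            // Case 1: enable.auto.commit=false, nothing committed yet.
            OptionalLong none = OptionalLong.empty();
            // Case 2: enable.auto.commit=true, committed offset equals the log end.
            OptionalLong atEnd = OptionalLong.of(logEnd[p]);
            System.out.printf("partition %d: lag(no commit) = %s, lag(committed at end) = %s%n",
                    p, show(lag(none, logEnd[p])), show(lag(atEnd, logEnd[p])));
        }
    }
}
```

Under these assumptions, a committed offset equal to LOG-END-OFFSET means a lag of zero, so the group would have nothing left to read from that position onward.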
I really cannot understand what's going on and would appreciate any help. I have also tried explicitly seeking the partition offsets in my rebalance listener.
Thanks,
Sachin