Posted to users@kafka.apache.org by Joel Koshy <jj...@gmail.com> on 2014/08/01 00:57:44 UTC

Re: Unable to read from the beginning using High level consumer API

Set auto.offset.reset to "smallest"
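
For context: in the 0.8 high-level consumer, auto.offset.reset defaults to "largest", so a group with no committed offset starts at the tail of the log and only sees newly produced messages. The setting is consulted only when the group has no valid committed offset in ZooKeeper, so a group that has already committed offsets would need a fresh group.id to start over. Below is a minimal sketch of the change, reusing the connection string and group id from the quoted code. The class name FromBeginningConfig and the helper build() are made up for illustration and are not part of Kafka.

```java
import java.util.Properties;

public class FromBeginningConfig {

    // Hypothetical helper: collects the consumer settings in one place.
    // All values are Strings, since the consumer config reads them as string properties.
    static Properties build(String zkConnect, String groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zkConnect);
        props.put("group.id", groupId);
        // Start from the earliest available offset when the group has
        // no valid committed offset (the default is "largest").
        props.put("auto.offset.reset", "smallest");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:2181", "TEST123");
        System.out.println("auto.offset.reset = " + props.getProperty("auto.offset.reset"));
        // With the Kafka 0.8 jars on the classpath, these properties would be
        // passed to the consumer exactly as in the quoted code:
        //   ConsumerConfig config = new ConsumerConfig(props);
    }
}
```

The remaining properties from the original snippet (timeouts, commit interval) can be added to the same Properties object unchanged.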

On Thu, Jul 31, 2014 at 08:25:35PM +0000, Srividhya Shanmugam wrote:
> Kafka Team,
> 
> I am using the high-level consumer API as shown below to read contents from the topic.
> 
> Properties props = new Properties();
> props.put("zookeeper.connect", "localhost:2181");
> props.put("zookeeper.session.timeout.ms", "10000");
> props.put("zookeeper.sync.time.ms", "200");
> props.put("auto.commit.interval.ms", "1000");
> props.put("consumer.timeout.ms", "120000");
> props.put("group.id", "TEST123");
> ConsumerConfig config = new ConsumerConfig(props);
> 
> 
> ConsumerConnector consumer = kafka.consumer.Consumer
>         .createJavaConsumerConnector(config);
> 
> Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
> topicCountMap.put("TEST", new Integer(1));
> Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
> List<KafkaStream<byte[], byte[]>> streams = consumerMap.get("TEST");
> 
> // now launch all the threads
> ThreadPoolExecutor executor  = resource.getExecutor();
> // now create an object to consume the messages
> 
> for (final KafkaStream<byte[], byte[]> stream : streams) {
>     TestTask task = new TestTask(stream);
>     executor.submit(task);
> }
> 
> The TestTask just prints the messages.
> 
> The kafka logger shows the below statement
> 
> Consumer APP51_DFGHSFV1-1406836437053-9ed3b6a7 selected partitions : YYYY:0: fetched offset = -1: consumed offset = -1,YYYY:1: fetched offset = -1: consumed offset = -1
> - [APP51_DFGHSFV1-1406836437053-9ed3b6a7],
> 
> Even though the fetched and consumed offsets are shown as -1, I am not getting the messages from the beginning.
> The retention policy is set as log.retention.hours=168
> 
> If I produce new messages, then those messages are consumed and I can see the logged statements
> 
> If I use the simple consumer API and specify the starting offset as 0, then I am able to read from the beginning
> 
> Are there any settings that would enable a new consumer group to read messages from the beginning?
> 
> Thanks,
> Srividhya