You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Susheel Kumar <su...@gmail.com> on 2017/10/26 01:40:11 UTC

Consumer poll returning 0 results

Hello Kafka Users,

I am trying to run below sample code mentioned in Kafka documentation under
Automatic Offset Committing for a topic with 1 partition  (tried with 3 and
more partition as well). Create command as follows

bin/kafka-topics.sh --create --zookeeper <ZK>:2181 --replication-factor 3
--partitions 1 --topic test --config cleanup.policy=compact,delete

but the sample code always returns 0 records unless I provide a custom
ConsumerRebalanceListener (below) which sets consumer to beginning.

I wonder if the sample code given at Kafka documentation is wrong or am I
missing something?

https://kafka.apache.org/0101/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html


*Automatic Offset Committing*

This example demonstrates a simple usage of Kafka's consumer api that
relying on automatic offset committing.

     Properties props = new Properties();
     props.put("bootstrap.servers", "localhost:9092");
     props.put("group.id", "test");
     props.put("enable.auto.commit", "true");
     props.put("auto.commit.interval.ms", "1000");
     props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
     props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
     consumer.subscribe(Arrays.asList("foo", "bar"));
     while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
         for (ConsumerRecord<String, String> record : records)
             System.out.printf("offset = %d, key = %s, value = %s%n",
record.offset(), record.key(), record.value());
     }



====

public class SeekToBeginingConsumerRebalancerListener implements
org.apache.kafka.clients.consumer.ConsumerRebalanceListener {

     private Consumer<String, String> consumer;
     public SeekToBeginingConsumerRebalancerListener(KafkaConsumer<String,
String> consumer2) {
             this.consumer = consumer2;
     }
     public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
             for (TopicPartition partition : partitions) {

//offsetManager.saveOffsetInExternalStore(partition.topic(),
partition.partition(),consumer.position(partition));
             }
     }
     public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            /* for (TopicPartition partition : partitions) {
                     consumer.seek(partition,seekTo));
             }*/
    	 consumer.seekToBeginning(partitions);
     }
}

Re: Consumer poll returning 0 results

Posted by Susheel Kumar <su...@gmail.com>.
Thank you Ted, Konstantine. Now I understand. The option 1 worked and if I
produce more records after consumer started, I can see it being returned.

Though option 2 as you described should work but setting auto.offset.reset
to earliest doesn't seems to work and acts like without "--from-beginning".
May be I am missing something.

The Kafka version I am using 0.10.1.1 and Java 1.8
==



 Properties props = new Properties();
            props.put("auto.offset.reset", "earliest");
            props.put("bootstrap.servers", "server:9092");
            props.put("group.id", "test");
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");

            props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
            KafkaConsumer<String, String> consumer = new
KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList("test"));
            while (true) {
                ConsumerRecords<String, String> records =
consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records)
                    System.out.printf("offset = %d, key = %s, value =
%s%n", record.offset(), record.key(), record.value());
            }



On Wed, Oct 25, 2017 at 11:41 PM, Konstantine Karantasis <
konstantine@confluent.io> wrote:

> Are you producing any records after you start the consumer?
>
> By default, Kafka consumer starts with auto.offset.reset == latest (
> https://kafka.apache.org/documentation/#newconsumerconfigs), which means
> that if the consumer doesn't find a previous offset for its consumer group
> (e.g. the first time the consumer runs) it will start consuming from the
> latest offset and on. Therefore, if there are no new records produced to
> this Kafka topic after the consumer is started (specifically after has a
> partitions assigned to it), the consumer won't return anything.
>
> To try the above snippet, you have two easy options:
>
> 1) Produce records after you start the consumer (e.g. with
> kafka-console-producer)
> 2) Set auto.offset.reset to earliest for this consumer (e.g. in you code
> above, props.put("auto.offset.reset", "earliest"); ). Test equivalent
> behavior with a command such as:
> bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic test
> --from-beginning
>
> Omitting "--from-beginning" will be equivalent to what you observe above.
>
> Konstantine
>
> On Wed, Oct 25, 2017 at 6:48 PM, Ted Yu <yu...@gmail.com> wrote:
>
> > Can you provide a bit more information ?
> >
> > Release of Kafka
> > Java / Scala version
> >
> > Thanks
> >
> > On Wed, Oct 25, 2017 at 6:40 PM, Susheel Kumar <su...@gmail.com>
> > wrote:
> >
> > > Hello Kafka Users,
> > >
> > > I am trying to run below sample code mentioned in Kafka documentation
> > under
> > > Automatic Offset Committing for a topic with 1 partition  (tried with 3
> > and
> > > more partition as well). Create command as follows
> > >
> > > bin/kafka-topics.sh --create --zookeeper <ZK>:2181
> --replication-factor 3
> > > --partitions 1 --topic test --config cleanup.policy=compact,delete
> > >
> > > but the sample code always returns 0 records unless I provide a custom
> > > ConsumerRebalanceListener (below) which sets consumer to beginning.
> > >
> > > I wonder if the sample code given at Kafka documentation is wrong or
> am I
> > > missing something?
> > >
> > > https://kafka.apache.org/0101/javadoc/index.html?org/apache/
> > > kafka/clients/consumer/KafkaConsumer.html
> > >
> > >
> > > *Automatic Offset Committing*
> > >
> > > This example demonstrates a simple usage of Kafka's consumer api that
> > > relying on automatic offset committing.
> > >
> > >      Properties props = new Properties();
> > >      props.put("bootstrap.servers", "localhost:9092");
> > >      props.put("group.id", "test");
> > >      props.put("enable.auto.commit", "true");
> > >      props.put("auto.commit.interval.ms", "1000");
> > >      props.put("key.deserializer",
> > > "org.apache.kafka.common.serialization.StringDeserializer");
> > >      props.put("value.deserializer",
> > > "org.apache.kafka.common.serialization.StringDeserializer");
> > >      KafkaConsumer<String, String> consumer = new
> KafkaConsumer<>(props);
> > >      consumer.subscribe(Arrays.asList("foo", "bar"));
> > >      while (true) {
> > >          ConsumerRecords<String, String> records = consumer.poll(100);
> > >          for (ConsumerRecord<String, String> record : records)
> > >              System.out.printf("offset = %d, key = %s, value = %s%n",
> > > record.offset(), record.key(), record.value());
> > >      }
> > >
> > >
> > >
> > > ====
> > >
> > > public class SeekToBeginingConsumerRebalancerListener implements
> > > org.apache.kafka.clients.consumer.ConsumerRebalanceListener {
> > >
> > >      private Consumer<String, String> consumer;
> > >      public SeekToBeginingConsumerRebalancerListener(KafkaConsumer<
> > String,
> > > String> consumer2) {
> > >              this.consumer = consumer2;
> > >      }
> > >      public void onPartitionsRevoked(Collection<TopicPartition>
> > > partitions) {
> > >              for (TopicPartition partition : partitions) {
> > >
> > > //offsetManager.saveOffsetInExternalStore(partition.topic(),
> > > partition.partition(),consumer.position(partition));
> > >              }
> > >      }
> > >      public void onPartitionsAssigned(Collection<TopicPartition>
> > > partitions) {
> > >             /* for (TopicPartition partition : partitions) {
> > >                      consumer.seek(partition,seekTo));
> > >              }*/
> > >          consumer.seekToBeginning(partitions);
> > >      }
> > > }
> > >
> >
>

Re: Consumer poll returning 0 results

Posted by Konstantine Karantasis <ko...@confluent.io>.
Are you producing any records after you start the consumer?

By default, Kafka consumer starts with auto.offset.reset == latest (
https://kafka.apache.org/documentation/#newconsumerconfigs), which means
that if the consumer doesn't find a previous offset for its consumer group
(e.g. the first time the consumer runs) it will start consuming from the
latest offset and on. Therefore, if there are no new records produced to
this Kafka topic after the consumer is started (specifically after has a
partitions assigned to it), the consumer won't return anything.

To try the above snippet, you have two easy options:

1) Produce records after you start the consumer (e.g. with
kafka-console-producer)
2) Set auto.offset.reset to earliest for this consumer (e.g. in you code
above, props.put("auto.offset.reset", "earliest"); ). Test equivalent
behavior with a command such as:
bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic test
--from-beginning

Omitting "--from-beginning" will be equivalent to what you observe above.

Konstantine

On Wed, Oct 25, 2017 at 6:48 PM, Ted Yu <yu...@gmail.com> wrote:

> Can you provide a bit more information ?
>
> Release of Kafka
> Java / Scala version
>
> Thanks
>
> On Wed, Oct 25, 2017 at 6:40 PM, Susheel Kumar <su...@gmail.com>
> wrote:
>
> > Hello Kafka Users,
> >
> > I am trying to run below sample code mentioned in Kafka documentation
> under
> > Automatic Offset Committing for a topic with 1 partition  (tried with 3
> and
> > more partition as well). Create command as follows
> >
> > bin/kafka-topics.sh --create --zookeeper <ZK>:2181 --replication-factor 3
> > --partitions 1 --topic test --config cleanup.policy=compact,delete
> >
> > but the sample code always returns 0 records unless I provide a custom
> > ConsumerRebalanceListener (below) which sets consumer to beginning.
> >
> > I wonder if the sample code given at Kafka documentation is wrong or am I
> > missing something?
> >
> > https://kafka.apache.org/0101/javadoc/index.html?org/apache/
> > kafka/clients/consumer/KafkaConsumer.html
> >
> >
> > *Automatic Offset Committing*
> >
> > This example demonstrates a simple usage of Kafka's consumer api that
> > relying on automatic offset committing.
> >
> >      Properties props = new Properties();
> >      props.put("bootstrap.servers", "localhost:9092");
> >      props.put("group.id", "test");
> >      props.put("enable.auto.commit", "true");
> >      props.put("auto.commit.interval.ms", "1000");
> >      props.put("key.deserializer",
> > "org.apache.kafka.common.serialization.StringDeserializer");
> >      props.put("value.deserializer",
> > "org.apache.kafka.common.serialization.StringDeserializer");
> >      KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
> >      consumer.subscribe(Arrays.asList("foo", "bar"));
> >      while (true) {
> >          ConsumerRecords<String, String> records = consumer.poll(100);
> >          for (ConsumerRecord<String, String> record : records)
> >              System.out.printf("offset = %d, key = %s, value = %s%n",
> > record.offset(), record.key(), record.value());
> >      }
> >
> >
> >
> > ====
> >
> > public class SeekToBeginingConsumerRebalancerListener implements
> > org.apache.kafka.clients.consumer.ConsumerRebalanceListener {
> >
> >      private Consumer<String, String> consumer;
> >      public SeekToBeginingConsumerRebalancerListener(KafkaConsumer<
> String,
> > String> consumer2) {
> >              this.consumer = consumer2;
> >      }
> >      public void onPartitionsRevoked(Collection<TopicPartition>
> > partitions) {
> >              for (TopicPartition partition : partitions) {
> >
> > //offsetManager.saveOffsetInExternalStore(partition.topic(),
> > partition.partition(),consumer.position(partition));
> >              }
> >      }
> >      public void onPartitionsAssigned(Collection<TopicPartition>
> > partitions) {
> >             /* for (TopicPartition partition : partitions) {
> >                      consumer.seek(partition,seekTo));
> >              }*/
> >          consumer.seekToBeginning(partitions);
> >      }
> > }
> >
>

Re: Consumer poll returning 0 results

Posted by Ted Yu <yu...@gmail.com>.
Can you provide a bit more information ?

Release of Kafka
Java / Scala version

Thanks

On Wed, Oct 25, 2017 at 6:40 PM, Susheel Kumar <su...@gmail.com>
wrote:

> Hello Kafka Users,
>
> I am trying to run below sample code mentioned in Kafka documentation under
> Automatic Offset Committing for a topic with 1 partition  (tried with 3 and
> more partition as well). Create command as follows
>
> bin/kafka-topics.sh --create --zookeeper <ZK>:2181 --replication-factor 3
> --partitions 1 --topic test --config cleanup.policy=compact,delete
>
> but the sample code always returns 0 records unless I provide a custom
> ConsumerRebalanceListener (below) which sets consumer to beginning.
>
> I wonder if the sample code given at Kafka documentation is wrong or am I
> missing something?
>
> https://kafka.apache.org/0101/javadoc/index.html?org/apache/
> kafka/clients/consumer/KafkaConsumer.html
>
>
> *Automatic Offset Committing*
>
> This example demonstrates a simple usage of Kafka's consumer api that
> relying on automatic offset committing.
>
>      Properties props = new Properties();
>      props.put("bootstrap.servers", "localhost:9092");
>      props.put("group.id", "test");
>      props.put("enable.auto.commit", "true");
>      props.put("auto.commit.interval.ms", "1000");
>      props.put("key.deserializer",
> "org.apache.kafka.common.serialization.StringDeserializer");
>      props.put("value.deserializer",
> "org.apache.kafka.common.serialization.StringDeserializer");
>      KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
>      consumer.subscribe(Arrays.asList("foo", "bar"));
>      while (true) {
>          ConsumerRecords<String, String> records = consumer.poll(100);
>          for (ConsumerRecord<String, String> record : records)
>              System.out.printf("offset = %d, key = %s, value = %s%n",
> record.offset(), record.key(), record.value());
>      }
>
>
>
> ====
>
> public class SeekToBeginingConsumerRebalancerListener implements
> org.apache.kafka.clients.consumer.ConsumerRebalanceListener {
>
>      private Consumer<String, String> consumer;
>      public SeekToBeginingConsumerRebalancerListener(KafkaConsumer<String,
> String> consumer2) {
>              this.consumer = consumer2;
>      }
>      public void onPartitionsRevoked(Collection<TopicPartition>
> partitions) {
>              for (TopicPartition partition : partitions) {
>
> //offsetManager.saveOffsetInExternalStore(partition.topic(),
> partition.partition(),consumer.position(partition));
>              }
>      }
>      public void onPartitionsAssigned(Collection<TopicPartition>
> partitions) {
>             /* for (TopicPartition partition : partitions) {
>                      consumer.seek(partition,seekTo));
>              }*/
>          consumer.seekToBeginning(partitions);
>      }
> }
>