You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Kevin Perera <kp...@ippon.fr> on 2019/06/24 15:13:31 UTC

Kafka Consumers - keeping them open

Hello! I’m interested in trying to get my Kafka Consumer to keep eating records. However, after a short period of time, it stops incrementing. How do you usually get this to work? Below is a short configuration that I use for my KafkaConsumer. Any help would be greatly appreciated. 

hostname = InetAddress.getLocalHost().getHostName();
// configuration for how we consume records.
tweetsProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, hostname + ":9092");
tweetsProps.put(ConsumerConfig.GROUP_ID_CONFIG, “groupID");
tweetsProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
tweetsProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
tweetsProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
tweetsProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
tweetsProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);


Re: Kafka Consumers - keeping them open

Posted by Boyang Chen <re...@gmail.com>.
Hey Kevin,

could you give more context on what it means for `keep eating records` and
`stops incrementing`? In a typical use case, you should call `poll()` in a
while loop, and if you stop seeing new records, it could be either your
consumer is not working correctly, or your input volume is not fed in fast
enough.

Boyang

On Mon, Jun 24, 2019 at 8:13 AM Kevin Perera <kp...@ippon.fr> wrote:

> Hello! I’m interested in trying to get my Kafka Consumer to keep eating
> records. However, after a short period of time, it stops incrementing. How
> do you usually get this to work? Below is a short configuration that I use
> for my KafkaConsumer. Any help would be greatly appreciated.
>
> hostname = InetAddress.getLocalHost().getHostName();
> // configuration for how we consume records.
> tweetsProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, hostname +
> ":9092");
> tweetsProps.put(ConsumerConfig.GROUP_ID_CONFIG, “groupID");
> tweetsProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
> tweetsProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
> tweetsProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> tweetsProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
> StringDeserializer.class);
> tweetsProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
> StringDeserializer.class);
>
>