Posted to users@kafka.apache.org by Rag <ra...@gmail.com> on 2018/07/20 21:42:25 UTC

Unable to read topic from beginning after certain runs

Hi all,

I am new to Kafka. I have a problem reading from a topic multiple times
with kafka-console-consumer and a Java client. Any help is appreciated.

I have used a tool called Kandalf (https://github.com/hellofresh/kandalf)
to extract RabbitMQ messages and push them into a Kafka topic
`usage-records`. The Kafka version that came with the Kandalf set of Docker
containers was 0.8.x.

On my local machine, I removed all files in `kafka-logs` except the .log and
.index files, as I was not sure whether the 0.8.x metadata is compatible with
Kafka 1.0.1. I also had an empty Zookeeper locally. The folder looked like this:
```
kafka-logs/usage-records-0
kafka-logs/usage-records-0/00000000000000000000.log
kafka-logs/usage-records-0/00000000000000000000.index
```
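
In case it is relevant, the copied segment itself can be inspected with the
DumpLogSegments tool that ships with Kafka (the file path below is just the
layout shown above; this is only a sketch of how I could check the segment):
```
bin/kafka-run-class.sh kafka.tools.DumpLogSegments \
    --files kafka-logs/usage-records-0/00000000000000000000.log --print-data-log
```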

When I start Kafka, I see log messages about building an index, and possibly
about caching some metadata in Zookeeper. I am not sure what data is actually
stored in Zookeeper.
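
Assuming a local Zookeeper on the default port 2181 (an assumption about my
setup), what it holds for this topic can be inspected with the zookeeper-shell
tool that ships with Kafka; this should show partition metadata rather than the
messages themselves:
```
# assumes Zookeeper listening on localhost:2181
bin/zookeeper-shell.sh localhost:2181
# then, at the prompt:
ls /brokers/topics
get /brokers/topics/usage-records
```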

Then I start a kafka-console-consumer to read all records of this topic from
the beginning, and it reads a non-zero number of records. If I run the same
kafka-console-consumer several times from the beginning, it returns the same
number of records each time.
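
For reference, a console consumer reading this topic from the beginning would
be invoked along these lines (the broker address is taken from the Java code
further down):
```
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic usage-records --from-beginning
```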

I originally tried to play with Kafka Streams: I got a simple aggregate-count
working but could not get aggregate-sum working, so I decided to first
understand KafkaConsumer well. I wrote a class to exercise the KafkaConsumer
class, and once that code is run it yields no records for the topic, even
though I would expect it to, since it should start from offset zero. When I
later run kafka-console-consumer to read the same topic from the beginning, it
also returns zero records. I do not know which tool I can use to check whether
the topic has been emptied (a sketch of one way to check is shown after this
list). To my understanding:
- no client that I have used issued a topic delete
- the consumer's offset should not matter, as kafka-console-consumer is run
with the flag `--from-beginning`
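
To make that concrete, here is a minimal sketch of the kind of check I have in
mind, assuming the broker at localhost:9092 (as in the ConsumerMain code
further down) and the single partition 0 shown in the folder listing. It only
asks the broker for the beginning and end offsets, without consuming anything:
```
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Collections;
import java.util.Map;
import java.util.Properties;

// Minimal sketch: report how many offsets the broker still retains for usage-records-0.
public class OffsetCheck {

  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // assumed local broker, as in ConsumerMain
    props.put("key.deserializer", StringDeserializer.class);
    props.put("value.deserializer", StringDeserializer.class);

    TopicPartition tp = new TopicPartition("usage-records", 0);
    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      // beginningOffsets/endOffsets query the broker directly; no group.id, subscribe() or poll() needed
      Map<TopicPartition, Long> begin = consumer.beginningOffsets(Collections.singleton(tp));
      Map<TopicPartition, Long> end = consumer.endOffsets(Collections.singleton(tp));
      long retained = end.get(tp) - begin.get(tp);
      System.out.println("earliest=" + begin.get(tp) + ", latest=" + end.get(tp)
          + ", retained offsets=" + retained);
    }
  }
}
```
If the two offsets are equal, the broker no longer retains any records for that
partition, even though the topic itself still exists.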

The Java code using KafkaConsumer

```
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Collections;
import java.util.Properties;

// Simple class to list all records in the topic from the beginning
public class ConsumerMain {

  public static final String TOPIC = "usage-records";

  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "consumer.main.2");
    props.put("key.deserializer", StringDeserializer.class);
    props.put("value.deserializer", StringDeserializer.class);
    // note: auto.offset.reset is not set, so the client default (latest) applies

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Collections.singleton(TOPIC));

    // poll forever, printing the number of records returned and each record value
    while (true) {
      ConsumerRecords<String, String> records = consumer.poll(500);
      System.out.println(">> Obtained records of size: " + records.count());
      records.forEach(record -> System.out.println(record.value()));
    }
  }
}
```

Thanks
Raghavan

RE: Unable to read topic from beginning after certain runs

Posted by 赖剑清 <la...@tp-link.com.cn>.
Hi,

One possible reason is that the record files have been deleted by the Kafka server.
By default, you can check the files in /tmp/kafka-logs/${topic-name}, which is where the messages are stored.
Alternatively, you can get the exact log path from server.properties.
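
For reference, the broker settings that control where the data lives and when
old segments are deleted look like this in server.properties (stock defaults
shown; the actual values may differ in your setup):
```
# where partition data (.log/.index segments) is stored
log.dirs=/tmp/kafka-logs

# segments older than this become eligible for deletion
log.retention.hours=168

# how often the retention check runs
log.retention.check.interval.ms=300000
```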

