Posted to users@kafka.apache.org by Yi Yin <yi...@gmail.com> on 2018/03/06 23:47:02 UTC

KafkaConsumer (0.11) assign then poll - doesn't return results for all assigned TopicPartitions

I want to manually fetch messages from all partitions of a topic. I'm doing
this by:

1. Create a list of TopicPartition - one for each partition of my topic
2. Create a KafkaConsumer, and call .assign(myTopicPartitionsList)
3. For each TopicPartition, seek to the offset I want to read


But when I call consumer.poll(timeOut) - I only get messages from one of my
partitions. I checked the consumer configs and don't see anything that
would limit the fetching to some of the partitions.

Does anyone have any insights on how I can fetch messages for all the
TopicPartition assigned to the consumer?

I posted the relevant code snippet and consumer config below.

Thanks!




relevant code:

    List<TopicPartition> topicPartitions = new ArrayList<>();

    for (int partitionNumber = 0; partitionNumber < NumPartitionsForTopic; partitionNumber++) {
      topicPartitions.add(new TopicPartition("my_topic", partitionNumber));
    }

    ...

    // declared outside the try block so it is still in scope in finally
    KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps);
    try {
      consumer.assign(topicPartitions);

      // seek to a specific offset for each partition of the topic
      for (TopicPartition topicPartition : topicPartitions) {
        consumer.seek(topicPartition, offsetForPartition[topicPartition.partition()]);
      }
      ConsumerRecords<byte[], byte[]> records = consumer.poll(10000);
      // ISSUE: records only contains messages from 1 of the partitions
    } catch (...) {
      // handle exceptions
    } finally {
      consumer.close();
    }
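
To narrow this down, I could track which partitions have returned records
across repeated poll() calls instead of looking at a single call. Below is a
minimal sketch of that bookkeeping. PartitionCoverage is a hypothetical helper
of my own, not part of the Kafka client, and it assumes the partition numbers
passed to markSeen() are collected from records.partitions() after each poll.

```java
import java.util.Collection;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper (not part of the Kafka client): remembers which
// assigned partition numbers have not yet appeared in any poll() result.
final class PartitionCoverage {
    private final Set<Integer> pending = new TreeSet<>();

    // Assumes partitions are numbered 0 .. numPartitions - 1, as in the
    // assignment loop above.
    PartitionCoverage(int numPartitions) {
        for (int p = 0; p < numPartitions; p++) {
            pending.add(p);
        }
    }

    // Call once per poll() with the partition numbers present in that result.
    void markSeen(Collection<Integer> partitionsInThisPoll) {
        pending.removeAll(partitionsInThisPoll);
    }

    // True once every assigned partition has returned at least one record.
    boolean allSeen() {
        return pending.isEmpty();
    }

    // Defensive copy of the partitions that have not returned anything yet.
    Set<Integer> stillPending() {
        return new TreeSet<>(pending);
    }
}
```

The idea would be to call poll() in a loop, feed each result's partition
numbers into markSeen(), and log stillPending() until allSeen() is true or a
deadline passes.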


My KafkaConsumer config:
INFO: ConsumerConfig values:
    auto.commit.interval.ms = 5000
    auto.offset.reset = none
    bootstrap.servers = [brokerhostname1.com:9092, brokerhostname2.com:9092, brokerhostname3.com:9092]
    check.crcs = true
    client.id = my_client_id
    connections.max.idle.ms = 540000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = my_group_id_123
    heartbeat.interval.ms = 3000
    interceptor.classes = null
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    max.partition.fetch.bytes = 3145728
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 305000
    retry.backoff.ms = 100
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer