Posted to users@kafka.apache.org by Yi Yin <yi...@gmail.com> on 2018/03/06 23:47:02 UTC
KafkaConsumer (0.11) assign then poll - doesn't return results for
all assigned TopicPartitions
I want to manually fetch messages from all partitions of a topic. I'm doing
this by:
1. Create a list of TopicPartition objects, one for each partition of my topic
2. Create a KafkaConsumer and call .assign(myTopicPartitionsList)
3. For each TopicPartition, seek to the offset I want to read
But when I call consumer.poll(timeOut), I only get messages from one of my
partitions. I checked the consumer configs and don't see anything that
would limit the fetching to only some of the partitions.
Does anyone have any insights on how I can fetch messages for all the
TopicPartitions assigned to the consumer?
I posted the relevant code snippet and consumer config below.
Thanks!
relevant code:
List<TopicPartition> topicPartitions = new ArrayList<>();
for (int partitionNumber = 0; partitionNumber < NumPartitionsForTopic; partitionNumber++) {
    topicPartitions.add(new TopicPartition("my_topic", partitionNumber));
}
...
try {
    KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps);
    consumer.assign(topicPartitions);
    // seek to a specific offset for each partition of the topic
    for (TopicPartition topicPartition : topicPartitions) {
        consumer.seek(topicPartition, offsetForPartition[topicPartition.partition()]);
    }
    ConsumerRecords<byte[], byte[]> records = consumer.poll(10000);
    // ISSUE: records only contains messages from 1 of the partitions
} catch (...) {
    // handle exceptions
} finally {
    // close consumer
}
My KafkaConsumer config:
INFO: ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = none
bootstrap.servers = [brokerhostname1.com:9092, brokerhostname2.com:9092, brokerhostname3.com:9092]
check.crcs = true
client.id = my_client_id
connections.max.idle.ms = 540000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = my_group_id_123
heartbeat.interval.ms = 3000
interceptor.classes = null
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 3145728
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
send.buffer.bytes = 131072
session.timeout.ms = 10000
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
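
A possible explanation, offered as an assumption rather than a confirmed diagnosis: a single poll() is not guaranteed to cover every assigned partition. It returns at most max.poll.records records (500 in the config above) and returns once fetched data is available, so one call may contain records from only one partition. If that is the cause, polling in a loop and merging the batches per partition should eventually surface the other partitions. The sketch below illustrates that loop only. The names PollUntilAllPartitions, pollUntilAllSeen and pollOnce are hypothetical, integer partition ids stand in for TopicPartition, and the Supplier stands in for a call to consumer.poll(timeout) so that the example runs without a broker.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

public class PollUntilAllPartitions {

    /**
     * Calls pollOnce repeatedly and merges each batch into a per-partition map,
     * stopping once every partition in `assigned` has contributed at least one
     * record or maxPolls calls have been made.
     *
     * Assumption: with a real KafkaConsumer, pollOnce would wrap
     * consumer.poll(timeout) and regroup the result using
     * ConsumerRecords.partitions() and ConsumerRecords.records(TopicPartition).
     */
    public static <R> Map<Integer, List<R>> pollUntilAllSeen(
            Set<Integer> assigned,
            Supplier<Map<Integer, List<R>>> pollOnce,
            int maxPolls) {
        Map<Integer, List<R>> collected = new HashMap<>();
        for (int i = 0; i < maxPolls && !collected.keySet().containsAll(assigned); i++) {
            Map<Integer, List<R>> batch = pollOnce.get();
            for (Map.Entry<Integer, List<R>> entry : batch.entrySet()) {
                if (entry.getValue().isEmpty()) {
                    continue; // an empty batch does not count as "seen"
                }
                collected.computeIfAbsent(entry.getKey(), k -> new ArrayList<>())
                         .addAll(entry.getValue());
            }
        }
        return collected;
    }

    public static void main(String[] args) {
        // Fake poll source: each call yields records from a single partition,
        // mimicking the behaviour described in the post.
        List<Map<Integer, List<String>>> batches = Arrays.asList(
                Collections.singletonMap(0, Arrays.asList("p0-a", "p0-b")),
                Collections.singletonMap(1, Arrays.asList("p1-a")),
                Collections.singletonMap(2, Arrays.asList("p2-a")));
        Iterator<Map<Integer, List<String>>> fakePolls = batches.iterator();

        Set<Integer> assigned = new HashSet<>(Arrays.asList(0, 1, 2));
        Map<Integer, List<String>> all = pollUntilAllSeen(assigned, fakePolls::next, 10);
        System.out.println("partitions with data: " + all.keySet());
    }
}
```

The maxPolls bound keeps the loop from spinning forever when a partition has no data at or after the offset it was seeked to. A wall-clock deadline would work equally well.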