Posted to users@kafka.apache.org by Marina <pp...@yahoo.com.INVALID> on 2016/10/12 17:50:06 UTC

How to specify starting position for consumers when using dynamic group re-balancing?

Hi, 
Is it possible to start 0.9 or 0.10 consumers from a specified offset, while still using consumer groups with dynamic re-balancing? 

Here is what we have found so far:
Case 1: If we use the consumer.assign(…) method to manually assign partitions to consumers, we can do all of the following:
consumer.seek(<specificPartition>, <myCustomOffset>); or: 
consumer.seekToBeginning(<specificPartition>); 
consumer.seekToEnd(<specificPartition>); 

Basically, we have full control over the position each consumer starts from, BUT at the expense of not having partition re-assignment done dynamically by Kafka.

Case 2: If we use the consumer.subscribe(…) method, Kafka will manage the re-balancing; however, we cannot do any of the three options above … :(
So, we tried the following to “hack” around it at consumer start-up time, *before* entering the poll() loop:

// get coordinator from the private field of the consumer: 
ConsumerCoordinator coordinator = (ConsumerCoordinator) FieldUtils.readField(consumer, "coordinator", true); 
// make sure all partitions are already assigned:
coordinator.ensurePartitionAssignment(); 
// get the list of partitions assigned to this specific consumer: 
Set<TopicPartition> assignedTopicPartitions = consumer.assignment();
// now we can do the same three actions (seek(), seekToEnd() or seekToBeginning()) as above, but only on the partitions assigned to this consumer:
for (TopicPartition assignedPartition: assignedTopicPartitions) { 
     consumer.seek(assignedPartition, <myCustomOffset>); // or whatever
...
}
// now start the poll() loop:
while (true) { 
     ConsumerRecords<String, String> records = consumer.poll(pollIntervalMs); 
     for (ConsumerRecord<String, String> record : records) { 
          // processMessage(record.value(), record.offset()); 
     } 
} 

This feels too hacky for my taste, and I am also not sure whether this logic will hold during an actual re-balance, when, say, new consumers are added to the group.

Could somebody validate this approach or suggest a better way to accomplish what we need?
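
For comparison, one direction that might avoid the reflection: consumer.subscribe(…) has an overload that takes a ConsumerRebalanceListener, and its onPartitionsAssigned(…) callback receives the partitions handed to this consumer after each re-balance, so the seek could be done there. Below is a minimal, library-free sketch of just that seek-on-assignment logic. SeekOnAssign, SeekOnAssignDemo and the partition name "events-0" are hypothetical names made up for illustration; the type parameter P stands in for TopicPartition, and the seek callback stands in for consumer::seek.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;

// Hypothetical helper, NOT part of the Kafka client library.
// P stands in for TopicPartition; `seek` stands in for consumer::seek.
class SeekOnAssign<P> {
    private final Map<P, Long> startOffsets;  // custom start offset per partition
    private final BiConsumer<P, Long> seek;   // how to reposition a partition
    private final Set<P> alreadySeeked = new HashSet<>();

    SeekOnAssign(Map<P, Long> startOffsets, BiConsumer<P, Long> seek) {
        this.startOffsets = new HashMap<>(startOffsets);
        this.seek = seek;
    }

    // Intended to be called from ConsumerRebalanceListener.onPartitionsAssigned(...).
    void onPartitionsAssigned(Collection<P> partitions) {
        for (P partition : partitions) {
            Long offset = startOffsets.get(partition);
            // Seek only if a custom offset is configured and this instance has
            // not repositioned the partition before; otherwise every re-balance
            // would rewind the partition to the custom offset again.
            if (offset != null && alreadySeeked.add(partition)) {
                seek.accept(partition, offset);
            }
        }
    }
}

class SeekOnAssignDemo {
    public static void main(String[] args) {
        List<String> seekCalls = new ArrayList<>();
        Map<String, Long> startOffsets = new HashMap<>();
        startOffsets.put("events-0", 42L);  // assumed partition name and offset

        SeekOnAssign<String> listener =
                new SeekOnAssign<>(startOffsets, (p, o) -> seekCalls.add(p + "@" + o));

        listener.onPartitionsAssigned(Arrays.asList("events-0", "events-1")); // initial assignment
        listener.onPartitionsAssigned(Arrays.asList("events-0"));             // simulated re-balance

        System.out.println(seekCalls); // prints [events-0@42]
    }
}
```

With a real consumer, the idea would be to wrap this in a ConsumerRebalanceListener and pass it to consumer.subscribe(topics, listener), using consumer::seek as the callback. The "already seeked" set lives only in this one consumer instance, so a partition that moves to a different consumer in the group would be repositioned again there; whether that is acceptable depends on the use case.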


thanks! 

Marina