You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Anurag Laddha <an...@gmail.com> on 2016/02/14 19:50:05 UTC

Consumption from some of the partitions stops after rebalancing of partitions with kafka 0.9

Greetings

I have created a consumer using Kafka 0.9.
When one or more *new* consumers are added to an existing consumer group, the
partitions get rebalanced (as expected).

*Issue*:
- After rebalancing, every partition of the topic is assigned to one of the
consumers in the group (again, as expected), but *some of the consumers
never consume from some of their assigned partitions after rebalancing,
even when those partitions have pending messages to be consumed.*
- This issue happens in both manual and auto offset commit mode.
- This happens after partitions are rebalanced, either when a new consumer
is added or when an existing consumer leaves/dies.

I have attached a code sample that demonstrates the problem:
- create a topic in Kafka with 3 partitions (say "test-again" as the topic name)
- start 1 consumer (c1) and 1 producer thread - all 3 partitions will
be assigned to c1.
- after a few seconds, add one more consumer (c2) to the same consumer group
that c1 is part of.
- this causes the 3 partitions to be rebalanced between c1 and c2.
- after this, c1 never consumes from any of the partitions assigned to it.

Log statements that demonstrate the issue:
- "C: 1, Number of records received: 0" (consumer c1 doesn't receive any
messages)
- the logs show that partitions are rebalanced when consumers are added to or
removed from the consumer group.
- if you check the number of messages in the partitions assigned to consumer c1
(using bin/kafka-consumer-offset-checker.sh --topic test-again --group
test-manual --zookeeper localhost:2181), there are lots of unprocessed
messages.

*Can someone please tell me what I am doing wrong?*

Pasting the sample code below:

package simple;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ConsumerRebalance_Manual {
    private static Logger logger =
LoggerFactory.getLogger(ConsumerRebalance_Manual.class);
    private static volatile boolean produceMsg = true;
    private static volatile boolean consumeMsg = true;

    static class ProduceMsg implements Runnable{
        private int producerId;

        public ProduceMsg(int producerId) {
            this.producerId = producerId;
        }

        private void sleep(long sleepMs){
            try {
                Thread.sleep(sleepMs);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        @Override
        public void run() {
            logger.info("Starting producer: {}", this.producerId);

            Properties props = new Properties();
            props.put("bootstrap.servers", "127.0.0.1:9092");
            props.put("acks", "all");
            props.put("retries", 1);
            props.put("batch.size", 16384);
            props.put("linger.ms", 1);
            props.put("buffer.memory", 33554432);
            props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");

            logger.info("Starting with producing messages: {}",
this.producerId);
            Producer<String, String> producer = new KafkaProducer(props);
            int counter = 0;
            while(produceMsg) {
                for (int i = 0; i < 10; i++) {
                    String msg = "message-" + counter++;
                    producer.send(new ProducerRecord<String,
String>("test-again", msg));
                }
                logger.info("Sent a batch for publishing");
                producer.flush();
                sleep(1000);
            }
            producer.close();
            logger.info("P: {}, Exiting producer!", this.producerId);
        }
    }

    static class ConsumeMsg implements Runnable{
        private int consumerId;

        public ConsumeMsg(int consumerId) {
            this.consumerId = consumerId;
        }

        private Properties getConsumerProperties(){
            Properties props = new Properties();
            props.put("bootstrap.servers", "127.0.0.1:9092");
            props.put("group.id", "test-manual");
            props.put("enable.auto.commit", "false");
            props.put("auto.offset.reset", "earliest");
            props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
            props.put("max.partition.fetch.bytes", 150);
            return props;
        }

        private void commitOffsets(KafkaConsumer<String, String>
consumer, Map<TopicPartition, Long> partitionToOffsetMap){
            if (partitionToOffsetMap.size() > 0) {
                Map<TopicPartition, OffsetAndMetadata>
partitionToMetadataMap = new HashMap<>();
                for (Map.Entry<TopicPartition, Long>
singlePartitionEntry : partitionToOffsetMap.entrySet()) {

partitionToMetadataMap.put(singlePartitionEntry.getKey(), new
OffsetAndMetadata(singlePartitionEntry.getValue() + 1));
                }
                if (partitionToMetadataMap.size() > 0) {
                    logger.info("C: {}, Thread: {}, Committing
following offsets: {}", this.consumerId,
                        Thread.currentThread().getName(),
partitionToMetadataMap);
                    consumer.commitSync(partitionToMetadataMap);
                }
                partitionToOffsetMap.clear();
            }
        }

        private void sleep(long sleepMs){
            try {
                Thread.sleep(sleepMs);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        @Override
        public void run() {
            logger.info("Starting consumer: {}", this.consumerId);

            Map<TopicPartition, Long> partitionToUncommittedOffsetMap
= new HashMap<>();
            Properties props = getConsumerProperties();
            KafkaConsumer<String, String> consumer = new
KafkaConsumer<String, String>(props);

            ConsumerRebalanceListener listener = new
ConsumerRebalanceListener() {
                @Override
                public void
onPartitionsRevoked(Collection<TopicPartition> collection) {
                    for (TopicPartition topicPartition : collection) {
                        logger.info("C: {}, Thread: {}, ~~Unsubscribed
to topic: {}, partition: {}, ",
                            consumerId,
Thread.currentThread().getName(), topicPartition.topic(),
topicPartition.partition());
                    }
                    logger.info("C: {}, committing offsets when
partitions were revoked", consumerId);
                    commitOffsets(consumer, partitionToUncommittedOffsetMap);
                }

                @Override
                public void
onPartitionsAssigned(Collection<TopicPartition> collection) {
                    OffsetAndMetadata currPartionData;
                    for (TopicPartition topicPartition : collection) {
                        currPartionData = consumer.committed(topicPartition);
                        logger.info("C: {}, Thread: {},
$$Subscriptions: Topic: {}, partition: {}, offset: {}",
                            consumerId,
Thread.currentThread().getName(), topicPartition.topic(),
topicPartition.partition(),
                            currPartionData.offset());
                        consumer.seek(topicPartition, currPartionData.offset());
                    }
                }
            };
            consumer.subscribe(Arrays.asList("test-again"), listener);

            logger.info("Starting to process records for consumer:
{}", this.consumerId);
            long lastUpdateTimeMs = System.currentTimeMillis();

            while(consumeMsg) {
                ConsumerRecords<String, String> records =
consumer.poll(1000); //fetch more records to process
                logger.info("C: {}, Number of records received: {} ",
consumerId, records.count());
                if (records == null || records.count() == 0){
                    logger.info("C: {}, Found no records. Sleeping for
a while", this.consumerId);
                    sleep(500);
                    continue;
                }
                //process each record
                for (ConsumerRecord<String, String> record : records) {
                    logger.info("C: {}, Received message: Topic: {},
Partition: {}, Offset: {}, Thread: {}",
                        this.consumerId, record.topic(),
record.partition(), record.offset(),
                        Thread.currentThread().getName());

                    //track and commit offset of msg processed
                    partitionToUncommittedOffsetMap.put(new
TopicPartition(record.topic(), record.partition()), record.offset());
                    if (partitionToUncommittedOffsetMap.size() > 0 &&
(System.currentTimeMillis() - lastUpdateTimeMs > 1000)){
                        commitOffsets(consumer,
partitionToUncommittedOffsetMap);
                        lastUpdateTimeMs = System.currentTimeMillis();
                    }

                    sleep(200);
                }
            }

            commitOffsets(consumer, partitionToUncommittedOffsetMap);
            consumer.close();
            logger.info("Exiting consumer: {}", this.consumerId);
        }
    }


    public static void main(String[] args) throws InterruptedException {

        ExecutorService executorService = Executors.newFixedThreadPool(5);
        executorService.submit(new ProduceMsg(1));
        executorService.submit(new ConsumeMsg(1));  //start first consumer

        Thread.sleep(3000);
        executorService.submit(new ConsumeMsg(2));  //start second
consumer after few seconds

        Thread.sleep(TimeUnit.SECONDS.toMillis(180)); //let both
consumer run for few minutes
        produceMsg = false;
        consumeMsg = false;

        executorService.shutdown();
        logger.info("Waiting for executor service shutdown");
        while(!executorService.isTerminated()){}
        logger.info("Exiting app");
    }
}

Re: Consumption from some of the partitions stops after rebalancing of partitions with kafka 0.9

Posted by Damian Guy <da...@gmail.com>.
Hi,

This is a bug in the consumer that was fixed in KAFKA-2978. You should
try building the consumer from the latest 0.9.0 branch (or use the 0.9.0.1 RC).
I had the same issue and confirmed that it works fine on the latest 0.9.0.

Thanks,
Damian

On 14 February 2016 at 18:50, Anurag Laddha <an...@gmail.com> wrote:

> Greetings
>
> I have created a consumer using kafka 0.9
> When one or more *new* consumers are added to an existing consumer group
> it causes partitions to get rebalanced (as expected)
>
> *Issue*:
> - After rebalancing, all the partitions of the topic get assigned to one
> of the consumers in the group (again, as expected) but *some of the
> consumers never consume from some of the partitions after rebalancing
> happens even when those partitions have pending messages to be consumed.*
> *- *This issue happens in both manual and auto offset commit mode
> - This happens after partitions are rebalanced when a new consumer is
> added or an existing consumer leaves/dies
>
> I have attached the code sample which demonstrates the problem:
> - create a topic in kafka with 3 partitions (say "test-again" as topic
> name)
> - create 1 consumer (c1) and 1 producer thread - all the 3 partitions will
> be assigned to c1.
> - after few seconds add one more consumer (c2) to the same consumer group
> that c1 was part of.
> - this leads to 3 partitions be rebalanced between c1 and c2.
> - after this c1 never consumes from any of the partitions assigned to it
>
> Log statement that demonstrates the issue:
> - C: 1, Number of records received: 0  (consumer c1 doesn't receive any
> message)
> - logs show partitions are rebalanced when consumers are added/removed
> from the consumer group.
> - if you look at number of messages in partitions assigned to consumer c1
> (using bin/kafka-consumer-offset-checker.sh --topic test-again --group
> test-manual --zookeeper localhost:2181), there are lots of unprocessed
> messages.
>
> *Can someone please tell me what wrong am i doing? *
>
> Pasting the sample code below:
>
> package simple;
>
> import org.apache.kafka.clients.consumer.*;
> import org.apache.kafka.clients.producer.*;
> import org.apache.kafka.common.TopicPartition;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> import java.util.*;
> import java.util.concurrent.ExecutorService;
> import java.util.concurrent.Executors;
> import java.util.concurrent.TimeUnit;
>
> public class ConsumerRebalance_Manual {
>     private static Logger logger = LoggerFactory.getLogger(ConsumerRebalance_Manual.class);
>     private static volatile boolean produceMsg = true;
>     private static volatile boolean consumeMsg = true;
>
>     static class ProduceMsg implements Runnable{
>         private int producerId;
>
>         public ProduceMsg(int producerId) {
>             this.producerId = producerId;
>         }
>
>         private void sleep(long sleepMs){
>             try {
>                 Thread.sleep(sleepMs);
>             } catch (InterruptedException e) {
>                 e.printStackTrace();
>             }
>         }
>
>         @Override
>         public void run() {
>             logger.info("Starting producer: {}", this.producerId);
>
>             Properties props = new Properties();
>             props.put("bootstrap.servers", "127.0.0.1:9092");
>             props.put("acks", "all");
>             props.put("retries", 1);
>             props.put("batch.size", 16384);
>             props.put("linger.ms", 1);
>             props.put("buffer.memory", 33554432);
>             props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
>             props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
>
>             logger.info("Starting with producing messages: {}", this.producerId);
>             Producer<String, String> producer = new KafkaProducer(props);
>             int counter = 0;
>             while(produceMsg) {
>                 for (int i = 0; i < 10; i++) {
>                     String msg = "message-" + counter++;
>                     producer.send(new ProducerRecord<String, String>("test-again", msg));
>                 }
>                 logger.info("Sent a batch for publishing");
>                 producer.flush();
>                 sleep(1000);
>             }
>             producer.close();
>             logger.info("P: {}, Exiting producer!", this.producerId);
>         }
>     }
>
>     static class ConsumeMsg implements Runnable{
>         private int consumerId;
>
>         public ConsumeMsg(int consumerId) {
>             this.consumerId = consumerId;
>         }
>
>         private Properties getConsumerProperties(){
>             Properties props = new Properties();
>             props.put("bootstrap.servers", "127.0.0.1:9092");
>             props.put("group.id", "test-manual");
>             props.put("enable.auto.commit", "false");
>             props.put("auto.offset.reset", "earliest");
>             props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
>             props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
>             props.put("max.partition.fetch.bytes", 150);
>             return props;
>         }
>
>         private void commitOffsets(KafkaConsumer<String, String> consumer, Map<TopicPartition, Long> partitionToOffsetMap){
>             if (partitionToOffsetMap.size() > 0) {
>                 Map<TopicPartition, OffsetAndMetadata> partitionToMetadataMap = new HashMap<>();
>                 for (Map.Entry<TopicPartition, Long> singlePartitionEntry : partitionToOffsetMap.entrySet()) {
>                     partitionToMetadataMap.put(singlePartitionEntry.getKey(), new OffsetAndMetadata(singlePartitionEntry.getValue() + 1));
>                 }
>                 if (partitionToMetadataMap.size() > 0) {
>                     logger.info("C: {}, Thread: {}, Committing following offsets: {}", this.consumerId,
>                         Thread.currentThread().getName(), partitionToMetadataMap);
>                     consumer.commitSync(partitionToMetadataMap);
>                 }
>                 partitionToOffsetMap.clear();
>             }
>         }
>
>         private void sleep(long sleepMs){
>             try {
>                 Thread.sleep(sleepMs);
>             } catch (InterruptedException e) {
>                 e.printStackTrace();
>             }
>         }
>
>         @Override
>         public void run() {
>             logger.info("Starting consumer: {}", this.consumerId);
>
>             Map<TopicPartition, Long> partitionToUncommittedOffsetMap = new HashMap<>();
>             Properties props = getConsumerProperties();
>             KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
>
>             ConsumerRebalanceListener listener = new ConsumerRebalanceListener() {
>                 @Override
>                 public void onPartitionsRevoked(Collection<TopicPartition> collection) {
>                     for (TopicPartition topicPartition : collection) {
>                         logger.info("C: {}, Thread: {}, ~~Unsubscribed to topic: {}, partition: {}, ",
>                             consumerId, Thread.currentThread().getName(), topicPartition.topic(), topicPartition.partition());
>                     }
>                     logger.info("C: {}, committing offsets when partitions were revoked", consumerId);
>                     commitOffsets(consumer, partitionToUncommittedOffsetMap);
>                 }
>
>                 @Override
>                 public void onPartitionsAssigned(Collection<TopicPartition> collection) {
>                     OffsetAndMetadata currPartionData;
>                     for (TopicPartition topicPartition : collection) {
>                         currPartionData = consumer.committed(topicPartition);
>                         logger.info("C: {}, Thread: {}, $$Subscriptions: Topic: {}, partition: {}, offset: {}",
>                             consumerId, Thread.currentThread().getName(), topicPartition.topic(), topicPartition.partition(),
>                             currPartionData.offset());
>                         consumer.seek(topicPartition, currPartionData.offset());
>                     }
>                 }
>             };
>             consumer.subscribe(Arrays.asList("test-again"), listener);
>
>             logger.info("Starting to process records for consumer: {}", this.consumerId);
>             long lastUpdateTimeMs = System.currentTimeMillis();
>
>             while(consumeMsg) {
>                 ConsumerRecords<String, String> records = consumer.poll(1000); //fetch more records to process
>                 logger.info("C: {}, Number of records received: {} ", consumerId, records.count());
>                 if (records == null || records.count() == 0){
>                     logger.info("C: {}, Found no records. Sleeping for a while", this.consumerId);
>                     sleep(500);
>                     continue;
>                 }
>                 //process each record
>                 for (ConsumerRecord<String, String> record : records) {
>                     logger.info("C: {}, Received message: Topic: {}, Partition: {}, Offset: {}, Thread: {}",
>                         this.consumerId, record.topic(), record.partition(), record.offset(),
>                         Thread.currentThread().getName());
>
>                     //track and commit offset of msg processed
>                     partitionToUncommittedOffsetMap.put(new TopicPartition(record.topic(), record.partition()), record.offset());
>                     if (partitionToUncommittedOffsetMap.size() > 0 && (System.currentTimeMillis() - lastUpdateTimeMs > 1000)){
>                         commitOffsets(consumer, partitionToUncommittedOffsetMap);
>                         lastUpdateTimeMs = System.currentTimeMillis();
>                     }
>
>                     sleep(200);
>                 }
>             }
>
>             commitOffsets(consumer, partitionToUncommittedOffsetMap);
>             consumer.close();
>             logger.info("Exiting consumer: {}", this.consumerId);
>         }
>     }
>
>
>     public static void main(String[] args) throws InterruptedException {
>
>         ExecutorService executorService = Executors.newFixedThreadPool(5);
>         executorService.submit(new ProduceMsg(1));
>         executorService.submit(new ConsumeMsg(1));  //start first consumer
>
>         Thread.sleep(3000);
>         executorService.submit(new ConsumeMsg(2));  //start second consumer after few seconds
>
>         Thread.sleep(TimeUnit.SECONDS.toMillis(180)); //let both consumer run for few minutes
>         produceMsg = false;
>         consumeMsg = false;
>
>         executorService.shutdown();
>         logger.info("Waiting for executor service shutdown");
>         while(!executorService.isTerminated()){}
>         logger.info("Exiting app");
>     }
> }
>
>