Posted to users@kafka.apache.org by 飞翔的加菲猫 <kn...@foxmail.com> on 2018/10/22 03:58:24 UTC

Re: Newly increased partitions could not be rebalanced until all consumers are stopped and restarted

Thanks a lot. This config works.


Ruiping Li


------------------ Original Message ------------------
From: "hacker win7"<ha...@gmail.com>;
Sent: Thursday, October 18, 2018, 6:29 PM
To: "users"<us...@kafka.apache.org>;

Subject: Re: Newly increased partitions could not be rebalanced until all consumers are stopped and restarted



You can add the config props.put("metadata.max.age.ms", 5000); to your consumers and re-test. The consumer only discovers newly added partitions when it refreshes its cluster metadata, and the default metadata.max.age.ms is 5 minutes, so a shorter interval makes the new partitions show up much sooner.
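For example, a minimal sketch of the consumer properties with this setting (the bootstrap address, group id, and deserializers here are placeholders taken from the test setup below):

Properties props = new Properties();
props.put("bootstrap.servers", "127.0.0.1:9092");   // placeholder broker address
props.put("group.id", "test-group");                // placeholder group name
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// Refresh cluster metadata every 5 seconds so newly added partitions are noticed quickly
// (the default metadata.max.age.ms is 300000 ms, i.e. 5 minutes).
props.put("metadata.max.age.ms", 5000);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);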

飞翔的加菲猫 <52...@qq.com> wrote on Thursday, October 18, 2018 at 10:47 AM:

> Sorry for bothering you. I don't know whether this is a bug; maybe something is
> wrong in my test, or there is an explanation for it. Could any Kafka expert
> help take a look? Thanks a lot.
>
>
> Ruiping Li
>
>
> ------------------ Original Message ------------------
> From: "526564746"<52...@qq.com>;
> Sent: Friday, October 12, 2018, 4:42 PM
> To: "users"<us...@kafka.apache.org>;
>
> Subject: Newly increased partitions could not be rebalanced until all consumers
> are stopped and restarted
>
>
>
>  Hi Kafka team,
>
>
> I have run into something strange with Kafka rebalancing. If I increase the
> partitions of a topic that is subscribed to by several Java consumers (all in
> the same group), no rebalance occurs. Furthermore, if I start a new consumer
> (or stop one) to trigger a rebalance, the newly added partitions are still not
> assigned until I stop all consumers and start them again. Is that normal?
>
>
> Thanks,
> Ruiping Li
>
>
>
> --------------------------------------------------------------------------------
> Below is my test:
> 1. Start Kafka and ZK. Create a normal topic (test-topic) with 1 partition:
> $ ./bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --topic test-topic --partitions 1 --replication-factor 1 --config retention.ms=604800000
>
> 2. Start 2 Java consumers (C1, C2) that subscribe to test-topic.
> 3. Increase test-topic from 1 to 3 partitions:
> $ ./bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --alter --topic test-topic --partitions 3
> WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
> Adding partitions succeeded!
>
> Increasing succeeded:
> $ ./bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe --topic test-topic
> Topic:test-topic    PartitionCount:3    ReplicationFactor:1    Configs: retention.ms=604800000
>     Topic: test-topic    Partition: 0    Leader: 0    Replicas: 0    Isr: 0
>     Topic: test-topic    Partition: 1    Leader: 0    Replicas: 0    Isr: 0
>     Topic: test-topic    Partition: 2    Leader: 0    Replicas: 0    Isr: 0
>
> No rebalance occurs in C1 or C2.
> 4. Start a new consumer C3 subscribed to test-topic. A rebalance occurs, but
> only partition test-topic-0 is involved in the reassignment; test-topic-1 and
> test-topic-2 are not (one way to check the current assignment from the command
> line is sketched after step 6).
> 5. I tried stopping C2 and C3; test-topic-1 and test-topic-2 were still not
> assigned.
> 6. Stop all running consumers and then start them again. All of test-topic-0,
> 1, and 2 are assigned normally.
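>
> One way to check which partitions each consumer currently owns is the
> consumer-groups tool (the bootstrap address and group name here are
> placeholders for the ones used by C1-C3):
> $ ./bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group test-group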
>
>
> Environment:
> Kafka & Java API version: kafka_2.12-2.0.0 (I also tried kafka_2.11-1.0.0 and
> kafka_2.10-0.10.2.1 with the same result)
> zookeeper: 3.4.13
> consumer code:
> // consumer
> import java.util.Collections;
> import java.util.Iterator;
> import java.util.Properties;
> import java.util.concurrent.atomic.AtomicLong;
>
> import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
> import org.apache.kafka.clients.consumer.ConsumerRecord;
> import org.apache.kafka.clients.consumer.ConsumerRecords;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.errors.TimeoutException;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> public class KafkaConsumerThread extends Thread {
>     // consumer settings
>     public static KafkaConsumer<String, String> createNativeConsumer(String groupName, String kafkaBootstrap) {
>         Properties props = new Properties();
>         props.put("bootstrap.servers", kafkaBootstrap);
>         props.put("group.id", groupName);
>         props.put("auto.offset.reset", "earliest");
>         props.put("enable.auto.commit", true);
>         props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
>         props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
>         return new KafkaConsumer<String, String>(props);
>     }
>
>     private static final Logger log = LoggerFactory.getLogger(KafkaConsumerThread.class);
>     // volatile so that stopConsumer(), called from another thread, is seen by the polling loop
>     private volatile boolean stop = false;
>     private KafkaConsumer<String, String> consumer;
>     private String topicName;
>     private ConsumerRebalanceListener consumerRebalanceListener;
>     private AtomicLong receivedRecordNumber = new AtomicLong(0);
>
>     public KafkaConsumerThread(String topicName, String groupName,
>             ConsumerRebalanceListener consumerRebalanceListener, String kafkaBootstrap) {
>         this.consumer = createNativeConsumer(groupName, kafkaBootstrap);
>         this.topicName = topicName;
>         this.consumerRebalanceListener = consumerRebalanceListener;
>     }
>
>     @Override
>     public void run() {
>         log.info("Start consumer ..");
>         consumer.subscribe(Collections.singleton(topicName), consumerRebalanceListener);
>         while (!stop) {
>             try {
>                 // Poll for records, count them, and log each key/value pair
>                 ConsumerRecords<String, String> records = consumer.poll(100);
>                 receivedRecordNumber.addAndGet(records.count());
>                 Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
>                 while (iterator.hasNext()) {
>                     ConsumerRecord<String, String> record = iterator.next();
>                     log.info("Receive [key:{}][value:{}]", record.key(), record.value());
>                 }
>             } catch (TimeoutException e) {
>                 log.info("no data");
>             }
>         }
>         consumer.close();
>     }
>
>     public void stopConsumer() {
>         this.stop = true;
>     }
> }
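>
> The ConsumerRebalanceListener passed to the constructor is only used to observe
> which partitions get assigned or revoked. The actual implementation is not shown
> above; a minimal logging version, and the way C1/C2 might be started, could look
> like this (the group name and bootstrap address are placeholders):
>
> import java.util.Collection;
>
> import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
> import org.apache.kafka.common.TopicPartition;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> // Hypothetical logging listener, sketched for illustration only.
> public class LoggingRebalanceListener implements ConsumerRebalanceListener {
>     private static final Logger log = LoggerFactory.getLogger(LoggingRebalanceListener.class);
>
>     @Override
>     public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
>         log.info("Partitions revoked: {}", partitions);
>     }
>
>     @Override
>     public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
>         log.info("Partitions assigned: {}", partitions);
>     }
> }
>
> // In a separate file: starting the two test consumers.
> // "test-topic", "test-group", and "127.0.0.1:9092" are placeholders.
> public class RebalanceTest {
>     public static void main(String[] args) {
>         KafkaConsumerThread c1 = new KafkaConsumerThread("test-topic", "test-group",
>                 new LoggingRebalanceListener(), "127.0.0.1:9092");
>         KafkaConsumerThread c2 = new KafkaConsumerThread("test-topic", "test-group",
>                 new LoggingRebalanceListener(), "127.0.0.1:9092");
>         c1.start();
>         c2.start();
>     }
> }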