Posted to users@kafka.apache.org by 飞翔的加菲猫 <kn...@foxmail.com> on 2018/10/22 03:58:24 UTC
Re: New increased partitions could not be rebalance, until stop all consumers and start them
Thanks a lot. This config works.
Ruiping Li
------------------ Original Message ------------------
From: "hacker win7"<ha...@gmail.com>;
Sent: Thursday, October 18, 2018, 6:29 PM
To: "users"<us...@kafka.apache.org>;
Subject: Re: New increased partitions could not be rebalance, until stop all consumers and start them
You can add the config props.put("metadata.max.age.ms", 5000); to your test case and re-test it.
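For illustration, here is a minimal sketch of where that setting would go, assuming the consumer is configured through a java.util.Properties object as in the consumer code quoted further down. The class name ConsumerProps and its build method are made up for this example; only the property keys come from the thread. The value 5000 makes the client refresh its topic metadata every 5 seconds rather than at the Kafka default of 300000 ms (5 minutes), which is presumably why the new partitions get picked up quickly with this setting.

```java
import java.util.Properties;

// Hypothetical helper; the class and method names are illustrative only.
final class ConsumerProps {
    // Builds the same consumer settings as the quoted consumer code,
    // plus the metadata refresh interval suggested in this reply.
    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("auto.offset.reset", "earliest");
        props.put("enable.auto.commit", true);
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Refresh topic metadata every 5 seconds instead of the default 300000 ms.
        props.put("metadata.max.age.ms", 5000);
        return props;
    }

    public static void main(String[] args) {
        // "localhost:9092" and "test-group" are placeholder values.
        Properties props = build("localhost:9092", "test-group");
        System.out.println("metadata.max.age.ms = " + props.get("metadata.max.age.ms"));
    }
}
```

The resulting Properties object can be passed to new KafkaConsumer<String, String>(props). A very short interval means more metadata requests to the brokers, so 5000 is probably better suited to a test like this one than to production.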
On Thursday, October 18, 2018 at 10:47 AM, 飞翔的加菲猫 <52...@qq.com> wrote:
> Sorry to bother you. I don't know whether this is a bug. Maybe something is
> wrong in my test, or there is an explanation for it. Could any Kafka expert
> help take a look? Thanks a lot.
>
>
> Ruiping Li
>
>
> ------------------ Original Message ------------------
> From: "526564746"<52...@qq.com>;
> Sent: Friday, October 12, 2018, 4:42 PM
> To: "users"<us...@kafka.apache.org>;
>
> Subject: New increased partitions could not be rebalance, until stop all
> consumers and start them
>
>
>
> Hi Kafka team,
>
>
> I have run into something strange with Kafka rebalancing. If I increase the
> number of partitions of a topic that is subscribed to by several Java
> consumers (all in the same group), no rebalance occurs. Furthermore, if I
> start a new consumer (or stop one) to trigger a rebalance, the new partitions
> are still not assigned until I stop all consumers and start them again. Is
> that normal?
>
>
> Thanks,
> Ruiping Li
>
>
>
> --------------------------------------------------------------------------------
> Below is my test:
> 1. Start Kafka and ZooKeeper. Create a normal topic (test-topic) with 1 partition:
> $ ./bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --topic test-topic --partitions 1 --replication-factor 1 --config retention.ms=604800000
>
> 2. Start 2 Java consumers (C1, C2) that subscribe to test-topic.
> 3. Add 2 partitions to test-topic (from 1 to 3):
> $ ./bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --alter --topic test-topic --partitions 3
> WARNING: If partitions are increased for a topic that has a key, the
> partition logic or ordering of the messages will be affected
> Adding partitions succeeded!
>
> The increase succeeded:
> $ ./bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe --topic test-topic
> Topic:test-topic PartitionCount:3 ReplicationFactor:1 Configs:retention.ms=604800000
> Topic: test-topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
> Topic: test-topic Partition: 1 Leader: 0 Replicas: 0 Isr: 0
> Topic: test-topic Partition: 2 Leader: 0 Replicas: 0 Isr: 0
>
> No rebalance occurs in C1 or C2.
> 4. Start a new consumer C3 that subscribes to test-topic. A rebalance occurs,
> but only partition test-topic-0 is reassigned; test-topic-1 and test-topic-2
> are not.
> 5. Stop C2 and C3. test-topic-1 and test-topic-2 are still not assigned.
> 6. Stop all running consumers, then start them again. test-topic-0, 1 and 2
> are all assigned normally.
>
>
> Environment
> Kafka and Java API version: kafka_2.12-2.0.0 (I also tried kafka_2.11-1.0.0
> and kafka_2.10-0.10.2.1, with the same result)
> ZooKeeper: 3.4.13
> Consumer code:
> // consumer
> import java.util.Collections;
> import java.util.Properties;
> import java.util.concurrent.atomic.AtomicLong;
>
> import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
> import org.apache.kafka.clients.consumer.ConsumerRecord;
> import org.apache.kafka.clients.consumer.ConsumerRecords;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> // Assumption: the original imports are not shown; the unchecked Kafka client
> // TimeoutException and SLF4J logging are assumed here.
> import org.apache.kafka.common.errors.TimeoutException;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> public class KafkaConsumerThread extends Thread {
>     // consumer settings
>     public static KafkaConsumer<String, String> createNativeConsumer(String groupName, String kafkaBootstrap) {
>         Properties props = new Properties();
>         props.put("bootstrap.servers", kafkaBootstrap);
>         props.put("group.id", groupName);
>         props.put("auto.offset.reset", "earliest");
>         props.put("enable.auto.commit", true);
>         props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
>         props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
>         return new KafkaConsumer<String, String>(props);
>     }
>
>     private static final Logger log = LoggerFactory.getLogger(KafkaConsumerThread.class);
>     // volatile so that a stopConsumer() call from another thread is visible to run()
>     private volatile boolean stop = false;
>     private final KafkaConsumer<String, String> consumer;
>     private final String topicName;
>     private final ConsumerRebalanceListener consumerRebalanceListener;
>     private final AtomicLong receivedRecordNumber = new AtomicLong(0);
>
>     public KafkaConsumerThread(String topicName, String groupName,
>             ConsumerRebalanceListener consumerRebalanceListener, String kafkaBootstrap) {
>         this.consumer = createNativeConsumer(groupName, kafkaBootstrap);
>         this.topicName = topicName;
>         this.consumerRebalanceListener = consumerRebalanceListener;
>     }
>
>     @Override
>     public void run() {
>         log.info("Start consumer ..");
>         consumer.subscribe(Collections.singleton(topicName), consumerRebalanceListener);
>         while (!stop) {
>             try {
>                 // poll(long) is deprecated since Kafka 2.0 in favor of poll(Duration);
>                 // kept as posted because older client versions were also tested.
>                 ConsumerRecords<String, String> records = consumer.poll(100);
>                 receivedRecordNumber.addAndGet(records.count());
>                 for (ConsumerRecord<String, String> record : records) {
>                     log.info("Receive [key:{}][value:{}]", record.key(), record.value());
>                 }
>             } catch (TimeoutException e) {
>                 log.info("no data");
>             }
>         }
>         consumer.close();
>     }
>
>     // Asks the polling loop to exit after the current poll() returns.
>     public void stopConsumer() {
>         this.stop = true;
>     }
> }
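
To make the effect of metadata.max.age.ms more concrete, the following is a toy simulation, not Kafka's client code. It assumes a client that only re-reads the partition count once its cached metadata is older than the configured maximum age, and that noticing a changed partition count is what leads to the new partitions being assigned. All names (MetadataRefreshModel, poll, timeToNotice) are hypothetical, and the numbers mirror the test above: the topic goes from 1 to 3 partitions shortly after the consumers start.

```java
// Toy model only: this is NOT how the Kafka client is implemented.
final class MetadataRefreshModel {
    private final long maxAgeMs;   // stands in for metadata.max.age.ms
    private long lastRefreshMs;    // simulated time of the last metadata refresh
    private int knownPartitions;   // partition count in the cached metadata

    MetadataRefreshModel(long maxAgeMs, int initialPartitions) {
        this.maxAgeMs = maxAgeMs;
        this.lastRefreshMs = 0L;
        this.knownPartitions = initialPartitions;
    }

    // Called on every simulated poll. Returns true when a refresh reveals a
    // different partition count, i.e. when a rebalance would be needed.
    boolean poll(long nowMs, int actualPartitions) {
        if (nowMs - lastRefreshMs < maxAgeMs) {
            return false; // cached metadata is still considered fresh
        }
        lastRefreshMs = nowMs;
        boolean changed = actualPartitions != knownPartitions;
        knownPartitions = actualPartitions;
        return changed;
    }

    // Simulated time (ms) at which the 1 -> 3 partition change is first
    // noticed, or -1 if it is not noticed before limitMs.
    static long timeToNotice(long maxAgeMs, long alterAtMs, long pollEveryMs, long limitMs) {
        MetadataRefreshModel model = new MetadataRefreshModel(maxAgeMs, 1);
        for (long now = 0; now <= limitMs; now += pollEveryMs) {
            int actual = now >= alterAtMs ? 3 : 1;
            if (model.poll(now, actual)) {
                return now;
            }
        }
        return -1L;
    }

    public static void main(String[] args) {
        // Partitions are added 1 second in; the client polls every 100 ms.
        System.out.println("default 300000 ms: noticed at "
                + timeToNotice(300_000L, 1_000L, 100L, 600_000L) + " ms");
        System.out.println("5000 ms: noticed at "
                + timeToNotice(5_000L, 1_000L, 100L, 600_000L) + " ms");
    }
}
```

Under these assumptions, a consumer using the default of 300000 ms only notices the new partitions 5 minutes after its last refresh, while one configured with 5000 ms notices them within 5 seconds. That would be consistent with the test above if the consumers were stopped and restarted before the default interval had elapsed, since a freshly started consumer fetches metadata right away.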