You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2022/03/21 13:42:23 UTC

[GitHub] [rocketmq] ozw999 opened a new issue #4016: [schema]BROADCASTING , after rebalance lose broker offset

ozw999 opened a new issue #4016:
URL: https://github.com/apache/rocketmq/issues/4016


   # 前提
   一个3主的MQ集群(broker-a/b/c),业务和MQ在同一台。consumer使用broadcasting模式 和 ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET,topic的在三台上都有queue。
   # 问题
   将broker-a所在的服务器断网,此时三个consumer分别触发rebalance。`org.apache.rocketmq.client.impl.consumer.RebalanceImpl.updateProcessQueueTableInRebalance()` 方法将consumer-a中的broker-b和broker-c的offset都清除了。导致将服务器网络恢复后,在断网期间产生在broker-a和broker-b上的message未被消费。


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq] ozw999 commented on issue #4016: [schema]BROADCASTING , after rebalance lose broker offset

Posted by GitBox <gi...@apache.org>.
ozw999 commented on issue #4016:
URL: https://github.com/apache/rocketmq/issues/4016#issuecomment-1074589544


   ```
   private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
                                                          final boolean isOrder) {
           boolean changed = false;
   
           Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
           while (it.hasNext()) {
               Entry<MessageQueue, ProcessQueue> next = it.next();
               MessageQueue mq = next.getKey();
               ProcessQueue pq = next.getValue();
   
               if (mq.getTopic().equals(topic)) {
                   if (!mqSet.contains(mq)) {
                       pq.setDropped(true);
                       if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                           it.remove();
                           changed = true;
                           log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
                       }
                   }
   }
   ```
   ```
   public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
           this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq);
           this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);
           if (this.defaultMQPushConsumerImpl.isConsumeOrderly()
               && MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {
               return false;
           }
           return true;
       }
   ```
   broker断网后会触发consumer rebalance。而rebalance会调用`updateProcessQueueTableInRebalance()`方法,该方法在判断`removeUnnecessaryMessageQueue()`时,对于broadcasting而言是会删除掉被断网的那台broker queue的消费进度。然后定时线程会把最新的消费进度刷到磁盘。当断网的broker网络恢复了之后,由于磁盘获取不到之前的消费进度,而将queue当成是新的来处理。从而导致了message的丢失。
   ```
   public long computePullFromWhere(MessageQueue mq) {
           long result = -1;
           final ConsumeFromWhere consumeFromWhere = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere();
           final OffsetStore offsetStore = this.defaultMQPushConsumerImpl.getOffsetStore();
           switch (consumeFromWhere) {
               case CONSUME_FROM_LAST_OFFSET: {
                   long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
                   if (lastOffset >= 0) {
                       result = lastOffset;
                   }
                   // First start,no offset
                   else if (-1 == lastOffset) {
                       if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                           result = 0L;
                       } else {
                           try {
                               result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
                           } catch (MQClientException e) {
                               result = -1;
                           }
                       }
                   } else {
                       result = -1;
                   }
                   break;
               }
   }
   ```
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq] panzhi33 commented on issue #4016: [schema]BROADCASTING , after rebalance lose broker offset

Posted by GitBox <gi...@apache.org>.
panzhi33 commented on issue #4016:
URL: https://github.com/apache/rocketmq/issues/4016#issuecomment-1075894936


   the consume offset will not be deleted during rebalance。
   the consume offset of broadcasting is stored locally,you did not consume the message, maybe the IP changed after the network was disconnected, and the previous local consume offset was not obtained.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org