You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2022/03/22 01:29:08 UTC

[GitHub] [rocketmq] ozw999 commented on issue #4016: [schema]BROADCASTING , after rebalance lose broker offset

ozw999 commented on issue #4016:
URL: https://github.com/apache/rocketmq/issues/4016#issuecomment-1074589544


   ```
   private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
                                                          final boolean isOrder) {
           boolean changed = false;
   
           Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
           while (it.hasNext()) {
               Entry<MessageQueue, ProcessQueue> next = it.next();
               MessageQueue mq = next.getKey();
               ProcessQueue pq = next.getValue();
   
               if (mq.getTopic().equals(topic)) {
                   if (!mqSet.contains(mq)) {
                       pq.setDropped(true);
                       if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                           it.remove();
                           changed = true;
                           log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
                       }
                   }
   }
   ```
   ```
   public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
           this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq);
           this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);
           if (this.defaultMQPushConsumerImpl.isConsumeOrderly()
               && MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {
               return false;
           }
           return true;
       }
   ```
   broker断网后会触发consumer rebalance。而rebalance会调用`updateProcessQueueTableInRebalance()`方法,该方法在判断`removeUnnecessaryMessageQueue()`时,对于broadcasting而言是会删除掉被断网的那台broker queue的消费进度。然后定时线程会把最新的消费进度刷到磁盘。当断网的broker网络恢复了之后,由于磁盘获取不到之前的消费进度,而将queue当成是新的来处理。从而导致了message的丢失。
   ```
   public long computePullFromWhere(MessageQueue mq) {
           long result = -1;
           final ConsumeFromWhere consumeFromWhere = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere();
           final OffsetStore offsetStore = this.defaultMQPushConsumerImpl.getOffsetStore();
           switch (consumeFromWhere) {
               case CONSUME_FROM_LAST_OFFSET: {
                   long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
                   if (lastOffset >= 0) {
                       result = lastOffset;
                   }
                   // First start,no offset
                   else if (-1 == lastOffset) {
                       if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                           result = 0L;
                       } else {
                           try {
                               result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
                           } catch (MQClientException e) {
                               result = -1;
                           }
                       }
                   } else {
                       result = -1;
                   }
                   break;
               }
   }
   ```
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org