Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2018/10/25 08:54:14 UTC

[GitHub] xingzhefeng commented on issue #503: Message was consumed successfully, exactly once, but rocketmq-console shows NOT_CONSUME_YET

xingzhefeng commented on issue #503: Message was consumed successfully, exactly once, but rocketmq-console shows NOT_CONSUME_YET
URL: https://github.com/apache/rocketmq/issues/503#issuecomment-432969449
 
 
   The problem lies in how the domain name and the IP address are compared.
   The fix is to modify the method
   public boolean consumed(final MessageExt msg, final String group)
   of the class DefaultMQAdminExtImpl:
   `public boolean consumed(final MessageExt msg, final String group)
               throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
           log.info("Checking whether the message has been consumed, group={}, msg={}", group, msg);
           ConsumeStats cstats = this.examineConsumeStats(group);
   
           ClusterInfo ci = this.examineBrokerClusterInfo();
   
           Iterator<Entry<MessageQueue, OffsetWrapper>> it = cstats.getOffsetTable().entrySet().iterator();
           log.info("it: {}", it);
           while (it.hasNext()) {
               Entry<MessageQueue, OffsetWrapper> next = it.next();
               MessageQueue mq = next.getKey();
               // Only look at the queue this message was stored in.
               if (mq.getTopic().equals(msg.getTopic()) && mq.getQueueId() == msg.getQueueId()) {
                   BrokerData brokerData = ci.getBrokerAddrTable().get(mq.getBrokerName());
                   log.info("Broker data from the server: {}", brokerData);
                   if (brokerData != null) {
                       String addr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
                       log.info("Address: {}, store host: {}", addr, RemotingUtil.socketAddress2String(msg.getStoreHost()));
                       log.info("Consumer offset: {}, queue offset: {}", next.getValue().getConsumerOffset(), msg.getQueueOffset());
                       // Consumed if the group's offset has already passed this message.
                       if (next.getValue().getConsumerOffset() > msg.getQueueOffset()) {
                           log.info("One address is a domain name and the other an IP, but they are actually the same; returning true");
                           return true;
                       }
                       // Address check disabled: addr may be a domain name while the store host
                       // is an IP, so equals() fails even when both refer to the same broker.
   //                    if (addr.equals(RemotingUtil.socketAddress2String(msg.getStoreHost()))) {
   //                    }
                   }
               } else {
                   log.info("Queue did not match the topic/queueId condition: {}", mq);
               }
           }
   
           return false;
       }`
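
   The change above skips the address check entirely. If the broker check should be kept, one alternative would be to resolve both sides before comparing, so that a domain name and an IP pointing at the same host are treated as equal. The following is only a rough sketch of that idea, not part of RocketMQ: the class `BrokerAddrMatcher` and its method `sameEndpoint` are hypothetical names, and it assumes both addresses are plain `host:port` strings.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper (not a RocketMQ class): compares two "host:port"
// strings by resolved address instead of by raw string equality.
public class BrokerAddrMatcher {

    /** Splits "host:port" at the last colon and returns {host, port}. */
    private static String[] splitHostPort(String addr) {
        int idx = addr.lastIndexOf(':');
        if (idx < 0) {
            throw new IllegalArgumentException("expected host:port, got " + addr);
        }
        return new String[] {addr.substring(0, idx), addr.substring(idx + 1)};
    }

    /** Resolves a host name or IP literal to all of its addresses. */
    private static Set<InetAddress> resolve(String host) throws UnknownHostException {
        return new HashSet<>(Arrays.asList(InetAddress.getAllByName(host)));
    }

    /**
     * Returns true if both strings use the same port and their hosts
     * resolve to at least one common address.
     */
    public static boolean sameEndpoint(String a, String b) {
        if (a == null || b == null) {
            return false;
        }
        if (a.equals(b)) {
            return true; // identical strings need no lookup
        }
        String[] pa = splitHostPort(a);
        String[] pb = splitHostPort(b);
        if (!pa[1].equals(pb[1])) {
            return false; // different ports can never be the same endpoint
        }
        try {
            Set<InetAddress> common = resolve(pa[0]);
            common.retainAll(resolve(pb[0]));
            return !common.isEmpty();
        } catch (UnknownHostException e) {
            return false; // an unresolvable host is treated as a mismatch
        }
    }
}
```

   With such a helper, the commented-out condition could become `BrokerAddrMatcher.sameEndpoint(addr, RemotingUtil.socketAddress2String(msg.getStoreHost()))`. Note that this does a DNS lookup on every call, so it may be too slow or too fragile for some deployments.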
