Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2020/11/23 13:23:00 UTC

[GitHub] [rocketmq] chenyx7 opened a new issue #2449: Concurrently resetOffset will cause consumer status exception

chenyx7 opened a new issue #2449:
URL: https://github.com/apache/rocketmq/issues/2449


   
   **BUG REPORT**
   
   1. Invoke the admin method resetOffsetByTimestamp more than once within 10 s.
   
   2. Result: for a single consumerGroup, the GROUP_GET_NUMS statistic is two or more times the TOPIC_PUT_NUMS statistic.
   ![image](https://user-images.githubusercontent.com/46484030/99966659-9966d480-2dd1-11eb-8428-1e1bf0637978.png)
   ![image](https://user-images.githubusercontent.com/46484030/99966883-f5c9f400-2dd1-11eb-9d35-fdc3794aed00.png)
   
   3. Although repeated consumption is normal for RocketMQ, I believe this is not the normal kind of repetition.
   
   4. Reason: the method org.apache.rocketmq.client.impl.factory.MQClientInstance#resetOffset is not thread-safe. Concurrent calls can leave more than one PullRequest for the same queue in pullRequestQueue, which causes messages to be consumed repeatedly and continuously until the consumer restarts.
   
   5. Analysis:
   
    public void resetOffset(String topic, String group, Map<MessageQueue, Long> offsetTable) {
        
           // (the lookup of the push consumer for this group is omitted from this excerpt)
           try {
               consumer.suspend();
   
               ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = consumer.getRebalanceImpl().getProcessQueueTable();
               for (Map.Entry<MessageQueue, ProcessQueue> entry : processQueueTable.entrySet()) {
                   MessageQueue mq = entry.getKey();
                   if (topic.equals(mq.getTopic()) && offsetTable.containsKey(mq)) {
                       ProcessQueue pq = entry.getValue();
   
                       // if thread0 reaches this point at 5s and thread1 reaches it at 10s, both drop the same ProcessQueue
                       pq.setDropped(true);
                       pq.clear();
                   }
               }
   
               try {
                   // both thread0 and thread1 are blocked here
                   TimeUnit.SECONDS.sleep(10);
               } catch (InterruptedException e) {
               }
   
   
               Iterator<MessageQueue> iterator = processQueueTable.keySet().iterator();
               while (iterator.hasNext()) {
                   MessageQueue mq = iterator.next();
                   Long offset = offsetTable.get(mq);
                   if (topic.equals(mq.getTopic()) && offset != null) {
                       try {
                           consumer.updateConsumeOffset(mq, offset);
                           consumer.getRebalanceImpl().removeUnnecessaryMessageQueue(mq, processQueueTable.get(mq));
   
                           // here, thread1 removes the ProcessQueue created by thread0, but that ProcessQueue has not been dropped,
                           // so the PullRequest created by thread0 never exits.
                           iterator.remove();
                       } catch (Exception e) {
                           log.warn("reset offset failed. group={}, {}", group, mq, e);
                       }
                   }
               }
           } finally {
               if (consumer != null) {
                   consumer.resume();
               }
           }
       }
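
One way to picture a mitigation is to serialize the whole drop, sleep, remove sequence so that a second reset cannot start until the first has finished. The following is only a minimal sketch under stated assumptions, not the RocketMQ implementation and not necessarily what any patch does: `SerializedReset`, `FakeProcessQueue`, `resetLock`, and the `rebalance` helper are hypothetical stand-ins, and the 10 s sleep is shortened to a parameter.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

public class SerializedReset {
    /** Hypothetical stand-in for ProcessQueue; only the dropped flag is modeled. */
    static final class FakeProcessQueue {
        volatile boolean dropped;
    }

    // Hypothetical stand-in for the processQueueTable, keyed by a queue name.
    final ConcurrentMap<String, FakeProcessQueue> table = new ConcurrentHashMap<>();
    // Bookkeeping used only to observe the behaviour of this sketch.
    final AtomicInteger inReset = new AtomicInteger();
    final AtomicInteger maxConcurrentResets = new AtomicInteger();
    final AtomicInteger removedWithoutDrop = new AtomicInteger();
    private final Object resetLock = new Object();

    /** Simulates rebalance re-creating a queue that is missing from the table. */
    void rebalance(String mq) {
        table.putIfAbsent(mq, new FakeProcessQueue());
    }

    /** Drop, pause, then remove, with the whole sequence guarded by one lock. */
    void resetOffset(String mq, long pauseMillis) throws InterruptedException {
        synchronized (resetLock) {
            int now = inReset.incrementAndGet();
            maxConcurrentResets.accumulateAndGet(now, Math::max);
            try {
                FakeProcessQueue pq = table.get(mq);
                if (pq != null) {
                    pq.dropped = true;
                }
                Thread.sleep(pauseMillis); // stands in for the 10 s sleep
                FakeProcessQueue removed = table.remove(mq);
                if (removed != null && !removed.dropped) {
                    // This is the situation described in the analysis above.
                    removedWithoutDrop.incrementAndGet();
                }
                rebalance(mq); // assumption: a new queue appears once the reset ends
            } finally {
                inReset.decrementAndGet();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        SerializedReset client = new SerializedReset();
        client.rebalance("queue-0");
        Runnable reset = () -> {
            try {
                client.resetOffset("queue-0", 50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        Thread t0 = new Thread(reset);
        Thread t1 = new Thread(reset);
        t0.start();
        t1.start();
        t0.join();
        t1.join();
        System.out.println("max concurrent resets: " + client.maxConcurrentResets.get());
        System.out.println("removed without drop: " + client.removedWithoutDrop.get());
    }
}
```

Because the second caller waits for the lock, it drops whatever queue exists at that moment before removing it, so in this model no undropped queue is ever removed. The cost is that concurrent reset requests run back to back, each paying the full pause.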


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq] areyouok commented on issue #2449: Concurrently resetOffset will cause consumer status exception

Posted by GitBox <gi...@apache.org>.
areyouok commented on issue #2449:
URL: https://github.com/apache/rocketmq/issues/2449#issuecomment-768252919


   I ran into the same problem and created a patch to fix it.
   
   https://github.com/apache/rocketmq/pull/2640





[GitHub] [rocketmq] duhenglucky closed issue #2449: Concurrently resetOffset will cause consumer status exception

Posted by GitBox <gi...@apache.org>.
duhenglucky closed issue #2449:
URL: https://github.com/apache/rocketmq/issues/2449


   

