Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2021/04/23 00:54:43 UTC

[GitHub] [rocketmq] saharaheart commented on issue #2830: The visibility of message when in consumerQueue (Question)

saharaheart commented on issue #2830:
URL: https://github.com/apache/rocketmq/issues/2830#issuecomment-825308210


   The first place to look is org/apache/rocketmq/store/CommitLog#putMessage. The message is appended at point A (annotated in the code below), while the disk flush and the HA wait happen at point B.
   However, the ReputMessageService (org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService) performs the reput operation in a separate thread. If this service is fast enough, some messages appended at point A may already have been pushed to the consumerQueue, where consumers can see them, before they have been flushed to disk or replicated to the slaves. If something breaks at that moment, is it possible for a message to be consumed but not recorded in the broker?
   `public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
           .............
         // Append the message to the mapped file (point A)
   
           MappedFile unlockMappedFile = null;
           MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
   
           putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
           try {
               long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
               this.beginTimeInLock = beginLockTimestamp;
   
               // Here settings are stored timestamp, in order to ensure an orderly
               // global
               msg.setStoreTimestamp(beginLockTimestamp);
   
               if (null == mappedFile || mappedFile.isFull()) {
                   mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
               }
               if (null == mappedFile) {
                   log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                   beginTimeInLock = 0;
                   return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
               }
   
               result = mappedFile.appendMessage(msg, this.appendMessageCallback);
               
          ..............
   
           if (elapsedTimeInLock > 500) {
               log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, result);
           }
   
           if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
               this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
           }
   
           PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
   
           // Statistics
           storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
           storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
           // Flush to disk, then handle HA replication and wait for both (point B)
           handleDiskFlush(result, putMessageResult, msg);
           handleHA(result, putMessageResult, msg);
   
           return putMessageResult;
       }`
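
   To make the interleaving I am asking about concrete, here is a minimal single-threaded sketch. It is a toy model, not RocketMQ code: the class `VisibilitySketch` and its methods `append`, `flush`, `reput` and `visibleButNotDurable` are hypothetical names I made up, and the `waitForFlush` option is only an assumed guard for comparison, not something I have confirmed the broker does.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the ordering question; all names here are hypothetical.
public class VisibilitySketch {
    private final List<String> commitLog = new ArrayList<>(); // appended, maybe only in memory
    private int flushed = 0;    // number of messages durable on disk (after point B)
    private int dispatched = 0; // number of messages visible through the consume queue

    // Point A: the message is appended but not yet flushed.
    void append(String msg) {
        commitLog.add(msg);
    }

    // Point B: everything appended so far becomes durable.
    void flush() {
        flushed = commitLog.size();
    }

    // Stand-in for the reput thread. With waitForFlush == false it dispatches
    // everything appended; with true (assumed guard) only what is flushed.
    void reput(boolean waitForFlush) {
        int limit = waitForFlush ? flushed : commitLog.size();
        dispatched = Math.max(dispatched, limit);
    }

    // Messages a consumer could read that would be lost if the broker crashed now.
    int visibleButNotDurable() {
        return Math.max(0, dispatched - flushed);
    }

    public static void main(String[] args) {
        // Reput runs between point A and point B.
        VisibilitySketch racy = new VisibilitySketch();
        racy.append("m1");
        racy.reput(false);
        System.out.println("unguarded: " + racy.visibleButNotDurable());

        // Same interleaving, but dispatch is bounded by the flushed position.
        VisibilitySketch guarded = new VisibilitySketch();
        guarded.append("m1");
        guarded.reput(true);
        System.out.println("guarded:   " + guarded.visibleButNotDurable());
    }
}
```

   In the unguarded case the message is counted as visible while `flushed` is still 0, which is exactly the window between point A and point B that worries me.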
   
   

