Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2021/04/22 02:55:01 UTC

[GitHub] [rocketmq] saharaheart opened a new issue #2830: The visibility of message when in consumerQueue (Question)

saharaheart opened a new issue #2830:
URL: https://github.com/apache/rocketmq/issues/2830


   Hello everyone! While reading the source code for the consumerQueue, I found that the reputMessageService relies on commitLog.getMaxOffset() to decide how far it can append messages to the consumer queue. However, the value of commitLog.getMaxOffset() may change before the message has actually been flushed to disk or replicated to the slave. So is it possible for a consumer to see a message that has not yet been flushed or replicated to the slave?
   I checked, and the HA service logic and handleDiskFlush run after the commitLog's maxOffset changes, while the reputMessageService runs concurrently with them. Did I miss something?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq] saharaheart edited a comment on issue #2830: The visibility of message when in consumerQueue (Question)

Posted by GitBox <gi...@apache.org>.
saharaheart edited a comment on issue #2830:
URL: https://github.com/apache/rocketmq/issues/2830#issuecomment-825308210


   The first place is org/apache/rocketmq/store/CommitLog#putMessage. The message is appended at point A (annotated in the code below), and the disk flush and HA wait happen at point B.
   However, the reputMessageService (org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService) performs the reput operation in another thread. If this service is fast enough, some of the messages appended after point A may already have been pushed to the consumerQueue, where consumers can see them, even though they have not yet been flushed or replicated to the slaves. If something breaks at that moment, is it possible that a message is consumed but never recorded in the broker?
   
   
   ```
   public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
           //.............
           // Point A: the following appends the message to the mapped file
   
           MappedFile unlockMappedFile = null;
           MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
   
           putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
           try {
               long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
               this.beginTimeInLock = beginLockTimestamp;
   
               // Here settings are stored timestamp, in order to ensure an orderly
               // global
               msg.setStoreTimestamp(beginLockTimestamp);
   
               if (null == mappedFile || mappedFile.isFull()) {
                   mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
               }
               if (null == mappedFile) {
                   log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                   beginTimeInLock = 0;
                   return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
               }
   
               result = mappedFile.appendMessage(msg, this.appendMessageCallback);
               
          ..............
   
           if (elapsedTimeInLock > 500) {
               log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, result);
           }
   
           if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
               this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
           }
   
           PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
   
           // Statistics
           storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
           storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
   
           // Point B: the disk flush and HA handling (and waiting) actually happen here
           handleDiskFlush(result, putMessageResult, msg);
           handleHA(result, putMessageResult, msg);
   
           return putMessageResult;
       }
   ```
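
The window between point A and point B can be illustrated with a small toy model. This is only a sketch under stated assumptions, not RocketMQ code: the class `VisibilityWindowSketch`, its fields, and its methods are hypothetical names, and the model assumes the dispatcher is bounded only by the append offset, as the comment above describes.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (hypothetical, not RocketMQ code) of a log whose dispatch bound is the append offset. */
class VisibilityWindowSketch {
    private long maxOffset = 0;        // advanced at point A (append to mapped memory)
    private long flushedOffset = 0;    // advanced at point B (disk flush completes)
    private long reputFromOffset = 0;  // next offset the dispatcher will index
    private final List<Long> consumerQueue = new ArrayList<>();

    /** Point A: append one message of the given size and return its start offset. */
    public long append(int size) {
        long start = maxOffset;
        maxOffset += size;
        return start;
    }

    /** Point B: the flush catches up with everything appended so far. */
    public void flush() {
        flushedOffset = maxOffset;
    }

    /** Dispatcher: index every fixed-size message up to maxOffset, ignoring flush state. */
    public void doReput(int size) {
        while (reputFromOffset < maxOffset) {
            consumerQueue.add(reputFromOffset);
            reputFromOffset += size;
        }
    }

    /** True if some indexed (consumer-visible) message starts at or beyond the flushed offset. */
    public boolean hasVisibleUnflushedMessage() {
        for (long offset : consumerQueue) {
            if (offset >= flushedOffset) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        VisibilityWindowSketch log = new VisibilityWindowSketch();
        log.append(100);   // point A
        log.doReput(100);  // the dispatcher wins the race and runs before point B
        System.out.println("before flush, visible but unflushed: " + log.hasVisibleUnflushedMessage());
        log.flush();       // point B
        System.out.println("after flush, visible but unflushed: " + log.hasVisibleUnflushedMessage());
    }
}
```

The model is single-threaded on purpose: it replays one possible interleaving deterministically instead of relying on real thread timing.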
   
   





[GitHub] [rocketmq] panzhi33 commented on issue #2830: Message transfer to consumerQueue(Using reputService) should happen after the flush and slave replication operation in SYNC_MASTER and SYNC_FLUSH settings

Posted by GitBox <gi...@apache.org>.
panzhi33 commented on issue #2830:
URL: https://github.com/apache/rocketmq/issues/2830#issuecomment-826283817


   > Hmm, what do you mean by "only when these two places are OK is the message sent successfully"? Does that refer to the broker sending to the producer, or to the consumer?
   > I know the ack will be sent to the producer once these two places are ready. But when a consumer pulls a message from the broker, I cannot find any code that guarantees these two places are ready.
   
   You are right. I reviewed the relevant code. It is possible for a message to be consumed before it has been flushed to disk and replicated to the standby node. However, the message will still eventually be flushed and synchronized to the standby node.
   





[GitHub] [rocketmq] saharaheart commented on issue #2830: The visibility of message when in consumerQueue (Question)

Posted by GitBox <gi...@apache.org>.
saharaheart commented on issue #2830:
URL: https://github.com/apache/rocketmq/issues/2830#issuecomment-825309705


   > ![image](https://user-images.githubusercontent.com/29321745/115662781-104ccd00-a372-11eb-9d24-48f47c95010c.png)
   > org.apache.rocketmq.store.CommitLog#asyncPutMessages
   > I could not find the code you mentioned
   
   The first place is org/apache/rocketmq/store/CommitLog#putMessage. The message is appended at point A (annotated in the code below), and the disk flush and HA wait happen at point B.
   However, the reputMessageService (org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService) performs the reput operation in another thread. If this service is fast enough, some of the messages appended after point A may already have been pushed to the consumerQueue, where consumers can see them, even though they have not yet been flushed or replicated to the slaves. If something breaks at that moment, is it possible that a message is consumed but never recorded in the broker?
   
   
   ```
   public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
           //.............
           // Point A: the following appends the message to the mapped file
   
           MappedFile unlockMappedFile = null;
           MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
   
           putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
           try {
               long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
               this.beginTimeInLock = beginLockTimestamp;
   
               // Here settings are stored timestamp, in order to ensure an orderly
               // global
               msg.setStoreTimestamp(beginLockTimestamp);
   
               if (null == mappedFile || mappedFile.isFull()) {
                   mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
               }
               if (null == mappedFile) {
                   log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                   beginTimeInLock = 0;
                   return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
               }
   
               result = mappedFile.appendMessage(msg, this.appendMessageCallback);
               
          ..............
   
           if (elapsedTimeInLock > 500) {
               log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, result);
           }
   
           if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
               this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
           }
   
           PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
   
           // Statistics
           storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
           storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
   
           // Point B: the disk flush and HA handling (and waiting) actually happen here
           handleDiskFlush(result, putMessageResult, msg);
           handleHA(result, putMessageResult, msg);
   
           return putMessageResult;
       }
   ```
   





[GitHub] [rocketmq] saharaheart commented on issue #2830: Message transfer to consumerQueue(Using reputService) should happen after the flush and slave replication operation in SYNC_MASTER and SYNC_FLUSH settings

Posted by GitBox <gi...@apache.org>.
saharaheart commented on issue #2830:
URL: https://github.com/apache/rocketmq/issues/2830#issuecomment-826779665


   So, do you think it would be useful to add a configuration option and the related code for this issue, so that this case can be avoided? Kafka provides a guarantee like this.
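
One way such an option could work is sketched below. It is purely illustrative and not an existing RocketMQ feature: the class `DispatchBoundSketch`, the method `dispatchBound`, and the flag `waitForDurability` are all hypothetical names. The idea is that the dispatcher would index messages only up to the smaller of the flushed and replicated offsets, similar in spirit to Kafka exposing records to consumers only up to its high watermark.

```java
/** Sketch of an optional durability-aware dispatch bound (all names are hypothetical). */
class DispatchBoundSketch {

    /**
     * Returns the offset up to which the dispatcher may index messages.
     *
     * @param maxOffset         end of the data appended to the commit log
     * @param flushedOffset     end of the data flushed to local disk
     * @param replicatedOffset  end of the data acknowledged by the slave
     * @param waitForDurability hypothetical config flag enabling the stricter bound
     */
    public static long dispatchBound(long maxOffset, long flushedOffset,
                                     long replicatedOffset, boolean waitForDurability) {
        if (!waitForDurability) {
            // Behaviour described in this thread: the bound is the append offset.
            return maxOffset;
        }
        // Stricter bound: only data that is both flushed and replicated becomes visible.
        return Math.min(maxOffset, Math.min(flushedOffset, replicatedOffset));
    }

    public static void main(String[] args) {
        System.out.println("default bound: " + dispatchBound(300, 200, 100, false));
        System.out.println("durable bound: " + dispatchBound(300, 200, 100, true));
    }
}
```

The trade-off of the stricter bound is extra consumer latency, since visibility would then wait for the slower of the flush and the replication.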





[GitHub] [rocketmq] saharaheart removed a comment on issue #2830: The visibility of message when in consumerQueue (Question)

Posted by GitBox <gi...@apache.org>.
saharaheart removed a comment on issue #2830:
URL: https://github.com/apache/rocketmq/issues/2830#issuecomment-825308210





[GitHub] [rocketmq] panzhi33 edited a comment on issue #2830: The visibility of message when in consumerQueue (Question)

Posted by GitBox <gi...@apache.org>.
panzhi33 edited a comment on issue #2830:
URL: https://github.com/apache/rocketmq/issues/2830#issuecomment-824559815


   ![image](https://user-images.githubusercontent.com/29321745/115662781-104ccd00-a372-11eb-9d24-48f47c95010c.png)
   org.apache.rocketmq.store.CommitLog#asyncPutMessages  
   I could not find the code you mentioned





[GitHub] [rocketmq] vongosling closed issue #2830: (Discussion for enhancement)Message transfer to consumerQueue(Using reputService) should happen after the flush and slave replication operation in SYNC_MASTER and SYNC_FLUSH settings

Posted by GitBox <gi...@apache.org>.
vongosling closed issue #2830:
URL: https://github.com/apache/rocketmq/issues/2830


   





[GitHub] [rocketmq] panzhi33 commented on issue #2830: (Discussion for enhancement)Message transfer to consumerQueue(Using reputService) should happen after the flush and slave replication operation in SYNC_MASTER and SYNC_FLUSH settings

Posted by GitBox <gi...@apache.org>.
panzhi33 commented on issue #2830:
URL: https://github.com/apache/rocketmq/issues/2830#issuecomment-826876711


   > So, do you think it would be useful to add a configuration option and the related code for this issue, so that this case can be avoided? Kafka provides a guarantee like this.
   
   I think you can give it a try





[GitHub] [rocketmq] panzhi33 edited a comment on issue #2830: Message transfer to consumerQueue(Using reputService) should happen after the flush and slave replication operation in SYNC_MASTER and SYNC_FLUSH settings

Posted by GitBox <gi...@apache.org>.
panzhi33 edited a comment on issue #2830:
URL: https://github.com/apache/rocketmq/issues/2830#issuecomment-826283817


   > Hmm, what do you mean by "only when these two places are OK is the message sent successfully"? Does that refer to the broker sending to the producer, or to the consumer?
   > I know the ack will be sent to the producer once these two places are ready. But when a consumer pulls a message from the broker, I cannot find any code that guarantees these two places are ready.
   
   You are right, sorry. I reviewed the relevant code. It is possible for a message to be consumed before it has been flushed to disk and replicated to the standby node. However, the message will still eventually be flushed and synchronized to the standby node.
   





[GitHub] [rocketmq] saharaheart commented on issue #2830: Message transfer to consumerQueue(Using reputService) should happen after the flush and slave replication operation in SYNC_MASTER and SYNC_FLUSH settings

Posted by GitBox <gi...@apache.org>.
saharaheart commented on issue #2830:
URL: https://github.com/apache/rocketmq/issues/2830#issuecomment-826279219


   Hmm, what do you mean by "only when these two places are OK is the message sent successfully"? Does that refer to the broker sending to the producer, or to the consumer?
   I know the ack will be sent to the producer once these two places are ready. But when a consumer pulls a message from the broker, I cannot find any code that guarantees these two places are ready.





[GitHub] [rocketmq] panzhi33 commented on issue #2830: Message transfer to consumerQueue(Using reputService) should happen after the flush and slave replication operation in SYNC_MASTER and SYNC_FLUSH settings

Posted by GitBox <gi...@apache.org>.
panzhi33 commented on issue #2830:
URL: https://github.com/apache/rocketmq/issues/2830#issuecomment-826019739


   > Is there any code that implements the SYNC_MASTER and SYNC_FLUSH mechanism here? I rechecked, and the reputMessageService (org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService) relies on the committed offset to transfer the commit log to the consumerQueue. This can be traced from ReputMessageService#doReput(): the call SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset); in doReput() ultimately uses the committed offset as the bound for the reput. However, the committed offset is updated by the CommitRealTimeService (org.apache.rocketmq.store.CommitLog.CommitRealTimeService), which only updates it periodically and does not consider whether the data has actually been flushed to disk. The flush actually happens in another thread, and there is no synchronization between the two. So even with the SYNC_MASTER and SYNC_FLUSH settings, nothing guarantees that the reput service waits for the disk flush and slave replication.
   > I wonder whether I missed something or whether this is how RocketMQ is actually designed.
   
   Update the broker config. You can see these settings in the broker configuration.
   They don't wait, because only when these two places are OK is the message sent successfully. You should look at the entire message-sending process, not just this one place.
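
The producer-side guarantee described in this reply can be sketched with two futures that are combined before the send is acknowledged. This is a minimal illustration under stated assumptions rather than the broker's actual code: the class `ProducerAckSketch`, the method `producerAck`, and the local `Status` enum are hypothetical names introduced for the example. Note that it covers only the acknowledgement to the producer, not when a consumer can see the message.

```java
import java.util.concurrent.CompletableFuture;

/** Sketch: acknowledge the producer only after both flush and replication report success. */
class ProducerAckSketch {

    /** Illustrative status values, declared locally for this example. */
    enum Status { PUT_OK, FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT }

    /** Combines the flush and replication results; the first failure wins, otherwise PUT_OK. */
    public static CompletableFuture<Status> producerAck(CompletableFuture<Status> flushResult,
                                                        CompletableFuture<Status> replicaResult) {
        return flushResult.thenCombine(replicaResult, (flush, replica) -> {
            if (flush != Status.PUT_OK) {
                return flush;
            }
            if (replica != Status.PUT_OK) {
                return replica;
            }
            return Status.PUT_OK;
        });
    }

    public static void main(String[] args) {
        CompletableFuture<Status> flushResult = new CompletableFuture<>();
        CompletableFuture<Status> replicaResult = new CompletableFuture<>();
        CompletableFuture<Status> acked = producerAck(flushResult, replicaResult);

        flushResult.complete(Status.PUT_OK);
        System.out.println("acked after flush only: " + acked.isDone());

        replicaResult.complete(Status.PUT_OK);
        System.out.println("acked after flush and replication: " + acked.isDone());
        System.out.println("final status: " + acked.join());
    }
}
```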
   





[GitHub] [rocketmq] panzhi33 edited a comment on issue #2830: (Discussion for enhancement)Message transfer to consumerQueue(Using reputService) should happen after the flush and slave replication operation in SYNC_MASTER and SYNC_FLUSH settings

Posted by GitBox <gi...@apache.org>.
panzhi33 edited a comment on issue #2830:
URL: https://github.com/apache/rocketmq/issues/2830#issuecomment-826876711


   > So, do you think it would be useful to add a configuration option and the related code for this issue, so that this case can be avoided? Kafka provides a guarantee like this.
   
   I think you can give it a try, and I'll study it in the meantime





[GitHub] [rocketmq] saharaheart edited a comment on issue #2830: The visibility of message when in consumerQueue (Question)

Posted by GitBox <gi...@apache.org>.
saharaheart edited a comment on issue #2830:
URL: https://github.com/apache/rocketmq/issues/2830#issuecomment-825309705


   > ![image](https://user-images.githubusercontent.com/29321745/115662781-104ccd00-a372-11eb-9d24-48f47c95010c.png)
   > org.apache.rocketmq.store.CommitLog#asyncPutMessages
   > I could not find the code you mentioned.
   
   The first place to look is org/apache/rocketmq/store/CommitLog#putMessage. The message is appended at point A (annotated in the code below), while the disk flush and the HA wait happen at point B.
   However, the ReputMessageService (org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService) performs the reput operation (reading messages from the commitLog and putting them into the consumerQueue) in another thread. If this service is fast enough, some messages appended at point A may already have been pushed to the consumerQueue, where consumers can see them, before they have been flushed or replicated to the slaves. If something breaks at that moment, is it possible for a message to be consumed but never recorded in the broker?
   
   
   ```
   public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
           //.............
           // Point A: the message is appended to the mapped file below
   
           MappedFile unlockMappedFile = null;
           MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
   
           putMessageLock.lock(); // spin lock or ReentrantLock, depending on store config
           try {
               long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
               this.beginTimeInLock = beginLockTimestamp;
   
               // Here settings are stored timestamp, in order to ensure an orderly
               // global
               msg.setStoreTimestamp(beginLockTimestamp);
   
               if (null == mappedFile || mappedFile.isFull()) {
                   mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
               }
               if (null == mappedFile) {
                   log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                   beginTimeInLock = 0;
                   return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
               }
   
               result = mappedFile.appendMessage(msg, this.appendMessageCallback);
               
          ..............
   
           if (elapsedTimeInLock > 500) {
               log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, result);
           }
   
           if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
               this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
           }
   
           PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
   
           // Statistics
           storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
           storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
   
           // Point B: the disk flush and HA replication are handled (and waited on) here
           handleDiskFlush(result, putMessageResult, msg);
           handleHA(result, putMessageResult, msg);
   
           return putMessageResult;
       }
   ```
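
To make the window between point A and point B concrete, here is a minimal toy model in Java. It is only a sketch of the concern raised above, not RocketMQ code: the class `VisibilityWindowSketch` and all of its fields and methods are hypothetical names, and the "threads" are simulated by calling the steps in a fixed order.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Toy model of the offsets discussed above. All names are hypothetical, not RocketMQ's. */
public class VisibilityWindowSketch {
    // Highest offset appended to the in-memory mapped file (advanced at point A).
    private final AtomicLong writtenOffset = new AtomicLong();
    // Highest offset known to be on disk (advanced by the flush at point B).
    private final AtomicLong flushedOffset = new AtomicLong();
    // Highest offset already handed to the consume queue by the dispatcher.
    private final AtomicLong dispatchedOffset = new AtomicLong();

    /** Point A: append a message of the given size and return the new written offset. */
    public long append(int bytes) {
        return writtenOffset.addAndGet(bytes);
    }

    /** Point B: pretend everything written so far has reached the disk. */
    public void flush() {
        flushedOffset.set(writtenOffset.get());
    }

    /** Dispatcher step that is bounded only by the written offset, as the question assumes. */
    public void dispatch() {
        dispatchedOffset.set(writtenOffset.get());
    }

    /** Bytes that consumers could already see although they are not yet on disk. */
    public long visibleButUnflushedBytes() {
        return Math.max(0L, dispatchedOffset.get() - flushedOffset.get());
    }

    public static void main(String[] args) {
        VisibilityWindowSketch log = new VisibilityWindowSketch();
        log.append(100);   // producer thread reaches point A
        log.dispatch();    // dispatcher runs before point B
        System.out.println("visible but unflushed: " + log.visibleButUnflushedBytes());
        log.flush();       // producer thread reaches point B
        System.out.println("after flush: " + log.visibleButUnflushedBytes());
    }
}
```

If the dispatcher step runs between the append and the flush, the model reports a non-empty range of bytes that is visible to consumers but not yet durable, which is exactly the window the question is about.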
   





[GitHub] [rocketmq] panzhi33 commented on issue #2830: The visibility of message when in consumerQueue (Question)

Posted by GitBox <gi...@apache.org>.
panzhi33 commented on issue #2830:
URL: https://github.com/apache/rocketmq/issues/2830#issuecomment-825346734


   If the brokerRole is SYNC_MASTER and the flushDiskType is SYNC_FLUSH, the situation you describe will not happen.
   With other settings, something like what you describe may happen.





[GitHub] [rocketmq] saharaheart commented on issue #2830: The visibility of message when in consumerQueue (Question)

Posted by GitBox <gi...@apache.org>.
saharaheart commented on issue #2830:
URL: https://github.com/apache/rocketmq/issues/2830#issuecomment-826005873


   Is there any code that enforces the SYNC_MASTER and SYNC_FLUSH behavior? I rechecked, and the ReputMessageService (org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService) relies on the commitedOffset to transfer the commitlog to the consumerQueue. This can be traced from ReputMessageService#doReput(): the call `SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);` in doReput() ultimately uses the commitedOffset as the bound for the reput. However, the commitedOffset is updated by the CommitRealTimeService (org.apache.rocketmq.store.CommitLog.CommitRealTimeService), which only updates it periodically and does not consider whether the data has actually been flushed to disk. The flush itself happens in another thread, and there is no synchronization between the two. So even with the SYNC_MASTER and SYNC_FLUSH settings, nothing guarantees that the reput service waits for the disk flush and the slave replication.
   I wonder whether I am missing something, or whether this is how RocketMQ is actually designed.
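
One way to picture the guarantee being asked for is a dispatch bound that never runs ahead of durability. The Java sketch below is purely illustrative and is not how RocketMQ computes its reput bound: `DurableDispatchBound`, `safeBound`, and all parameter names are hypothetical, and the two flags merely stand in for the SYNC_FLUSH and SYNC_MASTER settings.

```java
/** Hypothetical helper: caps the dispatch bound by what is durable under the configured mode. */
public final class DurableDispatchBound {
    private DurableDispatchBound() {
    }

    /**
     * @param writtenOffset    highest offset appended to the commit log
     * @param flushedOffset    highest offset known to be flushed to disk
     * @param replicatedOffset highest offset acknowledged by the slave
     * @param syncFlush        true if the broker is configured for synchronous flush
     * @param syncMaster       true if the broker is configured for synchronous replication
     * @return the highest offset that may be dispatched to the consume queue
     */
    public static long safeBound(long writtenOffset, long flushedOffset, long replicatedOffset,
                                 boolean syncFlush, boolean syncMaster) {
        long bound = writtenOffset;
        if (syncFlush) {
            // Do not expose anything that is not yet on disk.
            bound = Math.min(bound, flushedOffset);
        }
        if (syncMaster) {
            // Do not expose anything the slave has not acknowledged.
            bound = Math.min(bound, replicatedOffset);
        }
        return bound;
    }

    public static void main(String[] args) {
        // Written up to 300, flushed up to 200, replicated up to 100.
        System.out.println(safeBound(300, 200, 100, true, true));
        System.out.println(safeBound(300, 200, 100, true, false));
        System.out.println(safeBound(300, 200, 100, false, false));
    }
}
```

With a bound like this, the asynchronous modes behave as before (the bound equals the written offset), while the synchronous modes would hold consumers back until the flush or the replication catches up, at the cost of added delivery latency.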





[GitHub] [rocketmq] panzhi33 edited a comment on issue #2830: Message transfer to consumerQueue(Using reputService) should happen after the flush and slave replication operation in SYNC_MASTER and SYNC_FLUSH settings

Posted by GitBox <gi...@apache.org>.
panzhi33 edited a comment on issue #2830:
URL: https://github.com/apache/rocketmq/issues/2830#issuecomment-826283817


   > Hmm, what do you mean by "only these two places are ok, the message is send successfully"? Does the broker send to the producer or to the consumer?
   > I know the ack will be sent to the producer when these two places are ready. But when the consumer pulls the message from the broker, I cannot find the code that guarantees those two places are ready.
   
   You are right, sorry. I reviewed the relevant code: it is possible for a message to be consumed before it has been flushed to disk and replicated to the standby node. The message will still be flushed and synchronized to the standby node eventually.
   

