You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2020/12/04 07:43:38 UTC

[rocketmq] branch develop updated: [ISSUE #1904] Print log when flush timeout (#1903)

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new bb9f106  [ISSUE #1904] Print log when flush timeout (#1903)
bb9f106 is described below

commit bb9f106e31c01e7b38f0c0f130d211da5a4f31cb
Author: rushsky518 <ru...@163.com>
AuthorDate: Fri Dec 4 15:43:29 2020 +0800

    [ISSUE #1904] Print log when flush timeout (#1903)
    
    * rollback my code
    
    * avoid log when disk flush
    
    * fix flushOK, whose value could be wrong when the message is in the next file
---
 .../java/org/apache/rocketmq/store/CommitLog.java  | 34 ++++++++++++----------
 1 file changed, 18 insertions(+), 16 deletions(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index d489e84..cce6481 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -661,14 +661,18 @@ public class CommitLog {
         storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
         storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
 
-        CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, putMessageResult, msg);
-        CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, putMessageResult, msg);
+        CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
+        CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);
         return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
             if (flushStatus != PutMessageStatus.PUT_OK) {
-                putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
+                putMessageResult.setPutMessageStatus(flushStatus);
             }
             if (replicaStatus != PutMessageStatus.PUT_OK) {
                 putMessageResult.setPutMessageStatus(replicaStatus);
+                if (replicaStatus == PutMessageStatus.FLUSH_SLAVE_TIMEOUT) {
+                    log.error("do sync transfer other node, wait return, but failed, topic: {} tags: {} client address: {}",
+                            msg.getTopic(), msg.getTags(), msg.getBornHostNameString());
+                }
             }
             return putMessageResult;
         });
@@ -762,15 +766,18 @@ public class CommitLog {
         storeStatsService.getSinglePutMessageTopicTimesTotal(messageExtBatch.getTopic()).addAndGet(result.getMsgNum());
         storeStatsService.getSinglePutMessageTopicSizeTotal(messageExtBatch.getTopic()).addAndGet(result.getWroteBytes());
 
-        CompletableFuture<PutMessageStatus> flushOKFuture = submitFlushRequest(result, putMessageResult, messageExtBatch);
-        CompletableFuture<PutMessageStatus> replicaOKFuture = submitReplicaRequest(result, putMessageResult, messageExtBatch);
+        CompletableFuture<PutMessageStatus> flushOKFuture = submitFlushRequest(result, messageExtBatch);
+        CompletableFuture<PutMessageStatus> replicaOKFuture = submitReplicaRequest(result, messageExtBatch);
         return flushOKFuture.thenCombine(replicaOKFuture, (flushStatus, replicaStatus) -> {
             if (flushStatus != PutMessageStatus.PUT_OK) {
-                putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
+                putMessageResult.setPutMessageStatus(flushStatus);
             }
-
             if (replicaStatus != PutMessageStatus.PUT_OK) {
                 putMessageResult.setPutMessageStatus(replicaStatus);
+                if (replicaStatus == PutMessageStatus.FLUSH_SLAVE_TIMEOUT) {
+                    log.error("do sync transfer other node, wait return, but failed, topic: {} client address: {}",
+                            messageExtBatch.getTopic(), messageExtBatch.getBornHostNameString());
+                }
             }
             return putMessageResult;
         });
@@ -900,8 +907,7 @@ public class CommitLog {
         return putMessageResult;
     }
 
-    public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, PutMessageResult putMessageResult,
-                                                                  MessageExt messageExt) {
+    public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, MessageExt messageExt) {
         // Synchronization flush
         if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
             final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
@@ -926,8 +932,7 @@ public class CommitLog {
         }
     }
 
-    public CompletableFuture<PutMessageStatus> submitReplicaRequest(AppendMessageResult result, PutMessageResult putMessageResult,
-                                                        MessageExt messageExt) {
+    public CompletableFuture<PutMessageStatus> submitReplicaRequest(AppendMessageResult result, MessageExt messageExt) {
         if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
             HAService service = this.defaultMessageStore.getHaService();
             if (messageExt.isWaitStoreMsgOK()) {
@@ -1420,13 +1425,10 @@ public class CommitLog {
                     for (GroupCommitRequest req : this.requestsRead) {
                         // There may be a message in the next file, so a maximum of
                         // two times the flush
-                        boolean flushOK = false;
+                        boolean flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
                         for (int i = 0; i < 2 && !flushOK; i++) {
+                            CommitLog.this.mappedFileQueue.flush(0);
                             flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
-
-                            if (!flushOK) {
-                                CommitLog.this.mappedFileQueue.flush(0);
-                            }
                         }
 
                         req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT);