You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2020/12/04 07:43:38 UTC
[rocketmq] branch develop updated: [ISSUE #1904] Print log when
flush timeout (#1903)
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new bb9f106 [ISSUE #1904] Print log when flush timeout (#1903)
bb9f106 is described below
commit bb9f106e31c01e7b38f0c0f130d211da5a4f31cb
Author: rushsky518 <ru...@163.com>
AuthorDate: Fri Dec 4 15:43:29 2020 +0800
[ISSUE #1904] Print log when flush timeout (#1903)
* rollback my code
* avoid log when disk flush
* when msg is in next file, flushOK value may be wrong
---
.../java/org/apache/rocketmq/store/CommitLog.java | 34 ++++++++++++----------
1 file changed, 18 insertions(+), 16 deletions(-)
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index d489e84..cce6481 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -661,14 +661,18 @@ public class CommitLog {
storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
- CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, putMessageResult, msg);
- CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, putMessageResult, msg);
+ CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
+ CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);
return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
if (flushStatus != PutMessageStatus.PUT_OK) {
- putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
+ putMessageResult.setPutMessageStatus(flushStatus);
}
if (replicaStatus != PutMessageStatus.PUT_OK) {
putMessageResult.setPutMessageStatus(replicaStatus);
+ if (replicaStatus == PutMessageStatus.FLUSH_SLAVE_TIMEOUT) {
+ log.error("do sync transfer other node, wait return, but failed, topic: {} tags: {} client address: {}",
+ msg.getTopic(), msg.getTags(), msg.getBornHostNameString());
+ }
}
return putMessageResult;
});
@@ -762,15 +766,18 @@ public class CommitLog {
storeStatsService.getSinglePutMessageTopicTimesTotal(messageExtBatch.getTopic()).addAndGet(result.getMsgNum());
storeStatsService.getSinglePutMessageTopicSizeTotal(messageExtBatch.getTopic()).addAndGet(result.getWroteBytes());
- CompletableFuture<PutMessageStatus> flushOKFuture = submitFlushRequest(result, putMessageResult, messageExtBatch);
- CompletableFuture<PutMessageStatus> replicaOKFuture = submitReplicaRequest(result, putMessageResult, messageExtBatch);
+ CompletableFuture<PutMessageStatus> flushOKFuture = submitFlushRequest(result, messageExtBatch);
+ CompletableFuture<PutMessageStatus> replicaOKFuture = submitReplicaRequest(result, messageExtBatch);
return flushOKFuture.thenCombine(replicaOKFuture, (flushStatus, replicaStatus) -> {
if (flushStatus != PutMessageStatus.PUT_OK) {
- putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
+ putMessageResult.setPutMessageStatus(flushStatus);
}
-
if (replicaStatus != PutMessageStatus.PUT_OK) {
putMessageResult.setPutMessageStatus(replicaStatus);
+ if (replicaStatus == PutMessageStatus.FLUSH_SLAVE_TIMEOUT) {
+ log.error("do sync transfer other node, wait return, but failed, topic: {} client address: {}",
+ messageExtBatch.getTopic(), messageExtBatch.getBornHostNameString());
+ }
}
return putMessageResult;
});
@@ -900,8 +907,7 @@ public class CommitLog {
return putMessageResult;
}
- public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, PutMessageResult putMessageResult,
- MessageExt messageExt) {
+ public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, MessageExt messageExt) {
// Synchronization flush
if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
@@ -926,8 +932,7 @@ public class CommitLog {
}
}
- public CompletableFuture<PutMessageStatus> submitReplicaRequest(AppendMessageResult result, PutMessageResult putMessageResult,
- MessageExt messageExt) {
+ public CompletableFuture<PutMessageStatus> submitReplicaRequest(AppendMessageResult result, MessageExt messageExt) {
if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
HAService service = this.defaultMessageStore.getHaService();
if (messageExt.isWaitStoreMsgOK()) {
@@ -1420,13 +1425,10 @@ public class CommitLog {
for (GroupCommitRequest req : this.requestsRead) {
// There may be a message in the next file, so a maximum of
// two times the flush
- boolean flushOK = false;
+ boolean flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
for (int i = 0; i < 2 && !flushOK; i++) {
+ CommitLog.this.mappedFileQueue.flush(0);
flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
-
- if (!flushOK) {
- CommitLog.this.mappedFileQueue.flush(0);
- }
}
req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT);