You are viewing a plain-text version of this content. The hyperlink to its canonical version (originally attached to the word "here") was lost in the plain-text conversion.
Posted to commits@rocketmq.apache.org by hu...@apache.org on 2022/02/17 06:00:54 UTC

[rocketmq] branch develop updated: [ISSUE #3827]Improve Performance of transactional message and schedule message (#3828)

This is an automated email from the ASF dual-hosted git repository.

huangli pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 7428d65  [ISSUE #3827]Improve Performance of transactional message and schedule message (#3828)
7428d65 is described below

commit 7428d6520b9829051370badd00a7c4b1e8bf1e4a
Author: 写代码的猫 <33...@users.noreply.github.com>
AuthorDate: Thu Feb 17 14:00:21 2022 +0800

    [ISSUE #3827]Improve Performance of transactional message and schedule message (#3828)
    
    * [ISSUE #3827] Part 1. Add Slave Timeout config
    * [ISSUE #3827] Part 2. Improve Performance of Transactional Message and Delay Message Scheduling
    
    Co-authored-by: huangli <ar...@gmail.com>
---
 .../java/org/apache/rocketmq/store/CommitLog.java  | 29 ++++----
 .../apache/rocketmq/store/DefaultMessageStore.java | 23 ++++--
 .../apache/rocketmq/store/FlushDiskWatcher.java    | 78 ++++++++++++++++++++
 .../rocketmq/store/config/MessageStoreConfig.java  |  9 +++
 .../org/apache/rocketmq/store/ha/HAService.java    |  9 +--
 .../rocketmq/store/FlushDiskWatcherTest.java       | 84 ++++++++++++++++++++++
 6 files changed, 203 insertions(+), 29 deletions(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index 1af5c58..bf66af5 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -79,6 +79,7 @@ public class CommitLog {
     private volatile Set<String> fullStorePaths = Collections.emptySet();
 
     private final MultiDispatch multiDispatch;
+    private final FlushDiskWatcher flushDiskWatcher;
 
     public CommitLog(final DefaultMessageStore defaultMessageStore) {
         String storePath = defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog();
@@ -113,6 +114,7 @@ public class CommitLog {
 
         this.multiDispatch = new MultiDispatch(defaultMessageStore, this);
 
+        flushDiskWatcher = new FlushDiskWatcher();
     }
 
     public void setFullStorePaths(Set<String> fullStorePaths) {
@@ -136,6 +138,10 @@ public class CommitLog {
     public void start() {
         this.flushCommitLogService.start();
 
+        flushDiskWatcher.setDaemon(true);
+        flushDiskWatcher.start();
+
+
         if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
             this.commitLogService.start();
         }
@@ -147,6 +153,8 @@ public class CommitLog {
         }
 
         this.flushCommitLogService.shutdown();
+
+        flushDiskWatcher.shutdown(true);
     }
 
     public long flush() {
@@ -722,10 +730,6 @@ public class CommitLog {
             }
             if (replicaStatus != PutMessageStatus.PUT_OK) {
                 putMessageResult.setPutMessageStatus(replicaStatus);
-                if (replicaStatus == PutMessageStatus.FLUSH_SLAVE_TIMEOUT) {
-                    log.error("do sync transfer other node, wait return, but failed, topic: {} tags: {} client address: {}",
-                            msg.getTopic(), msg.getTags(), msg.getBornHostNameString());
-                }
             }
             return putMessageResult;
         });
@@ -835,10 +839,6 @@ public class CommitLog {
             }
             if (replicaStatus != PutMessageStatus.PUT_OK) {
                 putMessageResult.setPutMessageStatus(replicaStatus);
-                if (replicaStatus == PutMessageStatus.FLUSH_SLAVE_TIMEOUT) {
-                    log.error("do sync transfer other node, wait return, but failed, topic: {} client address: {}",
-                            messageExtBatch.getTopic(), messageExtBatch.getBornHostNameString());
-                }
             }
             return putMessageResult;
         });
@@ -852,6 +852,7 @@ public class CommitLog {
             if (messageExt.isWaitStoreMsgOK()) {
                 GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
                         this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
+                flushDiskWatcher.add(request);
                 service.putRequest(request);
                 return request.future();
             } else {
@@ -876,7 +877,7 @@ public class CommitLog {
             if (messageExt.isWaitStoreMsgOK()) {
                 if (service.isSlaveOK(result.getWroteBytes() + result.getWroteOffset())) {
                     GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
-                            this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
+                            this.defaultMessageStore.getMessageStoreConfig().getSlaveTimeout());
                     service.putRequest(request);
                     service.getWaitNotifyObject().wakeupAll();
                     return request.future();
@@ -1142,19 +1143,17 @@ public class CommitLog {
     public static class GroupCommitRequest {
         private final long nextOffset;
         private CompletableFuture<PutMessageStatus> flushOKFuture = new CompletableFuture<>();
-        private final long startTimestamp = System.currentTimeMillis();
-        private long timeoutMillis = Long.MAX_VALUE;
+        private final long deadLine;
 
         public GroupCommitRequest(long nextOffset, long timeoutMillis) {
             this.nextOffset = nextOffset;
-            this.timeoutMillis = timeoutMillis;
+            this.deadLine = System.nanoTime() + (timeoutMillis * 1_000_000);
         }
 
-        public GroupCommitRequest(long nextOffset) {
-            this.nextOffset = nextOffset;
+        public long getDeadLine() {
+            return deadLine;
         }
 
-
         public long getNextOffset() {
             return nextOffset;
         }
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index bfc9fd2..f11d5f3 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -39,6 +39,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -509,18 +510,26 @@ public class DefaultMessageStore implements MessageStore {
 
     @Override
     public PutMessageResult putMessage(MessageExtBrokerInner msg) {
-        try {
-            return asyncPutMessage(msg).get();
-        } catch (InterruptedException | ExecutionException e) {
-            return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, null);
-        }
+        return waitForPutResult(asyncPutMessage(msg));
     }
 
     @Override
     public PutMessageResult putMessages(MessageExtBatch messageExtBatch) {
+        return waitForPutResult(asyncPutMessages(messageExtBatch));
+    }
+
+    private PutMessageResult waitForPutResult(CompletableFuture<PutMessageResult> putMessageResultFuture) {
         try {
-            return asyncPutMessages(messageExtBatch).get();
-        } catch (InterruptedException | ExecutionException e) {
+            int putMessageTimeout =
+                    Math.max(this.messageStoreConfig.getSyncFlushTimeout(),
+                            this.messageStoreConfig.getSlaveTimeout()) + 5000;
+            return putMessageResultFuture.get(putMessageTimeout, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException | InterruptedException e) {
+            return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, null);
+        } catch (TimeoutException e) {
+            log.error("usually it will never timeout, putMessageTimeout is much bigger than slaveTimeout and "
+                    + "flushTimeout so the result can be got anyway, but in some situations timeout will happen like full gc "
+                    + "process hangs or other unexpected situations.");
             return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, null);
         }
     }
diff --git a/store/src/main/java/org/apache/rocketmq/store/FlushDiskWatcher.java b/store/src/main/java/org/apache/rocketmq/store/FlushDiskWatcher.java
new file mode 100644
index 0000000..980a496
--- /dev/null
+++ b/store/src/main/java/org/apache/rocketmq/store/FlushDiskWatcher.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store;
+
+
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.store.CommitLog.GroupCommitRequest;
+
+public class FlushDiskWatcher extends ServiceThread {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+    private final LinkedBlockingQueue<GroupCommitRequest> commitRequests = new LinkedBlockingQueue<>();
+
+    @Override
+    public String getServiceName() {
+        return FlushDiskWatcher.class.getSimpleName();
+    }
+
+    @Override
+    public void run() {
+        while (!isStopped()) {
+            GroupCommitRequest request = null;
+            try {
+                request = commitRequests.take();
+            } catch (InterruptedException e) {
+                log.warn("take flush disk commit request, but interrupted, this may caused by shutdown");
+                continue;
+            }
+            while (!request.future().isDone()) {
+                long now = System.nanoTime();
+                if (now - request.getDeadLine() >= 0) {
+                    request.wakeupCustomer(PutMessageStatus.FLUSH_DISK_TIMEOUT);
+                    break;
+                }
+                // To avoid frequent thread switching, replace future.get with sleep here,
+                long sleepTime = (request.getDeadLine() - now) / 1_000_000;
+                sleepTime = Math.min(10, sleepTime);
+                if (sleepTime == 0) {
+                    request.wakeupCustomer(PutMessageStatus.FLUSH_DISK_TIMEOUT);
+                    break;
+                }
+                try {
+                    Thread.sleep(sleepTime);
+                } catch (InterruptedException e) {
+                    log.warn(
+                            "An exception occurred while waiting for flushing disk to complete. this may caused by shutdown");
+                    break;
+                }
+            }
+        }
+    }
+
+    public void add(GroupCommitRequest request) {
+        commitRequests.add(request);
+    }
+
+    public int queueSize() {
+        return commitRequests.size();
+    }
+}
diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index bb1e01f..45293e6 100644
--- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -131,6 +131,7 @@ public class MessageStoreConfig {
     @ImportantField
     private FlushDiskType flushDiskType = FlushDiskType.ASYNC_FLUSH;
     private int syncFlushTimeout = 1000 * 5;
+    private int slaveTimeout = 3000;
     private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
     private long flushDelayOffsetInterval = 1000 * 10;
     @ImportantField
@@ -547,6 +548,14 @@ public class MessageStoreConfig {
         this.syncFlushTimeout = syncFlushTimeout;
     }
 
+    public int getSlaveTimeout() {
+        return slaveTimeout;
+    }
+
+    public void setSlaveTimeout(int slaveTimeout) {
+        this.slaveTimeout = slaveTimeout;
+    }
+
     public String getHaMasterAddress() {
         return haMasterAddress;
     }
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java b/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
index 845935b..c9c05b3 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
@@ -287,17 +287,12 @@ public class HAService {
             if (!this.requestsRead.isEmpty()) {
                 for (CommitLog.GroupCommitRequest req : this.requestsRead) {
                     boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
-                    long waitUntilWhen = HAService.this.defaultMessageStore.getSystemClock().now()
-                            + HAService.this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout();
-                    while (!transferOK && HAService.this.defaultMessageStore.getSystemClock().now() < waitUntilWhen) {
+                    long deadLine = req.getDeadLine();
+                    while (!transferOK && deadLine - System.nanoTime() > 0) {
                         this.notifyTransferObject.waitForRunning(1000);
                         transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
                     }
 
-                    if (!transferOK) {
-                        log.warn("transfer messsage to slave timeout, " + req.getNextOffset());
-                    }
-
                     req.wakeupCustomer(transferOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
                 }
 
diff --git a/store/src/test/java/org/apache/rocketmq/store/FlushDiskWatcherTest.java b/store/src/test/java/org/apache/rocketmq/store/FlushDiskWatcherTest.java
new file mode 100644
index 0000000..60f392d
--- /dev/null
+++ b/store/src/test/java/org/apache/rocketmq/store/FlushDiskWatcherTest.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.rocketmq.store.CommitLog.GroupCommitRequest;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class FlushDiskWatcherTest {
+
+    private final long timeoutMill = 5000;
+
+    @Test
+    public void testTimeout() throws Exception {
+        FlushDiskWatcher flushDiskWatcher = new FlushDiskWatcher();
+        flushDiskWatcher.setDaemon(true);
+        flushDiskWatcher.start();
+
+        int count = 100;
+        List<GroupCommitRequest> requestList = new LinkedList<>();
+        for (int i = 0; i < count; i++) {
+            GroupCommitRequest groupCommitRequest =
+                    new GroupCommitRequest(0, timeoutMill);
+            requestList.add(groupCommitRequest);
+            flushDiskWatcher.add(groupCommitRequest);
+        }
+        Thread.sleep(2 * timeoutMill);
+
+        for (GroupCommitRequest request : requestList) {
+            request.wakeupCustomer(PutMessageStatus.PUT_OK);
+        }
+
+        for (GroupCommitRequest request : requestList) {
+            Assert.assertTrue(request.future().isDone());
+            Assert.assertEquals(request.future().get(), PutMessageStatus.FLUSH_DISK_TIMEOUT);
+        }
+        Assert.assertEquals(flushDiskWatcher.queueSize(), 0);
+        flushDiskWatcher.shutdown();
+    }
+
+    @Test
+    public void testWatcher() throws Exception {
+        FlushDiskWatcher flushDiskWatcher = new FlushDiskWatcher();
+        flushDiskWatcher.setDaemon(true);
+        flushDiskWatcher.start();
+
+        int count = 100;
+        List<GroupCommitRequest> requestList = new LinkedList<>();
+        for (int i = 0; i < count; i++) {
+            GroupCommitRequest groupCommitRequest =
+                    new GroupCommitRequest(0, timeoutMill);
+            requestList.add(groupCommitRequest);
+            flushDiskWatcher.add(groupCommitRequest);
+            groupCommitRequest.wakeupCustomer(PutMessageStatus.PUT_OK);
+        }
+        Thread.sleep((timeoutMill << 20) / 1000000);
+        for (GroupCommitRequest request : requestList) {
+            Assert.assertTrue(request.future().isDone());
+            Assert.assertEquals(request.future().get(), PutMessageStatus.PUT_OK);
+        }
+        Assert.assertEquals(flushDiskWatcher.queueSize(), 0);
+        flushDiskWatcher.shutdown();
+    }
+
+
+}