You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by hu...@apache.org on 2022/02/17 06:00:54 UTC
[rocketmq] branch develop updated: [ISSUE #3827]Improve Performance of transactional message and schedule message (#3828)
This is an automated email from the ASF dual-hosted git repository.
huangli pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 7428d65 [ISSUE #3827]Improve Performance of transactional message and schedule message (#3828)
7428d65 is described below
commit 7428d6520b9829051370badd00a7c4b1e8bf1e4a
Author: 写代码的猫 <33...@users.noreply.github.com>
AuthorDate: Thu Feb 17 14:00:21 2022 +0800
[ISSUE #3827]Improve Performance of transactional message and schedule message (#3828)
* [ISSUE #3827] Part 1. Add Slave Timeout config
* [ISSUE #3827] Part 2. Improve Performance of Transactional Message and Delay Message Scheduling
Co-authored-by: huangli <ar...@gmail.com>
---
.../java/org/apache/rocketmq/store/CommitLog.java | 29 ++++----
.../apache/rocketmq/store/DefaultMessageStore.java | 23 ++++--
.../apache/rocketmq/store/FlushDiskWatcher.java | 78 ++++++++++++++++++++
.../rocketmq/store/config/MessageStoreConfig.java | 9 +++
.../org/apache/rocketmq/store/ha/HAService.java | 9 +--
.../rocketmq/store/FlushDiskWatcherTest.java | 84 ++++++++++++++++++++++
6 files changed, 203 insertions(+), 29 deletions(-)
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index 1af5c58..bf66af5 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -79,6 +79,7 @@ public class CommitLog {
private volatile Set<String> fullStorePaths = Collections.emptySet();
private final MultiDispatch multiDispatch;
+ private final FlushDiskWatcher flushDiskWatcher;
public CommitLog(final DefaultMessageStore defaultMessageStore) {
String storePath = defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog();
@@ -113,6 +114,7 @@ public class CommitLog {
this.multiDispatch = new MultiDispatch(defaultMessageStore, this);
+ flushDiskWatcher = new FlushDiskWatcher();
}
public void setFullStorePaths(Set<String> fullStorePaths) {
@@ -136,6 +138,10 @@ public class CommitLog {
public void start() {
this.flushCommitLogService.start();
+ flushDiskWatcher.setDaemon(true);
+ flushDiskWatcher.start();
+
+
if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
this.commitLogService.start();
}
@@ -147,6 +153,8 @@ public class CommitLog {
}
this.flushCommitLogService.shutdown();
+
+ flushDiskWatcher.shutdown(true);
}
public long flush() {
@@ -722,10 +730,6 @@ public class CommitLog {
}
if (replicaStatus != PutMessageStatus.PUT_OK) {
putMessageResult.setPutMessageStatus(replicaStatus);
- if (replicaStatus == PutMessageStatus.FLUSH_SLAVE_TIMEOUT) {
- log.error("do sync transfer other node, wait return, but failed, topic: {} tags: {} client address: {}",
- msg.getTopic(), msg.getTags(), msg.getBornHostNameString());
- }
}
return putMessageResult;
});
@@ -835,10 +839,6 @@ public class CommitLog {
}
if (replicaStatus != PutMessageStatus.PUT_OK) {
putMessageResult.setPutMessageStatus(replicaStatus);
- if (replicaStatus == PutMessageStatus.FLUSH_SLAVE_TIMEOUT) {
- log.error("do sync transfer other node, wait return, but failed, topic: {} client address: {}",
- messageExtBatch.getTopic(), messageExtBatch.getBornHostNameString());
- }
}
return putMessageResult;
});
@@ -852,6 +852,7 @@ public class CommitLog {
if (messageExt.isWaitStoreMsgOK()) {
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
+ flushDiskWatcher.add(request);
service.putRequest(request);
return request.future();
} else {
@@ -876,7 +877,7 @@ public class CommitLog {
if (messageExt.isWaitStoreMsgOK()) {
if (service.isSlaveOK(result.getWroteBytes() + result.getWroteOffset())) {
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
- this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
+ this.defaultMessageStore.getMessageStoreConfig().getSlaveTimeout());
service.putRequest(request);
service.getWaitNotifyObject().wakeupAll();
return request.future();
@@ -1142,19 +1143,17 @@ public class CommitLog {
public static class GroupCommitRequest {
private final long nextOffset;
private CompletableFuture<PutMessageStatus> flushOKFuture = new CompletableFuture<>();
- private final long startTimestamp = System.currentTimeMillis();
- private long timeoutMillis = Long.MAX_VALUE;
+ private final long deadLine;
public GroupCommitRequest(long nextOffset, long timeoutMillis) {
this.nextOffset = nextOffset;
- this.timeoutMillis = timeoutMillis;
+ this.deadLine = System.nanoTime() + (timeoutMillis * 1_000_000);
}
- public GroupCommitRequest(long nextOffset) {
- this.nextOffset = nextOffset;
+ public long getDeadLine() {
+ return deadLine;
}
-
public long getNextOffset() {
return nextOffset;
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index bfc9fd2..f11d5f3 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -39,6 +39,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -509,18 +510,26 @@ public class DefaultMessageStore implements MessageStore {
@Override
public PutMessageResult putMessage(MessageExtBrokerInner msg) {
- try {
- return asyncPutMessage(msg).get();
- } catch (InterruptedException | ExecutionException e) {
- return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, null);
- }
+ return waitForPutResult(asyncPutMessage(msg));
}
@Override
public PutMessageResult putMessages(MessageExtBatch messageExtBatch) {
+ return waitForPutResult(asyncPutMessages(messageExtBatch));
+ }
+
+ private PutMessageResult waitForPutResult(CompletableFuture<PutMessageResult> putMessageResultFuture) {
try {
- return asyncPutMessages(messageExtBatch).get();
- } catch (InterruptedException | ExecutionException e) {
+ int putMessageTimeout =
+ Math.max(this.messageStoreConfig.getSyncFlushTimeout(),
+ this.messageStoreConfig.getSlaveTimeout()) + 5000;
+ return putMessageResultFuture.get(putMessageTimeout, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException | InterruptedException e) {
+ return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, null);
+ } catch (TimeoutException e) {
+ log.error("usually it will never timeout, putMessageTimeout is much bigger than slaveTimeout and "
+ + "flushTimeout so the result can be got anyway, but in some situations timeout will happen like full gc "
+ + "process hangs or other unexpected situations.");
return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, null);
}
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/FlushDiskWatcher.java b/store/src/main/java/org/apache/rocketmq/store/FlushDiskWatcher.java
new file mode 100644
index 0000000..980a496
--- /dev/null
+++ b/store/src/main/java/org/apache/rocketmq/store/FlushDiskWatcher.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store;
+
+
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.store.CommitLog.GroupCommitRequest;
+
+/**
+ * Background service that enforces the deadline of synchronous disk-flush requests.
+ *
+ * <p>Requests are watched one at a time, in the order they were {@link #add(GroupCommitRequest) added}. A request
+ * whose future is still incomplete once its {@link GroupCommitRequest#getDeadLine() deadline} has passed is
+ * completed with {@link PutMessageStatus#FLUSH_DISK_TIMEOUT}; requests already completed by someone else are left
+ * untouched.
+ *
+ * <p>NOTE(review): because watching is sequential, a request is only inspected after every earlier request is
+ * done. This assumes deadlines are non-decreasing in insertion order (true when all requests share one timeout).
+ */
+public class FlushDiskWatcher extends ServiceThread {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+    private final LinkedBlockingQueue<GroupCommitRequest> commitRequests = new LinkedBlockingQueue<>();
+
+    @Override
+    public String getServiceName() {
+        return FlushDiskWatcher.class.getSimpleName();
+    }
+
+    /**
+     * Takes queued requests until the service is stopped and watches each one until it completes or times out.
+     */
+    @Override
+    public void run() {
+        while (!isStopped()) {
+            GroupCommitRequest request;
+            try {
+                request = commitRequests.take();
+            } catch (InterruptedException e) {
+                log.warn("take flush disk commit request, but interrupted, this may caused by shutdown");
+                continue;
+            }
+            watch(request);
+        }
+    }
+
+    /**
+     * Waits until {@code request} is completed elsewhere or its deadline passes; in the latter case completes it
+     * with {@link PutMessageStatus#FLUSH_DISK_TIMEOUT}. Returns early only when the service is being stopped.
+     */
+    private void watch(GroupCommitRequest request) {
+        while (!request.future().isDone()) {
+            // nanoTime values are only meaningful as differences, so compare via subtraction.
+            long remainingNanos = request.getDeadLine() - System.nanoTime();
+            if (remainingNanos <= 0) {
+                request.wakeupCustomer(PutMessageStatus.FLUSH_DISK_TIMEOUT);
+                return;
+            }
+            // To avoid frequent thread switching, poll with a short sleep instead of blocking on future.get().
+            // The "+ 1" rounds up so a request is never timed out before its deadline; the cap of 10 ms keeps
+            // completions by the flush service from going unnoticed for long.
+            long sleepMillis = Math.min(10, remainingNanos / 1_000_000 + 1);
+            try {
+                Thread.sleep(sleepMillis);
+            } catch (InterruptedException e) {
+                log.warn(
+                    "An exception occurred while waiting for flushing disk to complete. this may caused by shutdown");
+                // Only abandon the request when shutting down; otherwise keep enforcing its deadline.
+                if (isStopped()) {
+                    return;
+                }
+            }
+        }
+    }
+
+    /**
+     * Queues {@code request} so its deadline is enforced by this watcher.
+     *
+     * @param request the flush request to watch
+     */
+    public void add(GroupCommitRequest request) {
+        commitRequests.add(request);
+    }
+
+    /**
+     * Returns the number of requests queued but not yet taken for watching.
+     *
+     * @return current queue length
+     */
+    public int queueSize() {
+        return commitRequests.size();
+    }
+}
diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index bb1e01f..45293e6 100644
--- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -131,6 +131,7 @@ public class MessageStoreConfig {
@ImportantField
private FlushDiskType flushDiskType = FlushDiskType.ASYNC_FLUSH;
private int syncFlushTimeout = 1000 * 5;
+ private int slaveTimeout = 3000;
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
private long flushDelayOffsetInterval = 1000 * 10;
@ImportantField
@@ -547,6 +548,14 @@ public class MessageStoreConfig {
this.syncFlushTimeout = syncFlushTimeout;
}
+    /**
+     * Returns the timeout, in milliseconds, applied to put requests that wait for slave replication.
+     *
+     * @return slave replication timeout in milliseconds
+     */
+    public int getSlaveTimeout() {
+        return this.slaveTimeout;
+    }
+
+    /**
+     * Sets the timeout, in milliseconds, applied to put requests that wait for slave replication.
+     *
+     * @param slaveTimeout new slave replication timeout in milliseconds
+     */
+    public void setSlaveTimeout(int slaveTimeout) {
+        this.slaveTimeout = slaveTimeout;
+    }
+
public String getHaMasterAddress() {
return haMasterAddress;
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java b/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
index 845935b..c9c05b3 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
@@ -287,17 +287,12 @@ public class HAService {
if (!this.requestsRead.isEmpty()) {
for (CommitLog.GroupCommitRequest req : this.requestsRead) {
boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
- long waitUntilWhen = HAService.this.defaultMessageStore.getSystemClock().now()
- + HAService.this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout();
- while (!transferOK && HAService.this.defaultMessageStore.getSystemClock().now() < waitUntilWhen) {
+ long deadLine = req.getDeadLine();
+ while (!transferOK && deadLine - System.nanoTime() > 0) {
this.notifyTransferObject.waitForRunning(1000);
transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
}
- if (!transferOK) {
- log.warn("transfer messsage to slave timeout, " + req.getNextOffset());
- }
-
req.wakeupCustomer(transferOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
}
diff --git a/store/src/test/java/org/apache/rocketmq/store/FlushDiskWatcherTest.java b/store/src/test/java/org/apache/rocketmq/store/FlushDiskWatcherTest.java
new file mode 100644
index 0000000..60f392d
--- /dev/null
+++ b/store/src/test/java/org/apache/rocketmq/store/FlushDiskWatcherTest.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.rocketmq.store.CommitLog.GroupCommitRequest;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class FlushDiskWatcherTest {
+
+    /** Flush timeout given to every request, in milliseconds. */
+    private final long timeoutMill = 5000;
+
+    /**
+     * Requests nobody completes must be completed by the watcher with FLUSH_DISK_TIMEOUT once their deadline
+     * passes, and a later PUT_OK wake-up must not replace that result.
+     */
+    @Test
+    public void testTimeout() throws Exception {
+        FlushDiskWatcher flushDiskWatcher = new FlushDiskWatcher();
+        flushDiskWatcher.setDaemon(true);
+        flushDiskWatcher.start();
+        try {
+            int count = 100;
+            List<GroupCommitRequest> requestList = new LinkedList<>();
+            for (int i = 0; i < count; i++) {
+                GroupCommitRequest groupCommitRequest = new GroupCommitRequest(0, timeoutMill);
+                requestList.add(groupCommitRequest);
+                flushDiskWatcher.add(groupCommitRequest);
+            }
+            // Allow up to twice the timeout, but stop waiting as soon as every request has been timed out.
+            awaitAllDone(requestList, 2 * timeoutMill);
+
+            for (GroupCommitRequest request : requestList) {
+                request.wakeupCustomer(PutMessageStatus.PUT_OK);
+            }
+
+            for (GroupCommitRequest request : requestList) {
+                Assert.assertTrue(request.future().isDone());
+                Assert.assertEquals(PutMessageStatus.FLUSH_DISK_TIMEOUT, request.future().get());
+            }
+            Assert.assertEquals(0, flushDiskWatcher.queueSize());
+        } finally {
+            flushDiskWatcher.shutdown();
+        }
+    }
+
+    /**
+     * Requests completed with PUT_OK before their deadline must keep that result; the watcher only drains them.
+     */
+    @Test
+    public void testWatcher() throws Exception {
+        FlushDiskWatcher flushDiskWatcher = new FlushDiskWatcher();
+        flushDiskWatcher.setDaemon(true);
+        flushDiskWatcher.start();
+        try {
+            int count = 100;
+            List<GroupCommitRequest> requestList = new LinkedList<>();
+            for (int i = 0; i < count; i++) {
+                GroupCommitRequest groupCommitRequest = new GroupCommitRequest(0, timeoutMill);
+                requestList.add(groupCommitRequest);
+                flushDiskWatcher.add(groupCommitRequest);
+                groupCommitRequest.wakeupCustomer(PutMessageStatus.PUT_OK);
+            }
+            // Wait past the requests' deadline so a watcher that overwrote completed requests would be caught.
+            Thread.sleep(timeoutMill + 500);
+            for (GroupCommitRequest request : requestList) {
+                Assert.assertTrue(request.future().isDone());
+                Assert.assertEquals(PutMessageStatus.PUT_OK, request.future().get());
+            }
+            Assert.assertEquals(0, flushDiskWatcher.queueSize());
+        } finally {
+            flushDiskWatcher.shutdown();
+        }
+    }
+
+    /**
+     * Polls until every request's future is done or {@code maxWaitMillis} has elapsed, whichever comes first.
+     */
+    private static void awaitAllDone(List<GroupCommitRequest> requests, long maxWaitMillis)
+        throws InterruptedException {
+        long waitUntil = System.currentTimeMillis() + maxWaitMillis;
+        for (GroupCommitRequest request : requests) {
+            while (!request.future().isDone() && System.currentTimeMillis() < waitUntil) {
+                Thread.sleep(10);
+            }
+        }
+    }
+}