You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2023/02/14 05:51:03 UTC
[rocketmq] 08/18: Add try catch
This is an automated email from the ASF dual-hosted git repository.
zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 65e3d1e4c0d7dab8fd3a6eb5bd316c2da3b43ff0
Author: nowinkey <no...@tom.com>
AuthorDate: Wed Feb 8 21:08:58 2023 +0800
Add try catch
---
.../apache/rocketmq/store/DefaultMessageStore.java | 28 ++++++++++++----------
1 file changed, 16 insertions(+), 12 deletions(-)
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 48146c638..460cb9f1e 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -2855,20 +2855,24 @@ public class DefaultMessageStore implements MessageStore {
if (!batchDispatchRequestQueue.isEmpty()) {
BatchDispatchRequest task = batchDispatchRequestQueue.peek();
batchDispatchRequestExecutor.execute(() -> {
- ByteBuffer tmpByteBuffer = task.byteBuffer.duplicate();
- tmpByteBuffer.position(task.position);
- tmpByteBuffer.limit(task.position + task.size);
- List<DispatchRequest> dispatchRequestList = new ArrayList<>();
- while (tmpByteBuffer.hasRemaining()) {
- DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(tmpByteBuffer, false, false, false);
- if (dispatchRequest.isSuccess()) {
- dispatchRequestList.add(dispatchRequest);
- } else {
- LOGGER.error("[BUG]read total count not equals msg total size.");
+ try {
+ ByteBuffer tmpByteBuffer = task.byteBuffer.duplicate();
+ tmpByteBuffer.position(task.position);
+ tmpByteBuffer.limit(task.position + task.size);
+ List<DispatchRequest> dispatchRequestList = new ArrayList<>();
+ while (tmpByteBuffer.hasRemaining()) {
+ DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(tmpByteBuffer, false, false, false);
+ if (dispatchRequest.isSuccess()) {
+ dispatchRequestList.add(dispatchRequest);
+ } else {
+ LOGGER.error("[BUG]read total count not equals msg total size.");
+ }
}
+ dispatchRequestOrderlyQueue.put(task.id, dispatchRequestList.toArray(new DispatchRequest[dispatchRequestList.size()]));
+ mappedPageHoldCount.getAndDecrement();
+ } catch (Exception e) {
+ LOGGER.error("There is an exception in task execution.", e);
}
- dispatchRequestOrderlyQueue.put(task.id, dispatchRequestList.toArray(new DispatchRequest[dispatchRequestList.size()]));
- mappedPageHoldCount.getAndDecrement();
});
batchDispatchRequestQueue.poll();
}