You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2023/02/14 05:51:03 UTC

[rocketmq] 08/18: Add try-catch

This is an automated email from the ASF dual-hosted git repository.

zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 65e3d1e4c0d7dab8fd3a6eb5bd316c2da3b43ff0
Author: nowinkey <no...@tom.com>
AuthorDate: Wed Feb 8 21:08:58 2023 +0800

    Add try-catch
---
 .../apache/rocketmq/store/DefaultMessageStore.java | 28 ++++++++++++----------
 1 file changed, 16 insertions(+), 12 deletions(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 48146c638..460cb9f1e 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -2855,20 +2855,24 @@ public class DefaultMessageStore implements MessageStore {
                 if (!batchDispatchRequestQueue.isEmpty()) {
                     BatchDispatchRequest task = batchDispatchRequestQueue.peek();
                     batchDispatchRequestExecutor.execute(() -> {
-                        ByteBuffer tmpByteBuffer = task.byteBuffer.duplicate();
-                        tmpByteBuffer.position(task.position);
-                        tmpByteBuffer.limit(task.position + task.size);
-                        List<DispatchRequest> dispatchRequestList = new ArrayList<>();
-                        while (tmpByteBuffer.hasRemaining()) {
-                            DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(tmpByteBuffer, false, false, false);
-                            if (dispatchRequest.isSuccess()) {
-                                dispatchRequestList.add(dispatchRequest);
-                            } else {
-                                LOGGER.error("[BUG]read total count not equals msg total size.");
+                        try {
+                            ByteBuffer tmpByteBuffer = task.byteBuffer.duplicate();
+                            tmpByteBuffer.position(task.position);
+                            tmpByteBuffer.limit(task.position + task.size);
+                            List<DispatchRequest> dispatchRequestList = new ArrayList<>();
+                            while (tmpByteBuffer.hasRemaining()) {
+                                DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(tmpByteBuffer, false, false, false);
+                                if (dispatchRequest.isSuccess()) {
+                                    dispatchRequestList.add(dispatchRequest);
+                                } else {
+                                    LOGGER.error("[BUG]read total count not equals msg total size.");
+                                }
                             }
+                            dispatchRequestOrderlyQueue.put(task.id, dispatchRequestList.toArray(new DispatchRequest[dispatchRequestList.size()]));
+                            mappedPageHoldCount.getAndDecrement();
+                        } catch (Exception e) {
+                            LOGGER.error("There is an exception in task execution.", e);
                         }
-                        dispatchRequestOrderlyQueue.put(task.id, dispatchRequestList.toArray(new DispatchRequest[dispatchRequestList.size()]));
-                        mappedPageHoldCount.getAndDecrement();
                     });
                     batchDispatchRequestQueue.poll();
                 }