You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by GitBox <gi...@apache.org> on 2023/01/17 03:00:04 UTC

[GitHub] [rocketmq] fuyou001 commented on a diff in pull request #5887: [ISSUE #5884] Concurrent check CommitLog messages

fuyou001 commented on code in PR #5887:
URL: https://github.com/apache/rocketmq/pull/5887#discussion_r1071170469


##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2689,6 +2865,97 @@ private void doReput() {
             }
         }
 
+        private void createBatchDispatchRequest(ByteBuffer byteBuffer, int position, int size) {
+            if (position < 0) {
+                return;
+            }
+            mappedPageHoldCount.getAndIncrement();
+            BatchDispatchRequest task = new BatchDispatchRequest(byteBuffer, position, size, batchId++);
+            batchDispatchRequestQueue.offer(task);
+        }
+
+        private void doReputConcurrently() {
+            if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
+                LOGGER.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
+                        this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
+                this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
+            }
+            for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
+
+                SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
+
+                if (result == null) {
+                    break;
+                }
+
+                int batchDispatchRequestStart = -1;
+                int batchDispatchRequestSize = -1;
+                try {
+                    this.reputFromOffset = result.getStartOffset();
+
+                    for (int readSize = 0; readSize < result.getSize() && reputFromOffset < DefaultMessageStore.this.getConfirmOffset() && doNext; ) {
+                        ByteBuffer byteBuffer = result.getByteBuffer();
+
+                        int totalSize = byteBuffer.getInt();
+                        if (reputFromOffset + totalSize > DefaultMessageStore.this.getConfirmOffset()) {
+                            doNext = false;
+                            break;
+                        }
+
+                        int magicCode = byteBuffer.getInt();
+                        switch (magicCode) {
+                            case CommitLog.MESSAGE_MAGIC_CODE:
+                                break;
+                            case CommitLog.BLANK_MAGIC_CODE:
+                                totalSize = 0;
+                                break;
+                            default:
+                                totalSize = -1;
+                                doNext = false;
+                        }
+
+                        if (totalSize > 0) {
+                            if (batchDispatchRequestStart == -1) {
+                                batchDispatchRequestStart = byteBuffer.position() - 8;
+                                batchDispatchRequestSize = 0;
+                            }
+                            batchDispatchRequestSize += totalSize;
+                            if (batchDispatchRequestSize > BATCH_SIZE) {
+                                this.createBatchDispatchRequest(byteBuffer, batchDispatchRequestStart, batchDispatchRequestSize);
+                                batchDispatchRequestStart = -1;
+                                batchDispatchRequestSize = -1;
+                            }
+                            byteBuffer.position(byteBuffer.position() + totalSize - 8);
+                            this.reputFromOffset += totalSize;
+                            readSize += totalSize;
+                        } else {
+                            if (totalSize == 0) {
+                                this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
+                                readSize = result.getSize();
+                            }
+                            this.createBatchDispatchRequest(byteBuffer, batchDispatchRequestStart, batchDispatchRequestSize);
+                            batchDispatchRequestStart = -1;
+                            batchDispatchRequestSize = -1;
+                        }
+                    }
+                } catch (Throwable e) {
+                    throw e;
+                } finally {
+                    this.createBatchDispatchRequest(result.getByteBuffer(), batchDispatchRequestStart, batchDispatchRequestSize);
+                    boolean over = this.mappedPageHoldCount.get() == 0;
+                    while (!over) {
+                        try {
+                            Thread.sleep(1);

Review Comment:
   Use TimeUnit.MILLISECONDS.sleep(1) instead of Thread.sleep(1).



##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2689,6 +2865,97 @@ private void doReput() {
             }
         }
 
+        private void createBatchDispatchRequest(ByteBuffer byteBuffer, int position, int size) {
+            if (position < 0) {
+                return;
+            }
+            mappedPageHoldCount.getAndIncrement();
+            BatchDispatchRequest task = new BatchDispatchRequest(byteBuffer, position, size, batchId++);
+            batchDispatchRequestQueue.offer(task);
+        }
+
+        private void doReputConcurrently() {
+            if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
+                LOGGER.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
+                        this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
+                this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
+            }
+            for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
+
+                SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
+
+                if (result == null) {
+                    break;
+                }
+
+                int batchDispatchRequestStart = -1;
+                int batchDispatchRequestSize = -1;
+                try {
+                    this.reputFromOffset = result.getStartOffset();
+
+                    for (int readSize = 0; readSize < result.getSize() && reputFromOffset < DefaultMessageStore.this.getConfirmOffset() && doNext; ) {
+                        ByteBuffer byteBuffer = result.getByteBuffer();
+
+                        int totalSize = byteBuffer.getInt();
+                        if (reputFromOffset + totalSize > DefaultMessageStore.this.getConfirmOffset()) {
+                            doNext = false;
+                            break;
+                        }
+
+                        int magicCode = byteBuffer.getInt();
+                        switch (magicCode) {
+                            case CommitLog.MESSAGE_MAGIC_CODE:
+                                break;
+                            case CommitLog.BLANK_MAGIC_CODE:
+                                totalSize = 0;
+                                break;
+                            default:
+                                totalSize = -1;
+                                doNext = false;
+                        }
+
+                        if (totalSize > 0) {
+                            if (batchDispatchRequestStart == -1) {
+                                batchDispatchRequestStart = byteBuffer.position() - 8;
+                                batchDispatchRequestSize = 0;
+                            }
+                            batchDispatchRequestSize += totalSize;
+                            if (batchDispatchRequestSize > BATCH_SIZE) {
+                                this.createBatchDispatchRequest(byteBuffer, batchDispatchRequestStart, batchDispatchRequestSize);
+                                batchDispatchRequestStart = -1;
+                                batchDispatchRequestSize = -1;
+                            }
+                            byteBuffer.position(byteBuffer.position() + totalSize - 8);
+                            this.reputFromOffset += totalSize;
+                            readSize += totalSize;
+                        } else {
+                            if (totalSize == 0) {
+                                this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
+                                readSize = result.getSize();
+                            }
+                            this.createBatchDispatchRequest(byteBuffer, batchDispatchRequestStart, batchDispatchRequestSize);

Review Comment:
   duplicate code, please refactor into a method



##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2689,6 +2865,97 @@ private void doReput() {
             }
         }
 
+        private void createBatchDispatchRequest(ByteBuffer byteBuffer, int position, int size) {
+            if (position < 0) {
+                return;
+            }
+            mappedPageHoldCount.getAndIncrement();
+            BatchDispatchRequest task = new BatchDispatchRequest(byteBuffer, position, size, batchId++);
+            batchDispatchRequestQueue.offer(task);
+        }
+
+        private void doReputConcurrently() {
+            if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
+                LOGGER.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
+                        this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
+                this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
+            }
+            for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
+
+                SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
+
+                if (result == null) {
+                    break;
+                }
+
+                int batchDispatchRequestStart = -1;
+                int batchDispatchRequestSize = -1;
+                try {
+                    this.reputFromOffset = result.getStartOffset();
+
+                    for (int readSize = 0; readSize < result.getSize() && reputFromOffset < DefaultMessageStore.this.getConfirmOffset() && doNext; ) {
+                        ByteBuffer byteBuffer = result.getByteBuffer();
+
+                        int totalSize = byteBuffer.getInt();
+                        if (reputFromOffset + totalSize > DefaultMessageStore.this.getConfirmOffset()) {
+                            doNext = false;
+                            break;
+                        }
+
+                        int magicCode = byteBuffer.getInt();
+                        switch (magicCode) {
+                            case CommitLog.MESSAGE_MAGIC_CODE:
+                                break;
+                            case CommitLog.BLANK_MAGIC_CODE:
+                                totalSize = 0;
+                                break;
+                            default:
+                                totalSize = -1;
+                                doNext = false;
+                        }
+
+                        if (totalSize > 0) {
+                            if (batchDispatchRequestStart == -1) {
+                                batchDispatchRequestStart = byteBuffer.position() - 8;

Review Comment:
   missing comment



##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2571,10 +2576,181 @@ public long getJoinTime() {
         }
     }
 
+    class BatchDispatchRequest {
+
+        private ByteBuffer byteBuffer;
+
+        private int position;
+
+        private int size;
+
+        private int id;
+
+        public BatchDispatchRequest(ByteBuffer byteBuffer, int position, int size, int id) {
+            this.byteBuffer = byteBuffer;
+            this.position = position;
+            this.size = size;
+            this.id = id;
+        }
+    }
+
+    class DispatchRequestOrderlyQueue {
+
+        DispatchRequest[][] buffer;
+
+        int ptr = 0;
+
+        AtomicInteger maxPtr = new AtomicInteger();
+
+        public DispatchRequestOrderlyQueue(int bufferNum) {
+            this.buffer = new DispatchRequest[bufferNum][];
+        }
+
+        public void put(int idx, DispatchRequest[] obj) {
+            while (ptr + this.buffer.length <= idx) {
+                synchronized (this) {
+                    try {
+                        this.wait();
+                    } catch (InterruptedException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+            }
+            int mod = idx % this.buffer.length;
+            this.buffer[mod] = obj;
+            maxPtr.incrementAndGet();
+        }
+
+        public DispatchRequest[] get(List<DispatchRequest[]> rets) {
+            synchronized (this) {
+                for (int i = 0; i < this.buffer.length; i++) {
+                    int mod = ptr % this.buffer.length;
+                    DispatchRequest[] ret = this.buffer[mod];
+                    if (ret == null) {
+                        this.notifyAll();
+                        return null;
+                    }
+                    rets.add(ret);
+                    this.buffer[mod] = null;
+                    ptr++;
+                }
+            }
+            return null;
+        }
+
+        public synchronized boolean isEmpty() {
+            return maxPtr.get() == ptr;
+        }
+
+    }
+
     class ReputMessageService extends ServiceThread {
 
         private volatile long reputFromOffset = 0;
 
+        private int batchId = 0;
+
+        private final AtomicInteger mappedPageHoldCount = new AtomicInteger(0);
+
+        private static final int BATCH_SIZE = 1024 * 1024 * 4;
+
+        private final ConcurrentLinkedQueue<BatchDispatchRequest> batchDispatchRequestQueue = new ConcurrentLinkedQueue<>();
+
+        private int dispatchRequestOrderlyQueueSize = 16;
+
+        private final DispatchRequestOrderlyQueue dispatchRequestOrderlyQueue = new DispatchRequestOrderlyQueue(dispatchRequestOrderlyQueueSize);
+
+        private int batchDispatchRequestThreadPoolNums = 16;
+
+        private ExecutorService batchDispatchRequestExecutor;
+
+        public ReputMessageService() {
+            if (messageStoreConfig.isEnableBuildConsumeQueueConcurrently()) {
+                initExecutorService();
+                startBatchDispatchRequestService();
+            }
+        }
+
+        private void initExecutorService() {
+            batchDispatchRequestExecutor = new ThreadPoolExecutor(
+                    this.batchDispatchRequestThreadPoolNums,
+                    this.batchDispatchRequestThreadPoolNums,
+                    1000 * 60,
+                    TimeUnit.MICROSECONDS,
+                    new LinkedBlockingDeque<>(),

Review Comment:
   oom risk



##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2571,10 +2576,181 @@ public long getJoinTime() {
         }
     }
 
+    class BatchDispatchRequest {
+
+        private ByteBuffer byteBuffer;
+
+        private int position;
+
+        private int size;
+
+        private int id;
+
+        public BatchDispatchRequest(ByteBuffer byteBuffer, int position, int size, int id) {
+            this.byteBuffer = byteBuffer;
+            this.position = position;
+            this.size = size;
+            this.id = id;
+        }
+    }
+
+    class DispatchRequestOrderlyQueue {
+
+        DispatchRequest[][] buffer;
+
+        int ptr = 0;
+
+        AtomicInteger maxPtr = new AtomicInteger();
+
+        public DispatchRequestOrderlyQueue(int bufferNum) {
+            this.buffer = new DispatchRequest[bufferNum][];
+        }
+
+        public void put(int idx, DispatchRequest[] obj) {
+            while (ptr + this.buffer.length <= idx) {
+                synchronized (this) {
+                    try {
+                        this.wait();
+                    } catch (InterruptedException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+            }
+            int mod = idx % this.buffer.length;
+            this.buffer[mod] = obj;
+            maxPtr.incrementAndGet();
+        }
+
+        public DispatchRequest[] get(List<DispatchRequest[]> rets) {
+            synchronized (this) {
+                for (int i = 0; i < this.buffer.length; i++) {
+                    int mod = ptr % this.buffer.length;
+                    DispatchRequest[] ret = this.buffer[mod];
+                    if (ret == null) {
+                        this.notifyAll();
+                        return null;
+                    }
+                    rets.add(ret);
+                    this.buffer[mod] = null;
+                    ptr++;
+                }
+            }
+            return null;
+        }
+
+        public synchronized boolean isEmpty() {
+            return maxPtr.get() == ptr;
+        }
+
+    }
+
     class ReputMessageService extends ServiceThread {
 
         private volatile long reputFromOffset = 0;
 
+        private int batchId = 0;
+
+        private final AtomicInteger mappedPageHoldCount = new AtomicInteger(0);
+
+        private static final int BATCH_SIZE = 1024 * 1024 * 4;
+
+        private final ConcurrentLinkedQueue<BatchDispatchRequest> batchDispatchRequestQueue = new ConcurrentLinkedQueue<>();
+
+        private int dispatchRequestOrderlyQueueSize = 16;
+
+        private final DispatchRequestOrderlyQueue dispatchRequestOrderlyQueue = new DispatchRequestOrderlyQueue(dispatchRequestOrderlyQueueSize);
+
+        private int batchDispatchRequestThreadPoolNums = 16;
+
+        private ExecutorService batchDispatchRequestExecutor;
+
+        public ReputMessageService() {
+            if (messageStoreConfig.isEnableBuildConsumeQueueConcurrently()) {
+                initExecutorService();
+                startBatchDispatchRequestService();
+            }
+        }
+

Review Comment:
   missing stop or shutdown logic 



##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2571,10 +2576,181 @@ public long getJoinTime() {
         }
     }
 
+    class BatchDispatchRequest {
+
+        private ByteBuffer byteBuffer;
+
+        private int position;
+
+        private int size;
+
+        private int id;
+
+        public BatchDispatchRequest(ByteBuffer byteBuffer, int position, int size, int id) {
+            this.byteBuffer = byteBuffer;
+            this.position = position;
+            this.size = size;
+            this.id = id;
+        }
+    }
+
+    class DispatchRequestOrderlyQueue {
+
+        DispatchRequest[][] buffer;
+
+        int ptr = 0;
+
+        AtomicInteger maxPtr = new AtomicInteger();
+
+        public DispatchRequestOrderlyQueue(int bufferNum) {
+            this.buffer = new DispatchRequest[bufferNum][];
+        }
+
+        public void put(int idx, DispatchRequest[] obj) {
+            while (ptr + this.buffer.length <= idx) {
+                synchronized (this) {
+                    try {
+                        this.wait();
+                    } catch (InterruptedException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+            }
+            int mod = idx % this.buffer.length;
+            this.buffer[mod] = obj;
+            maxPtr.incrementAndGet();
+        }
+
+        public DispatchRequest[] get(List<DispatchRequest[]> rets) {
+            synchronized (this) {
+                for (int i = 0; i < this.buffer.length; i++) {
+                    int mod = ptr % this.buffer.length;
+                    DispatchRequest[] ret = this.buffer[mod];
+                    if (ret == null) {
+                        this.notifyAll();
+                        return null;
+                    }
+                    rets.add(ret);
+                    this.buffer[mod] = null;
+                    ptr++;
+                }
+            }
+            return null;
+        }
+
+        public synchronized boolean isEmpty() {
+            return maxPtr.get() == ptr;
+        }
+
+    }
+
     class ReputMessageService extends ServiceThread {
 
         private volatile long reputFromOffset = 0;
 
+        private int batchId = 0;
+
+        private final AtomicInteger mappedPageHoldCount = new AtomicInteger(0);
+
+        private static final int BATCH_SIZE = 1024 * 1024 * 4;
+
+        private final ConcurrentLinkedQueue<BatchDispatchRequest> batchDispatchRequestQueue = new ConcurrentLinkedQueue<>();
+
+        private int dispatchRequestOrderlyQueueSize = 16;
+
+        private final DispatchRequestOrderlyQueue dispatchRequestOrderlyQueue = new DispatchRequestOrderlyQueue(dispatchRequestOrderlyQueueSize);
+
+        private int batchDispatchRequestThreadPoolNums = 16;
+
+        private ExecutorService batchDispatchRequestExecutor;
+
+        public ReputMessageService() {
+            if (messageStoreConfig.isEnableBuildConsumeQueueConcurrently()) {
+                initExecutorService();
+                startBatchDispatchRequestService();
+            }
+        }
+
+        private void initExecutorService() {
+            batchDispatchRequestExecutor = new ThreadPoolExecutor(
+                    this.batchDispatchRequestThreadPoolNums,
+                    this.batchDispatchRequestThreadPoolNums,
+                    1000 * 60,
+                    TimeUnit.MICROSECONDS,
+                    new LinkedBlockingDeque<>(),
+                    new ThreadFactoryImpl("BatchDispatchRequestServiceThread_"));
+        }
+
+        private void startBatchDispatchRequestService() {
+            new Thread(() -> {
+                while (true) {
+                    if (!batchDispatchRequestQueue.isEmpty()) {
+                        BatchDispatchRequest task = batchDispatchRequestQueue.poll();
+                        batchDispatchRequestExecutor.execute(() -> {
+                            ByteBuffer tmpByteBuffer = task.byteBuffer.duplicate();
+                            tmpByteBuffer.position(task.position);
+                            tmpByteBuffer.limit(task.position + task.size);
+                            List<DispatchRequest> dispatchRequestList = new ArrayList<>();
+                            while (tmpByteBuffer.hasRemaining()) {
+                                DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(tmpByteBuffer, false, false, false);
+                                if (dispatchRequest.isSuccess()) {
+                                    dispatchRequestList.add(dispatchRequest);
+                                } else {
+                                    LOGGER.error("[BUG]read total count not equals msg total size.");
+                                }
+                            }
+                            this.dispatchRequestOrderlyQueue.put(task.id, dispatchRequestList.toArray(new DispatchRequest[dispatchRequestList.size()]));
+                            mappedPageHoldCount.getAndDecrement();
+                        });
+                    } else {
+                        try {
+                            Thread.sleep(1);

Review Comment:
   TimeUnit.MILLISECONDS.sleep



##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2689,6 +2865,97 @@ private void doReput() {
             }
         }
 
+        private void createBatchDispatchRequest(ByteBuffer byteBuffer, int position, int size) {
+            if (position < 0) {
+                return;
+            }
+            mappedPageHoldCount.getAndIncrement();
+            BatchDispatchRequest task = new BatchDispatchRequest(byteBuffer, position, size, batchId++);
+            batchDispatchRequestQueue.offer(task);
+        }
+
+        private void doReputConcurrently() {
+            if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
+                LOGGER.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
+                        this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
+                this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
+            }
+            for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
+
+                SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
+
+                if (result == null) {
+                    break;
+                }
+
+                int batchDispatchRequestStart = -1;
+                int batchDispatchRequestSize = -1;
+                try {
+                    this.reputFromOffset = result.getStartOffset();
+
+                    for (int readSize = 0; readSize < result.getSize() && reputFromOffset < DefaultMessageStore.this.getConfirmOffset() && doNext; ) {
+                        ByteBuffer byteBuffer = result.getByteBuffer();
+
+                        int totalSize = byteBuffer.getInt();
+                        if (reputFromOffset + totalSize > DefaultMessageStore.this.getConfirmOffset()) {
+                            doNext = false;
+                            break;
+                        }
+
+                        int magicCode = byteBuffer.getInt();
+                        switch (magicCode) {
+                            case CommitLog.MESSAGE_MAGIC_CODE:
+                                break;
+                            case CommitLog.BLANK_MAGIC_CODE:
+                                totalSize = 0;
+                                break;
+                            default:
+                                totalSize = -1;
+                                doNext = false;
+                        }
+
+                        if (totalSize > 0) {
+                            if (batchDispatchRequestStart == -1) {
+                                batchDispatchRequestStart = byteBuffer.position() - 8;

Review Comment:
   Using byteBuffer.mark() and byteBuffer.reset() may make this code more readable.



##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2571,10 +2576,181 @@ public long getJoinTime() {
         }
     }
 
+    class BatchDispatchRequest {
+

Review Comment:
   using ServiceThread notify and wait logic 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org