You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by GitBox <gi...@apache.org> on 2023/01/16 03:39:57 UTC

[GitHub] [rocketmq] RongtongJin commented on a diff in pull request #5887: [ISSUE #5884] Concurrent check CommitLog messages

RongtongJin commented on code in PR #5887:
URL: https://github.com/apache/rocketmq/pull/5887#discussion_r1070791385


##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2571,10 +2576,181 @@ public long getJoinTime() {
         }
     }
 
+    class BatchDispatchRequest {
+
+        private ByteBuffer byteBuffer;
+
+        private int position;
+
+        private int size;
+
+        private int id;
+
+        public BatchDispatchRequest(ByteBuffer byteBuffer, int position, int size, int id) {
+            this.byteBuffer = byteBuffer;
+            this.position = position;
+            this.size = size;
+            this.id = id;
+        }
+    }
+
+    class DispatchRequestOrderlyQueue {
+
+        DispatchRequest[][] buffer;
+
+        int ptr = 0;
+
+        AtomicInteger maxPtr = new AtomicInteger();
+
+        public DispatchRequestOrderlyQueue(int bufferNum) {
+            this.buffer = new DispatchRequest[bufferNum][];
+        }
+
+        public void put(int idx, DispatchRequest[] obj) {
+            while (ptr + this.buffer.length <= idx) {
+                synchronized (this) {
+                    try {
+                        this.wait();
+                    } catch (InterruptedException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+            }
+            int mod = idx % this.buffer.length;
+            this.buffer[mod] = obj;
+            maxPtr.incrementAndGet();
+        }
+
+        public DispatchRequest[] get(List<DispatchRequest[]> rets) {
+            synchronized (this) {
+                for (int i = 0; i < this.buffer.length; i++) {
+                    int mod = ptr % this.buffer.length;
+                    DispatchRequest[] ret = this.buffer[mod];
+                    if (ret == null) {
+                        this.notifyAll();
+                        return null;
+                    }
+                    rets.add(ret);
+                    this.buffer[mod] = null;
+                    ptr++;
+                }
+            }
+            return null;
+        }
+
+        public synchronized boolean isEmpty() {
+            return maxPtr.get() == ptr;
+        }
+
+    }
+
     class ReputMessageService extends ServiceThread {
 
         private volatile long reputFromOffset = 0;
 
+        private int batchId = 0;
+
+        private final AtomicInteger mappedPageHoldCount = new AtomicInteger(0);
+
+        private static final int BATCH_SIZE = 1024 * 1024 * 4;
+
+        private final ConcurrentLinkedQueue<BatchDispatchRequest> batchDispatchRequestQueue = new ConcurrentLinkedQueue<>();
+
+        private int dispatchRequestOrderlyQueueSize = 16;
+
+        private final DispatchRequestOrderlyQueue dispatchRequestOrderlyQueue = new DispatchRequestOrderlyQueue(dispatchRequestOrderlyQueueSize);
+
+        private int batchDispatchRequestThreadPoolNums = 16;
+
+        private ExecutorService batchDispatchRequestExecutor;
+
+        public ReputMessageService() {
+            if (messageStoreConfig.isEnableBuildConsumeQueueConcurrently()) {
+                initExecutorService();
+                startBatchDispatchRequestService();
+            }
+        }
+
+        private void initExecutorService() {
+            batchDispatchRequestExecutor = new ThreadPoolExecutor(
+                    this.batchDispatchRequestThreadPoolNums,
+                    this.batchDispatchRequestThreadPoolNums,
+                    1000 * 60,
+                    TimeUnit.MICROSECONDS,
+                    new LinkedBlockingDeque<>(),
+                    new ThreadFactoryImpl("BatchDispatchRequestServiceThread_"));
+        }
+
+        private void startBatchDispatchRequestService() {
+            new Thread(() -> {
+                while (true) {
+                    if (!batchDispatchRequestQueue.isEmpty()) {
+                        BatchDispatchRequest task = batchDispatchRequestQueue.poll();
+                        batchDispatchRequestExecutor.execute(() -> {
+                            ByteBuffer tmpByteBuffer = task.byteBuffer.duplicate();
+                            tmpByteBuffer.position(task.position);
+                            tmpByteBuffer.limit(task.position + task.size);
+                            List<DispatchRequest> dispatchRequestList = new ArrayList<>();
+                            while (tmpByteBuffer.hasRemaining()) {
+                                DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(tmpByteBuffer, false, false, false);
+                                if (dispatchRequest.isSuccess()) {
+                                    dispatchRequestList.add(dispatchRequest);
+                                } else {
+                                    LOGGER.error("[BUG]read total count not equals msg total size.");
+                                }
+                            }
+                            this.dispatchRequestOrderlyQueue.put(task.id, dispatchRequestList.toArray(new DispatchRequest[dispatchRequestList.size()]));
+                            mappedPageHoldCount.getAndDecrement();
+                        });
+                    } else {
+                        try {
+                            Thread.sleep(1);
+                        } catch (InterruptedException e) {
+                            throw new RuntimeException(e);
+                        }
+                    }
+                }
+            }, "MainBatchDispatchRequestServiceThread").start();
+
+            new Thread(() -> {

Review Comment:
   It would be better to use org.apache.rocketmq.common.ServiceThread instead of java native thread.



##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2571,10 +2576,181 @@ public long getJoinTime() {
         }
     }
 
+    class BatchDispatchRequest {
+
+        private ByteBuffer byteBuffer;
+
+        private int position;
+
+        private int size;
+
+        private int id;
+
+        public BatchDispatchRequest(ByteBuffer byteBuffer, int position, int size, int id) {
+            this.byteBuffer = byteBuffer;
+            this.position = position;
+            this.size = size;
+            this.id = id;
+        }
+    }
+
+    class DispatchRequestOrderlyQueue {
+
+        DispatchRequest[][] buffer;
+
+        int ptr = 0;
+
+        AtomicInteger maxPtr = new AtomicInteger();
+
+        public DispatchRequestOrderlyQueue(int bufferNum) {
+            this.buffer = new DispatchRequest[bufferNum][];
+        }
+
+        public void put(int idx, DispatchRequest[] obj) {
+            while (ptr + this.buffer.length <= idx) {
+                synchronized (this) {
+                    try {
+                        this.wait();
+                    } catch (InterruptedException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+            }
+            int mod = idx % this.buffer.length;
+            this.buffer[mod] = obj;
+            maxPtr.incrementAndGet();
+        }
+
+        public DispatchRequest[] get(List<DispatchRequest[]> rets) {
+            synchronized (this) {
+                for (int i = 0; i < this.buffer.length; i++) {
+                    int mod = ptr % this.buffer.length;
+                    DispatchRequest[] ret = this.buffer[mod];
+                    if (ret == null) {
+                        this.notifyAll();
+                        return null;
+                    }
+                    rets.add(ret);
+                    this.buffer[mod] = null;
+                    ptr++;
+                }
+            }
+            return null;
+        }
+
+        public synchronized boolean isEmpty() {
+            return maxPtr.get() == ptr;
+        }
+
+    }
+
     class ReputMessageService extends ServiceThread {

Review Comment:
   How about creating a new class that extends ReputMessageService?  We can initialize different implementations according to isEnableBuildConsumeQueueConcurrently config. On the other hand, the ReputMessageService code will not change and the code readability will be better.



##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2689,6 +2865,97 @@ private void doReput() {
             }
         }
 
+        private void createBatchDispatchRequest(ByteBuffer byteBuffer, int position, int size) {
+            if (position < 0) {
+                return;
+            }
+            mappedPageHoldCount.getAndIncrement();
+            BatchDispatchRequest task = new BatchDispatchRequest(byteBuffer, position, size, batchId++);
+            batchDispatchRequestQueue.offer(task);
+        }
+
+        private void doReputConcurrently() {
+            if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
+                LOGGER.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
+                        this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
+                this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
+            }
+            for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
+
+                SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
+
+                if (result == null) {
+                    break;
+                }
+
+                int batchDispatchRequestStart = -1;
+                int batchDispatchRequestSize = -1;
+                try {
+                    this.reputFromOffset = result.getStartOffset();
+
+                    for (int readSize = 0; readSize < result.getSize() && reputFromOffset < DefaultMessageStore.this.getConfirmOffset() && doNext; ) {
+                        ByteBuffer byteBuffer = result.getByteBuffer();
+
+                        int totalSize = byteBuffer.getInt();
+                        if (reputFromOffset + totalSize > DefaultMessageStore.this.getConfirmOffset()) {
+                            doNext = false;
+                            break;
+                        }
+
+                        int magicCode = byteBuffer.getInt();
+                        switch (magicCode) {
+                            case CommitLog.MESSAGE_MAGIC_CODE:
+                                break;
+                            case CommitLog.BLANK_MAGIC_CODE:
+                                totalSize = 0;
+                                break;
+                            default:
+                                totalSize = -1;
+                                doNext = false;
+                        }
+
+                        if (totalSize > 0) {
+                            if (batchDispatchRequestStart == -1) {
+                                batchDispatchRequestStart = byteBuffer.position() - 8;
+                                batchDispatchRequestSize = 0;
+                            }
+                            batchDispatchRequestSize += totalSize;
+                            if (batchDispatchRequestSize > BATCH_SIZE) {
+                                this.createBatchDispatchRequest(byteBuffer, batchDispatchRequestStart, batchDispatchRequestSize);
+                                batchDispatchRequestStart = -1;
+                                batchDispatchRequestSize = -1;
+                            }
+                            byteBuffer.position(byteBuffer.position() + totalSize - 8);
+                            this.reputFromOffset += totalSize;
+                            readSize += totalSize;
+                        } else {
+                            if (totalSize == 0) {
+                                this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
+                                readSize = result.getSize();
+                            }
+                            this.createBatchDispatchRequest(byteBuffer, batchDispatchRequestStart, batchDispatchRequestSize);
+                            batchDispatchRequestStart = -1;
+                            batchDispatchRequestSize = -1;
+                        }
+                    }
+                } catch (Throwable e) {
+                    throw e;

Review Comment:
   This line seems redundant.



##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2571,10 +2576,181 @@ public long getJoinTime() {
         }
     }
 
+    class BatchDispatchRequest {
+
+        private ByteBuffer byteBuffer;
+
+        private int position;
+
+        private int size;
+
+        private int id;
+
+        public BatchDispatchRequest(ByteBuffer byteBuffer, int position, int size, int id) {
+            this.byteBuffer = byteBuffer;
+            this.position = position;
+            this.size = size;
+            this.id = id;
+        }
+    }
+
+    class DispatchRequestOrderlyQueue {
+
+        DispatchRequest[][] buffer;
+
+        int ptr = 0;
+
+        AtomicInteger maxPtr = new AtomicInteger();
+
+        public DispatchRequestOrderlyQueue(int bufferNum) {
+            this.buffer = new DispatchRequest[bufferNum][];
+        }
+
+        public void put(int idx, DispatchRequest[] obj) {
+            while (ptr + this.buffer.length <= idx) {
+                synchronized (this) {
+                    try {
+                        this.wait();
+                    } catch (InterruptedException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+            }
+            int mod = idx % this.buffer.length;
+            this.buffer[mod] = obj;
+            maxPtr.incrementAndGet();
+        }
+
+        public DispatchRequest[] get(List<DispatchRequest[]> rets) {
+            synchronized (this) {
+                for (int i = 0; i < this.buffer.length; i++) {
+                    int mod = ptr % this.buffer.length;
+                    DispatchRequest[] ret = this.buffer[mod];
+                    if (ret == null) {
+                        this.notifyAll();
+                        return null;
+                    }
+                    rets.add(ret);
+                    this.buffer[mod] = null;
+                    ptr++;
+                }
+            }
+            return null;
+        }
+
+        public synchronized boolean isEmpty() {
+            return maxPtr.get() == ptr;
+        }
+
+    }
+
     class ReputMessageService extends ServiceThread {
 
         private volatile long reputFromOffset = 0;
 
+        private int batchId = 0;
+
+        private final AtomicInteger mappedPageHoldCount = new AtomicInteger(0);
+
+        private static final int BATCH_SIZE = 1024 * 1024 * 4;
+
+        private final ConcurrentLinkedQueue<BatchDispatchRequest> batchDispatchRequestQueue = new ConcurrentLinkedQueue<>();
+
+        private int dispatchRequestOrderlyQueueSize = 16;
+
+        private final DispatchRequestOrderlyQueue dispatchRequestOrderlyQueue = new DispatchRequestOrderlyQueue(dispatchRequestOrderlyQueueSize);
+
+        private int batchDispatchRequestThreadPoolNums = 16;
+
+        private ExecutorService batchDispatchRequestExecutor;
+
+        public ReputMessageService() {
+            if (messageStoreConfig.isEnableBuildConsumeQueueConcurrently()) {
+                initExecutorService();
+                startBatchDispatchRequestService();
+            }
+        }
+
+        private void initExecutorService() {
+            batchDispatchRequestExecutor = new ThreadPoolExecutor(
+                    this.batchDispatchRequestThreadPoolNums,
+                    this.batchDispatchRequestThreadPoolNums,
+                    1000 * 60,
+                    TimeUnit.MICROSECONDS,
+                    new LinkedBlockingDeque<>(),
+                    new ThreadFactoryImpl("BatchDispatchRequestServiceThread_"));
+        }
+
+        private void startBatchDispatchRequestService() {
+            new Thread(() -> {

Review Comment:
   It would be better to use org.apache.rocketmq.common.ServiceThread instead of java native thread.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org