You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by GitBox <gi...@apache.org> on 2022/11/28 04:49:42 UTC

[GitHub] [rocketmq] ShadowySpirits opened a new pull request, #5606: [ISSUE #5605]Introduce tag estimation for lag calculation

ShadowySpirits opened a new pull request, #5606:
URL: https://github.com/apache/rocketmq/pull/5606

   Close #5605


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] RongtongJin commented on a diff in pull request #5606: [ISSUE #5605]Introduce tag estimation for lag calculation

Posted by GitBox <gi...@apache.org>.
RongtongJin commented on code in PR #5606:
URL: https://github.com/apache/rocketmq/pull/5606#discussion_r1033168438


##########
store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java:
##########
@@ -1005,4 +1005,83 @@ public void cleanSwappedMap(long forceCleanSwapIntervalMs) {
     public MappedFileQueue getMappedFileQueue() {
         return mappedFileQueue;
     }
+
+    @Override
+    public long estimateMessageCount(long from, long to, MessageFilter filter) {
+        long physicalOffsetFrom = from * CQ_STORE_UNIT_SIZE;
+        long physicalOffsetTo = to * CQ_STORE_UNIT_SIZE;
+        List<MappedFile> mappedFiles = mappedFileQueue.range(physicalOffsetFrom, physicalOffsetTo);
+        if (mappedFiles.isEmpty()) {
+            return -1;
+        }
+
+        boolean sample = false;
+        long match = 0;
+        long raw = 0;
+
+        for (MappedFile mappedFile : mappedFiles) {
+            int start = 0;
+            int len = mappedFile.getFileSize();
+            // First file segment
+            if (mappedFile.getFileFromOffset() <= physicalOffsetFrom) {
+                start = (int) (physicalOffsetFrom - mappedFile.getFileFromOffset());
+                if (mappedFile.getFileFromOffset() + mappedFile.getFileSize() >= physicalOffsetTo) {
+                    // Current mapped file covers search range completely.
+                    len = (int) (physicalOffsetTo - physicalOffsetFrom);
+                } else {
+                    len = mappedFile.getFileSize() - start;
+                }
+            }
+
+            // Scan partial of the last file segment
+            if (0 == start && mappedFile.getFileFromOffset() + mappedFile.getFileSize() > physicalOffsetTo) {
+                len = (int) (physicalOffsetTo - mappedFile.getFileFromOffset());
+            }
+
+            SelectMappedBufferResult slice = mappedFile.selectMappedBuffer(start, len);
+            if (null != slice) {
+                try {
+                    ByteBuffer buffer = slice.getByteBuffer();
+                    int current = 0;
+                    while (current < len) {
+                        // Skip physicalOffset and message length fields.
+                        buffer.position(current + 8 + 4);

Review Comment:
   It would be better to use constants instead of 4+8, like BatchConsumeQueue.MSG_TAG_OFFSET_INDEX



##########
store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java:
##########
@@ -1005,4 +1005,83 @@ public void cleanSwappedMap(long forceCleanSwapIntervalMs) {
     public MappedFileQueue getMappedFileQueue() {
         return mappedFileQueue;
     }
+
+    @Override
+    public long estimateMessageCount(long from, long to, MessageFilter filter) {
+        long physicalOffsetFrom = from * CQ_STORE_UNIT_SIZE;
+        long physicalOffsetTo = to * CQ_STORE_UNIT_SIZE;
+        List<MappedFile> mappedFiles = mappedFileQueue.range(physicalOffsetFrom, physicalOffsetTo);
+        if (mappedFiles.isEmpty()) {
+            return -1;
+        }
+
+        boolean sample = false;
+        long match = 0;
+        long raw = 0;
+
+        for (MappedFile mappedFile : mappedFiles) {
+            int start = 0;
+            int len = mappedFile.getFileSize();
+            // First file segment
+            if (mappedFile.getFileFromOffset() <= physicalOffsetFrom) {
+                start = (int) (physicalOffsetFrom - mappedFile.getFileFromOffset());
+                if (mappedFile.getFileFromOffset() + mappedFile.getFileSize() >= physicalOffsetTo) {
+                    // Current mapped file covers search range completely.
+                    len = (int) (physicalOffsetTo - physicalOffsetFrom);
+                } else {
+                    len = mappedFile.getFileSize() - start;
+                }
+            }
+
+            // Scan partial of the last file segment
+            if (0 == start && mappedFile.getFileFromOffset() + mappedFile.getFileSize() > physicalOffsetTo) {
+                len = (int) (physicalOffsetTo - mappedFile.getFileFromOffset());
+            }
+
+            SelectMappedBufferResult slice = mappedFile.selectMappedBuffer(start, len);
+            if (null != slice) {
+                try {
+                    ByteBuffer buffer = slice.getByteBuffer();
+                    int current = 0;
+                    while (current < len) {
+                        // Skip physicalOffset and message length fields.
+                        buffer.position(current + 8 + 4);
+                        long tagCode = buffer.getLong();

Review Comment:
   Since there may be multiple messages in one BCQ index, IMO, It would be better to consider using the batch size field to estimate a more accurate value.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] fuyou001 commented on a diff in pull request #5606: [ISSUE #5605]Introduce tag estimation for lag calculation

Posted by GitBox <gi...@apache.org>.
fuyou001 commented on code in PR #5606:
URL: https://github.com/apache/rocketmq/pull/5606#discussion_r1033175044


##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2666,4 +2666,33 @@ public SendMessageBackHook getSendMessageBackHook() {
     public boolean isShutdown() {
         return shutdown;
     }
+
+    @Override
+    public long estimateMessageCount(String topic, int queueId, long from, long to, MessageFilter filter) {
+        if (from < 0) {
+            from = 0;
+        }
+
+        if (from >= to) {
+            return 0;
+        }
+
+        if (null == filter) {
+            return to - from;
+        }
+
+        ConsumeQueueInterface consumeQueue = findConsumeQueue(topic, queueId);
+        if (null == consumeQueue) {
+            return 0;
+        }
+
+        long minOffset = consumeQueue.getMinOffsetInQueue();
+        from = Math.max(from, minOffset);
+        if (to < minOffset) {

Review Comment:
   minOffset  maybe using from arguments semantics is more readable
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] drpmma commented on a diff in pull request #5606: [ISSUE #5605]Introduce tag estimation for lag calculation

Posted by GitBox <gi...@apache.org>.
drpmma commented on code in PR #5606:
URL: https://github.com/apache/rocketmq/pull/5606#discussion_r1034253157


##########
store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java:
##########
@@ -1001,4 +1000,88 @@ public void swapMap(int reserveNum, long forceSwapIntervalMs, long normalSwapInt
     public void cleanSwappedMap(long forceCleanSwapIntervalMs) {
         mappedFileQueue.cleanSwappedMap(forceCleanSwapIntervalMs);
     }
+
+    @Override
+    public long estimateMessageCount(long from, long to, MessageFilter filter) {
+        long physicalOffsetFrom = from * CQ_STORE_UNIT_SIZE;
+        long physicalOffsetTo = to * CQ_STORE_UNIT_SIZE;
+        List<MappedFile> mappedFiles = mappedFileQueue.range(physicalOffsetFrom, physicalOffsetTo);
+        if (mappedFiles.isEmpty()) {
+            return -1;
+        }
+
+        boolean sample = false;
+        long match = 0;
+        long raw = 0;
+
+        for (MappedFile mappedFile : mappedFiles) {
+            int start = 0;
+            int len = mappedFile.getFileSize();
+            // First file segment
+            if (mappedFile.getFileFromOffset() <= physicalOffsetFrom) {
+                start = (int) (physicalOffsetFrom - mappedFile.getFileFromOffset());
+                if (mappedFile.getFileFromOffset() + mappedFile.getFileSize() >= physicalOffsetTo) {
+                    // Current mapped file covers search range completely.
+                    len = (int) (physicalOffsetTo - physicalOffsetFrom);
+                } else {
+                    len = mappedFile.getFileSize() - start;
+                }
+            }
+
+            // Scan partial of the last file segment
+            if (0 == start && mappedFile.getFileFromOffset() + mappedFile.getFileSize() > physicalOffsetTo) {
+                len = (int) (physicalOffsetTo - mappedFile.getFileFromOffset());
+            }
+

Review Comment:
   Maybe more annotation for readability.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] codecov-commenter commented on pull request #5606: [ISSUE #5605]Introduce tag estimation for lag calculation

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #5606:
URL: https://github.com/apache/rocketmq/pull/5606#issuecomment-1330125783

   # [Codecov](https://codecov.io/gh/apache/rocketmq/pull/5606?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#5606](https://codecov.io/gh/apache/rocketmq/pull/5606?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (6f2bc17) into [develop](https://codecov.io/gh/apache/rocketmq/commit/08dbe7ede925e578c2c9e9bda4a556937cec7c89?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (08dbe7e) will **increase** coverage by `0.09%`.
   > The diff coverage is `61.87%`.
   
   > :exclamation: Current head 6f2bc17 differs from pull request most recent head c30e31b. Consider uploading reports for the commit c30e31b to get more accurate results
   
   ```diff
   @@              Coverage Diff              @@
   ##             develop    #5606      +/-   ##
   =============================================
   + Coverage      42.37%   42.47%   +0.09%     
   - Complexity      7929     7966      +37     
   =============================================
     Files           1022     1023       +1     
     Lines          71254    71443     +189     
     Branches        9399     9438      +39     
   =============================================
   + Hits           30196    30344     +148     
   - Misses         37230    37264      +34     
   - Partials        3828     3835       +7     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/rocketmq/pull/5606?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [.../java/org/apache/rocketmq/common/BrokerConfig.java](https://codecov.io/gh/apache/rocketmq/pull/5606/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jb21tb24vQnJva2VyQ29uZmlnLmphdmE=) | `28.01% <0.00%> (-0.14%)` | :arrow_down: |
   | [...org/apache/rocketmq/store/DefaultMessageStore.java](https://codecov.io/gh/apache/rocketmq/pull/5606/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c3RvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3N0b3JlL0RlZmF1bHRNZXNzYWdlU3RvcmUuamF2YQ==) | `52.76% <0.00%> (-0.66%)` | :arrow_down: |
   | [...n/java/org/apache/rocketmq/store/MessageStore.java](https://codecov.io/gh/apache/rocketmq/pull/5606/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c3RvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3N0b3JlL01lc3NhZ2VTdG9yZS5qYXZh) | `0.00% <ø> (ø)` | |
   | [...test/listener/rmq/concurrent/RMQBlockListener.java](https://codecov.io/gh/apache/rocketmq/pull/5606/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-dGVzdC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcm9ja2V0bXEvdGVzdC9saXN0ZW5lci9ybXEvY29uY3VycmVudC9STVFCbG9ja0xpc3RlbmVyLmphdmE=) | `0.00% <0.00%> (ø)` | |
   | [...rg/apache/rocketmq/test/util/MQAdminTestUtils.java](https://codecov.io/gh/apache/rocketmq/pull/5606/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-dGVzdC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcm9ja2V0bXEvdGVzdC91dGlsL01RQWRtaW5UZXN0VXRpbHMuamF2YQ==) | `0.00% <0.00%> (ø)` | |
   | [...apache/rocketmq/tools/admin/DefaultMQAdminExt.java](https://codecov.io/gh/apache/rocketmq/pull/5606/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-dG9vbHMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3Rvb2xzL2FkbWluL0RlZmF1bHRNUUFkbWluRXh0LmphdmE=) | `30.28% <0.00%> (-0.18%)` | :arrow_down: |
   | [...he/rocketmq/tools/admin/DefaultMQAdminExtImpl.java](https://codecov.io/gh/apache/rocketmq/pull/5606/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-dG9vbHMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3Rvb2xzL2FkbWluL0RlZmF1bHRNUUFkbWluRXh0SW1wbC5qYXZh) | `25.39% <0.00%> (-0.03%)` | :arrow_down: |
   | [...n/java/org/apache/rocketmq/store/ConsumeQueue.java](https://codecov.io/gh/apache/rocketmq/pull/5606/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c3RvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3N0b3JlL0NvbnN1bWVRdWV1ZS5qYXZh) | `69.50% <78.84%> (+0.94%)` | :arrow_up: |
   | [...apache/rocketmq/store/queue/BatchConsumeQueue.java](https://codecov.io/gh/apache/rocketmq/pull/5606/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c3RvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3N0b3JlL3F1ZXVlL0JhdGNoQ29uc3VtZVF1ZXVlLmphdmE=) | `70.21% <85.00%> (+1.79%)` | :arrow_up: |
   | [...ava/org/apache/rocketmq/store/MappedFileQueue.java](https://codecov.io/gh/apache/rocketmq/pull/5606/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c3RvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3N0b3JlL01hcHBlZEZpbGVRdWV1ZS5qYXZh) | `62.25% <100.00%> (+1.14%)` | :arrow_up: |
   | ... and [23 more](https://codecov.io/gh/apache/rocketmq/pull/5606/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   :mega: We’re building smart automated test selection to slash your CI/CD build times. [Learn more](https://about.codecov.io/iterative-testing/?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] lizhanhui merged pull request #5606: [ISSUE #5605]Introduce tag estimation for lag calculation

Posted by GitBox <gi...@apache.org>.
lizhanhui merged PR #5606:
URL: https://github.com/apache/rocketmq/pull/5606


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org