You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by GitBox <gi...@apache.org> on 2022/11/28 04:49:42 UTC
[GitHub] [rocketmq] ShadowySpirits opened a new pull request, #5606: [ISSUE #5605]Introduce tag estimation for lag calculation
ShadowySpirits opened a new pull request, #5606:
URL: https://github.com/apache/rocketmq/pull/5606
Close #5605
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [rocketmq] RongtongJin commented on a diff in pull request #5606: [ISSUE #5605]Introduce tag estimation for lag calculation
Posted by GitBox <gi...@apache.org>.
RongtongJin commented on code in PR #5606:
URL: https://github.com/apache/rocketmq/pull/5606#discussion_r1033168438
##########
store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java:
##########
@@ -1005,4 +1005,83 @@ public void cleanSwappedMap(long forceCleanSwapIntervalMs) {
public MappedFileQueue getMappedFileQueue() {
return mappedFileQueue;
}
+
+ @Override
+ public long estimateMessageCount(long from, long to, MessageFilter filter) {
+ long physicalOffsetFrom = from * CQ_STORE_UNIT_SIZE;
+ long physicalOffsetTo = to * CQ_STORE_UNIT_SIZE;
+ List<MappedFile> mappedFiles = mappedFileQueue.range(physicalOffsetFrom, physicalOffsetTo);
+ if (mappedFiles.isEmpty()) {
+ return -1;
+ }
+
+ boolean sample = false;
+ long match = 0;
+ long raw = 0;
+
+ for (MappedFile mappedFile : mappedFiles) {
+ int start = 0;
+ int len = mappedFile.getFileSize();
+ // First file segment
+ if (mappedFile.getFileFromOffset() <= physicalOffsetFrom) {
+ start = (int) (physicalOffsetFrom - mappedFile.getFileFromOffset());
+ if (mappedFile.getFileFromOffset() + mappedFile.getFileSize() >= physicalOffsetTo) {
+ // Current mapped file covers search range completely.
+ len = (int) (physicalOffsetTo - physicalOffsetFrom);
+ } else {
+ len = mappedFile.getFileSize() - start;
+ }
+ }
+
+ // Scan partial of the last file segment
+ if (0 == start && mappedFile.getFileFromOffset() + mappedFile.getFileSize() > physicalOffsetTo) {
+ len = (int) (physicalOffsetTo - mappedFile.getFileFromOffset());
+ }
+
+ SelectMappedBufferResult slice = mappedFile.selectMappedBuffer(start, len);
+ if (null != slice) {
+ try {
+ ByteBuffer buffer = slice.getByteBuffer();
+ int current = 0;
+ while (current < len) {
+ // Skip physicalOffset and message length fields.
+ buffer.position(current + 8 + 4);
Review Comment:
It would be better to use constants instead of 4+8, like BatchConsumeQueue.MSG_TAG_OFFSET_INDEX
##########
store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java:
##########
@@ -1005,4 +1005,83 @@ public void cleanSwappedMap(long forceCleanSwapIntervalMs) {
public MappedFileQueue getMappedFileQueue() {
return mappedFileQueue;
}
+
+ @Override
+ public long estimateMessageCount(long from, long to, MessageFilter filter) {
+ long physicalOffsetFrom = from * CQ_STORE_UNIT_SIZE;
+ long physicalOffsetTo = to * CQ_STORE_UNIT_SIZE;
+ List<MappedFile> mappedFiles = mappedFileQueue.range(physicalOffsetFrom, physicalOffsetTo);
+ if (mappedFiles.isEmpty()) {
+ return -1;
+ }
+
+ boolean sample = false;
+ long match = 0;
+ long raw = 0;
+
+ for (MappedFile mappedFile : mappedFiles) {
+ int start = 0;
+ int len = mappedFile.getFileSize();
+ // First file segment
+ if (mappedFile.getFileFromOffset() <= physicalOffsetFrom) {
+ start = (int) (physicalOffsetFrom - mappedFile.getFileFromOffset());
+ if (mappedFile.getFileFromOffset() + mappedFile.getFileSize() >= physicalOffsetTo) {
+ // Current mapped file covers search range completely.
+ len = (int) (physicalOffsetTo - physicalOffsetFrom);
+ } else {
+ len = mappedFile.getFileSize() - start;
+ }
+ }
+
+ // Scan partial of the last file segment
+ if (0 == start && mappedFile.getFileFromOffset() + mappedFile.getFileSize() > physicalOffsetTo) {
+ len = (int) (physicalOffsetTo - mappedFile.getFileFromOffset());
+ }
+
+ SelectMappedBufferResult slice = mappedFile.selectMappedBuffer(start, len);
+ if (null != slice) {
+ try {
+ ByteBuffer buffer = slice.getByteBuffer();
+ int current = 0;
+ while (current < len) {
+ // Skip physicalOffset and message length fields.
+ buffer.position(current + 8 + 4);
+ long tagCode = buffer.getLong();
Review Comment:
Since there may be multiple messages in one BCQ index, IMO, It would be better to consider using the batch size field to estimate a more accurate value.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [rocketmq] fuyou001 commented on a diff in pull request #5606: [ISSUE #5605]Introduce tag estimation for lag calculation
Posted by GitBox <gi...@apache.org>.
fuyou001 commented on code in PR #5606:
URL: https://github.com/apache/rocketmq/pull/5606#discussion_r1033175044
##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -2666,4 +2666,33 @@ public SendMessageBackHook getSendMessageBackHook() {
public boolean isShutdown() {
return shutdown;
}
+
+ @Override
+ public long estimateMessageCount(String topic, int queueId, long from, long to, MessageFilter filter) {
+ if (from < 0) {
+ from = 0;
+ }
+
+ if (from >= to) {
+ return 0;
+ }
+
+ if (null == filter) {
+ return to - from;
+ }
+
+ ConsumeQueueInterface consumeQueue = findConsumeQueue(topic, queueId);
+ if (null == consumeQueue) {
+ return 0;
+ }
+
+ long minOffset = consumeQueue.getMinOffsetInQueue();
+ from = Math.max(from, minOffset);
+ if (to < minOffset) {
Review Comment:
minOffset maybe using from arguments semantics is more readable
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [rocketmq] drpmma commented on a diff in pull request #5606: [ISSUE #5605]Introduce tag estimation for lag calculation
Posted by GitBox <gi...@apache.org>.
drpmma commented on code in PR #5606:
URL: https://github.com/apache/rocketmq/pull/5606#discussion_r1034253157
##########
store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java:
##########
@@ -1001,4 +1000,88 @@ public void swapMap(int reserveNum, long forceSwapIntervalMs, long normalSwapInt
public void cleanSwappedMap(long forceCleanSwapIntervalMs) {
mappedFileQueue.cleanSwappedMap(forceCleanSwapIntervalMs);
}
+
+ @Override
+ public long estimateMessageCount(long from, long to, MessageFilter filter) {
+ long physicalOffsetFrom = from * CQ_STORE_UNIT_SIZE;
+ long physicalOffsetTo = to * CQ_STORE_UNIT_SIZE;
+ List<MappedFile> mappedFiles = mappedFileQueue.range(physicalOffsetFrom, physicalOffsetTo);
+ if (mappedFiles.isEmpty()) {
+ return -1;
+ }
+
+ boolean sample = false;
+ long match = 0;
+ long raw = 0;
+
+ for (MappedFile mappedFile : mappedFiles) {
+ int start = 0;
+ int len = mappedFile.getFileSize();
+ // First file segment
+ if (mappedFile.getFileFromOffset() <= physicalOffsetFrom) {
+ start = (int) (physicalOffsetFrom - mappedFile.getFileFromOffset());
+ if (mappedFile.getFileFromOffset() + mappedFile.getFileSize() >= physicalOffsetTo) {
+ // Current mapped file covers search range completely.
+ len = (int) (physicalOffsetTo - physicalOffsetFrom);
+ } else {
+ len = mappedFile.getFileSize() - start;
+ }
+ }
+
+ // Scan partial of the last file segment
+ if (0 == start && mappedFile.getFileFromOffset() + mappedFile.getFileSize() > physicalOffsetTo) {
+ len = (int) (physicalOffsetTo - mappedFile.getFileFromOffset());
+ }
+
Review Comment:
Maybe more annotation for readability.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [rocketmq] codecov-commenter commented on pull request #5606: [ISSUE #5605]Introduce tag estimation for lag calculation
Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #5606:
URL: https://github.com/apache/rocketmq/pull/5606#issuecomment-1330125783
# [Codecov](https://codecov.io/gh/apache/rocketmq/pull/5606?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
> Merging [#5606](https://codecov.io/gh/apache/rocketmq/pull/5606?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (6f2bc17) into [develop](https://codecov.io/gh/apache/rocketmq/commit/08dbe7ede925e578c2c9e9bda4a556937cec7c89?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (08dbe7e) will **increase** coverage by `0.09%`.
> The diff coverage is `61.87%`.
> :exclamation: Current head 6f2bc17 differs from pull request most recent head c30e31b. Consider uploading reports for the commit c30e31b to get more accurate results
```diff
@@ Coverage Diff @@
## develop #5606 +/- ##
=============================================
+ Coverage 42.37% 42.47% +0.09%
- Complexity 7929 7966 +37
=============================================
Files 1022 1023 +1
Lines 71254 71443 +189
Branches 9399 9438 +39
=============================================
+ Hits 30196 30344 +148
- Misses 37230 37264 +34
- Partials 3828 3835 +7
```
| [Impacted Files](https://codecov.io/gh/apache/rocketmq/pull/5606?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
|---|---|---|
| [.../java/org/apache/rocketmq/common/BrokerConfig.java](https://codecov.io/gh/apache/rocketmq/pull/5606/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jb21tb24vQnJva2VyQ29uZmlnLmphdmE=) | `28.01% <0.00%> (-0.14%)` | :arrow_down: |
| [...org/apache/rocketmq/store/DefaultMessageStore.java](https://codecov.io/gh/apache/rocketmq/pull/5606/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c3RvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3N0b3JlL0RlZmF1bHRNZXNzYWdlU3RvcmUuamF2YQ==) | `52.76% <0.00%> (-0.66%)` | :arrow_down: |
| [...n/java/org/apache/rocketmq/store/MessageStore.java](https://codecov.io/gh/apache/rocketmq/pull/5606/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c3RvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3N0b3JlL01lc3NhZ2VTdG9yZS5qYXZh) | `0.00% <ø> (ø)` | |
| [...test/listener/rmq/concurrent/RMQBlockListener.java](https://codecov.io/gh/apache/rocketmq/pull/5606/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-dGVzdC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcm9ja2V0bXEvdGVzdC9saXN0ZW5lci9ybXEvY29uY3VycmVudC9STVFCbG9ja0xpc3RlbmVyLmphdmE=) | `0.00% <0.00%> (ø)` | |
| [...rg/apache/rocketmq/test/util/MQAdminTestUtils.java](https://codecov.io/gh/apache/rocketmq/pull/5606/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-dGVzdC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcm9ja2V0bXEvdGVzdC91dGlsL01RQWRtaW5UZXN0VXRpbHMuamF2YQ==) | `0.00% <0.00%> (ø)` | |
| [...apache/rocketmq/tools/admin/DefaultMQAdminExt.java](https://codecov.io/gh/apache/rocketmq/pull/5606/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-dG9vbHMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3Rvb2xzL2FkbWluL0RlZmF1bHRNUUFkbWluRXh0LmphdmE=) | `30.28% <0.00%> (-0.18%)` | :arrow_down: |
| [...he/rocketmq/tools/admin/DefaultMQAdminExtImpl.java](https://codecov.io/gh/apache/rocketmq/pull/5606/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-dG9vbHMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3Rvb2xzL2FkbWluL0RlZmF1bHRNUUFkbWluRXh0SW1wbC5qYXZh) | `25.39% <0.00%> (-0.03%)` | :arrow_down: |
| [...n/java/org/apache/rocketmq/store/ConsumeQueue.java](https://codecov.io/gh/apache/rocketmq/pull/5606/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c3RvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3N0b3JlL0NvbnN1bWVRdWV1ZS5qYXZh) | `69.50% <78.84%> (+0.94%)` | :arrow_up: |
| [...apache/rocketmq/store/queue/BatchConsumeQueue.java](https://codecov.io/gh/apache/rocketmq/pull/5606/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c3RvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3N0b3JlL3F1ZXVlL0JhdGNoQ29uc3VtZVF1ZXVlLmphdmE=) | `70.21% <85.00%> (+1.79%)` | :arrow_up: |
| [...ava/org/apache/rocketmq/store/MappedFileQueue.java](https://codecov.io/gh/apache/rocketmq/pull/5606/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c3RvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3N0b3JlL01hcHBlZEZpbGVRdWV1ZS5qYXZh) | `62.25% <100.00%> (+1.14%)` | :arrow_up: |
| ... and [23 more](https://codecov.io/gh/apache/rocketmq/pull/5606/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
:mega: We’re building smart automated test selection to slash your CI/CD build times. [Learn more](https://about.codecov.io/iterative-testing/?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [rocketmq] lizhanhui merged pull request #5606: [ISSUE #5605]Introduce tag estimation for lag calculation
Posted by GitBox <gi...@apache.org>.
lizhanhui merged PR #5606:
URL: https://github.com/apache/rocketmq/pull/5606
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org