You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by "zk-drizzle (via GitHub)" <gi...@apache.org> on 2023/03/30 03:24:24 UTC

[GitHub] [rocketmq] zk-drizzle opened a new pull request, #6507: [ISSUE #6336] [RIP-62] Cold Read Control

zk-drizzle opened a new pull request, #6507:
URL: https://github.com/apache/rocketmq/pull/6507

   **Make sure set the target branch to `develop`**
   
   ## What is the purpose of the change
   
   <!--
   If this PR fixes a GitHub issue, please add `fixes #<XXX>` or `closes #<XXX>`. Please refer to the documentation for more information:
   https://docs.github.com/en/issues/tracking-your-work-with-issues/linking-a-pull-request-to-an-issue
   -->
   
   fix #6336 
   
   ## Brief changelog
   
   XX
   
   ## Verifying this change
   
   XXXX
   
   Follow this checklist to help us incorporate your contribution quickly and easily. Notice, `it would be helpful if you could finish the following 5 checklist(the last one is not necessary)before request the community to review your PR`.
   
   - [x] Make sure there is a [Github issue](https://github.com/apache/rocketmq/issues) filed for the change (usually before you start working on it). Trivial changes like typos do not require a Github issue. Your pull request should address just this issue, without pulling in other changes - one PR resolves one issue. 
   - [x] Format the pull request title like `[ISSUE #123] Fix UnknownException when host config not exist`. Each commit in the pull request should have a meaningful subject line and body.
   - [x] Write a pull request description that is detailed enough to understand what the pull request does, how, and why.
   - [x] Write necessary unit-test(over 80% coverage) to verify your logic correction, more mock a little better when cross module dependency exist. If the new feature or significant change is committed, please remember to add integration-test in [test module](https://github.com/apache/rocketmq/tree/master/test).
   - [x] Run `mvn -B clean apache-rat:check findbugs:findbugs checkstyle:checkstyle` to make sure basic checks pass. Run `mvn clean install -DskipITs` to make sure unit-test pass. Run `mvn clean test-compile failsafe:integration-test`  to make sure integration-test pass.
   - [ ] If this contribution is large, please file an [Apache Individual Contributor License Agreement](http://www.apache.org/licenses/#clas).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] zk-drizzle commented on pull request #6507: [ISSUE #6336] [RIP-62] Cold Read Control

Posted by "zk-drizzle (via GitHub)" <gi...@apache.org>.
zk-drizzle commented on PR #6507:
URL: https://github.com/apache/rocketmq/pull/6507#issuecomment-1550615405

   ![image](https://github.com/apache/rocketmq/assets/13689954/f9831534-5b69-41ec-941f-b056b55d51f0)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] zk-drizzle commented on pull request #6507: [ISSUE #6336] [RIP-62] Cold Read Control

Posted by "zk-drizzle (via GitHub)" <gi...@apache.org>.
zk-drizzle commented on PR #6507:
URL: https://github.com/apache/rocketmq/pull/6507#issuecomment-1524831339

   reopen


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] zk-drizzle commented on a diff in pull request #6507: [ISSUE #6336] [RIP-62] Cold Read Control

Posted by "zk-drizzle (via GitHub)" <gi...@apache.org>.
zk-drizzle commented on code in PR #6507:
URL: https://github.com/apache/rocketmq/pull/6507#discussion_r1187111607


##########
store/src/main/java/org/apache/rocketmq/store/util/LibC.java:
##########
@@ -50,4 +52,8 @@ public interface LibC extends Library {
     int mlockall(int flags);
 
     int msync(Pointer p, NativeLong length, int flags);
+
+    int mincore(Pointer p, NativeLong length, byte[] vec);

Review Comment:
   First of all, thank you very much for your review and reply. The MappedByteBuffer#isLoaded0 method was also considered early on to solve this problem, but the granularity of this method is a little somewhat broad (it only returns true if all pages of the detected block of data are in physical memory).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] ShadowySpirits commented on a diff in pull request #6507: [ISSUE #6336] [RIP-62] Cold Read Control

Posted by "ShadowySpirits (via GitHub)" <gi...@apache.org>.
ShadowySpirits commented on code in PR #6507:
URL: https://github.com/apache/rocketmq/pull/6507#discussion_r1184829418


##########
store/src/main/java/org/apache/rocketmq/store/util/LibC.java:
##########
@@ -50,4 +52,8 @@ public interface LibC extends Library {
     int mlockall(int flags);
 
     int msync(Pointer p, NativeLong length, int flags);
+
+    int mincore(Pointer p, NativeLong length, byte[] vec);

Review Comment:
   We do not need to introduce mincore with JNI but use `MappedByteBuffer#isLoaded0` instead. Please see `DefaultMessageStore#checkInMemByCommitOffset` for example.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] GenerousMan commented on a diff in pull request #6507: [ISSUE #6336] [RIP-62] Cold Read Control

Posted by "GenerousMan (via GitHub)" <gi...@apache.org>.
GenerousMan commented on code in PR #6507:
URL: https://github.com/apache/rocketmq/pull/6507#discussion_r1163608771


##########
broker/src/main/java/org/apache/rocketmq/broker/coldctr/PIDAdaptiveColdCtrStrategy.java:
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.coldctr;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+public class PIDAdaptiveColdCtrStrategy implements ColdCtrStrategy {
+    private static final int MAX_STORE_NUMS = 10;
+    private static final Double KP = 0.5, KI = 0.3, KD = 0.2;

Review Comment:
   Maybe add some explanation would be better? Because the design of PID is very intelligent, and it can deal with different scenarios by setting the values of P, I, and D. However, the setting of KP, KI, and KD values needs to follow some principles, we could add some clarification or constraints here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] ShadowySpirits commented on a diff in pull request #6507: [ISSUE #6336] [RIP-62] Cold Read Control

Posted by "ShadowySpirits (via GitHub)" <gi...@apache.org>.
ShadowySpirits commented on code in PR #6507:
URL: https://github.com/apache/rocketmq/pull/6507#discussion_r1192094556


##########
store/src/main/java/org/apache/rocketmq/store/util/LibC.java:
##########
@@ -50,4 +52,8 @@ public interface LibC extends Library {
     int mlockall(int flags);
 
     int msync(Pointer p, NativeLong length, int flags);
+
+    int mincore(Pointer p, NativeLong length, byte[] vec);

Review Comment:
   `MappedByteBuffer#isLoaded0` supports specifying the page you want to detect as a parameter
   
   ```java
   private native boolean isLoaded0(long address, long length, int pageCount);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] ShadowySpirits commented on pull request #6507: [ISSUE #6336] [RIP-62] Cold Read Control

Posted by "ShadowySpirits (via GitHub)" <gi...@apache.org>.
ShadowySpirits commented on PR #6507:
URL: https://github.com/apache/rocketmq/pull/6507#issuecomment-1534508841

   Using such a complicated mechanism to reduce cold data load is unnecessary for me. We could first remap the fully filled CommitLog with the flag MADV_DONTNEED or MADV_RANDOM. If there is no improvement, let's apply this optimization again.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] GenerousMan commented on a diff in pull request #6507: [ISSUE #6336] [RIP-62] Cold Read Control

Posted by "GenerousMan (via GitHub)" <gi...@apache.org>.
GenerousMan commented on code in PR #6507:
URL: https://github.com/apache/rocketmq/pull/6507#discussion_r1164935503


##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -872,6 +872,10 @@ public GetMessageResult getMessage(final String group, final String topic, final
                                 continue;
                             }
 
+                            if (messageStoreConfig.isColdDataFlowControlEnable() && !selectResult.isInCache()) {

Review Comment:
   System Consumer Groups(such as TOOLS_CONSUMER_GROUP) need to be considered, to prevent them from being limited.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] codecov-commenter commented on pull request #6507: [ISSUE #6336] [RIP-62] Cold Read Control

Posted by "codecov-commenter (via GitHub)" <gi...@apache.org>.
codecov-commenter commented on PR #6507:
URL: https://github.com/apache/rocketmq/pull/6507#issuecomment-1491212154

   ## [Codecov](https://codecov.io/gh/apache/rocketmq/pull/6507?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#6507](https://codecov.io/gh/apache/rocketmq/pull/6507?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (a13345c) into [develop](https://codecov.io/gh/apache/rocketmq/commit/24bc8c90923b762be5d1d74da6bc1b27447ebce4?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (24bc8c9) will **decrease** coverage by `0.27%`.
   > The diff coverage is `16.23%`.
   
   ```diff
   @@              Coverage Diff              @@
   ##             develop    #6507      +/-   ##
   =============================================
   - Coverage      43.16%   42.89%   -0.27%     
   - Complexity      9007     9041      +34     
   =============================================
     Files           1107     1117      +10     
     Lines          78269    79042     +773     
     Branches       10202    10313     +111     
   =============================================
   + Hits           33782    33907     +125     
   - Misses         40248    40866     +618     
   - Partials        4239     4269      +30     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/rocketmq/pull/6507?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...rocketmq/broker/coldctr/SimpleColdCtrStrategy.java](https://codecov.io/gh/apache/rocketmq/pull/6507?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9icm9rZXIvY29sZGN0ci9TaW1wbGVDb2xkQ3RyU3RyYXRlZ3kuamF2YQ==) | `0.00% <0.00%> (ø)` | |
   | [...ocketmq/broker/processor/AdminBrokerProcessor.java](https://codecov.io/gh/apache/rocketmq/pull/6507?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9icm9rZXIvcHJvY2Vzc29yL0FkbWluQnJva2VyUHJvY2Vzc29yLmphdmE=) | `23.82% <0.00%> (-1.45%)` | :arrow_down: |
   | [...g/apache/rocketmq/client/impl/MQClientAPIImpl.java](https://codecov.io/gh/apache/rocketmq/pull/6507?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jbGllbnQvaW1wbC9NUUNsaWVudEFQSUltcGwuamF2YQ==) | `22.55% <0.00%> (-0.64%)` | :arrow_down: |
   | [...pache/rocketmq/common/coldctr/AccAndTimeStamp.java](https://codecov.io/gh/apache/rocketmq/pull/6507?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jb21tb24vY29sZGN0ci9BY2NBbmRUaW1lU3RhbXAuamF2YQ==) | `0.00% <0.00%> (ø)` | |
   | [...he/rocketmq/common/constant/FIleReadaheadMode.java](https://codecov.io/gh/apache/rocketmq/pull/6507?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jb21tb24vY29uc3RhbnQvRklsZVJlYWRhaGVhZE1vZGUuamF2YQ==) | `0.00% <0.00%> (ø)` | |
   | [...rg/apache/rocketmq/common/constant/LoggerName.java](https://codecov.io/gh/apache/rocketmq/pull/6507?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jb21tb24vY29uc3RhbnQvTG9nZ2VyTmFtZS5qYXZh) | `0.00% <ø> (ø)` | |
   | [...apache/rocketmq/remoting/protocol/RequestCode.java](https://codecov.io/gh/apache/rocketmq/pull/6507?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cmVtb3Rpbmcvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3JlbW90aW5nL3Byb3RvY29sL1JlcXVlc3RDb2RlLmphdmE=) | `0.00% <ø> (ø)` | |
   | [...org/apache/rocketmq/store/DefaultMessageStore.java](https://codecov.io/gh/apache/rocketmq/pull/6507?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c3RvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3N0b3JlL0RlZmF1bHRNZXNzYWdlU3RvcmUuamF2YQ==) | `46.61% <0.00%> (-0.12%)` | :arrow_down: |
   | [...main/java/org/apache/rocketmq/store/util/LibC.java](https://codecov.io/gh/apache/rocketmq/pull/6507?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c3RvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3N0b3JlL3V0aWwvTGliQy5qYXZh) | `0.00% <ø> (ø)` | |
   | [...apache/rocketmq/tools/admin/DefaultMQAdminExt.java](https://codecov.io/gh/apache/rocketmq/pull/6507?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-dG9vbHMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3Rvb2xzL2FkbWluL0RlZmF1bHRNUUFkbWluRXh0LmphdmE=) | `29.28% <0.00%> (-1.01%)` | :arrow_down: |
   | ... and [16 more](https://codecov.io/gh/apache/rocketmq/pull/6507?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ... and [14 files with indirect coverage changes](https://codecov.io/gh/apache/rocketmq/pull/6507/indirect-changes?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   :mega: We’re building smart automated test selection to slash your CI/CD build times. [Learn more](https://about.codecov.io/iterative-testing/?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] GenerousMan commented on a diff in pull request #6507: [ISSUE #6336] [RIP-62] Cold Read Control

Posted by "GenerousMan (via GitHub)" <gi...@apache.org>.
GenerousMan commented on code in PR #6507:
URL: https://github.com/apache/rocketmq/pull/6507#discussion_r1163610518


##########
broker/src/main/java/org/apache/rocketmq/broker/coldctr/PIDAdaptiveColdCtrStrategy.java:
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.coldctr;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+public class PIDAdaptiveColdCtrStrategy implements ColdCtrStrategy {
+    private static final int MAX_STORE_NUMS = 10;
+    private static final Double KP = 0.5, KI = 0.3, KD = 0.2;

Review Comment:
   There are many esoteric values in this code. A brief explanation of these values can help users better adjust the strategy according to their own conditions.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] dingshuangxi888 commented on a diff in pull request #6507: [ISSUE #6336] [RIP-62] Cold Read Control

Posted by "dingshuangxi888 (via GitHub)" <gi...@apache.org>.
dingshuangxi888 commented on code in PR #6507:
URL: https://github.com/apache/rocketmq/pull/6507#discussion_r1191984283


##########
store/src/main/java/org/apache/rocketmq/store/CommitLog.java:
##########
@@ -1966,4 +2000,196 @@ public void cleanSwappedMap(long forceCleanSwapIntervalMs) {
     public FlushManager getFlushManager() {
         return flushManager;
     }
+
+    public class ColdDataCheckService extends ServiceThread {
+        private final SystemClock systemClock = new SystemClock();
+        private final ConcurrentHashMap<String, byte[]> pageCacheMap = new ConcurrentHashMap<>();
+        private int pageSize = -1;
+        private int sampleSteps = 32;
+
+        public ColdDataCheckService() {
+            sampleSteps = defaultMessageStore.getMessageStoreConfig().getSampleSteps();
+            if (sampleSteps <= 0) {
+                sampleSteps = 32;
+            }
+            initPageSize();
+            scanFilesInPageCache();
+        }
+
+        @Override
+        public String getServiceName() {
+            return ColdDataCheckService.class.getSimpleName();
+        }
+
+        @Override
+        public void run() {
+            log.info("{} service started", this.getServiceName());
+            while (!this.isStopped()) {
+                try {
+                    if (!defaultMessageStore.getMessageStoreConfig().isColdDataFlowControlEnable() || !defaultMessageStore.getMessageStoreConfig().isColdDataScanEnable()) {
+                        pageCacheMap.clear();
+                        this.waitForRunning(180 * 1000);
+                        continue;
+                    } else {
+                        this.waitForRunning(defaultMessageStore.getMessageStoreConfig().getTimerColdDataCheckIntervalMs());
+                    }
+                    long beginClockTimestamp = this.systemClock.now();
+                    scanFilesInPageCache();
+                    long costTime = this.systemClock.now() - beginClockTimestamp;
+                    log.info("[{}] scanFilesInPageCache-cost {} ms.", costTime > 30 * 1000 ? "NOTIFYME" : "OK", costTime);
+                } catch (Throwable e) {
+                    log.warn(this.getServiceName() + " service has e: {}", e);
+                }
+            }
+            log.info("{} service end", this.getServiceName());
+        }
+
+        public boolean isDataInPageCache(final long offset) {
+            if (!defaultMessageStore.getMessageStoreConfig().isColdDataFlowControlEnable()) {
+                return true;
+            }
+            if (pageSize <= 0 || sampleSteps <= 0) {
+                return true;
+            }
+            if (!defaultMessageStore.checkInColdAreaByCommitOffset(offset, getMaxOffset())) {
+                return true;
+            }
+            if (!defaultMessageStore.getMessageStoreConfig().isColdDataScanEnable()) {
+                return false;
+            }
+
+            MappedFile mappedFile = mappedFileQueue.findMappedFileByOffset(offset, offset == 0);
+            if (null == mappedFile) {
+                return true;
+            }
+            byte[] bytes = pageCacheMap.get(mappedFile.getFileName());
+            if (null == bytes) {
+                return true;
+            }
+
+            int pos = (int)(offset % defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog());
+            int realIndex = pos / pageSize / sampleSteps;
+            return bytes.length - 1 >= realIndex && bytes[realIndex] != 0;
+        }
+
+        private void scanFilesInPageCache() {
+            if (!defaultMessageStore.getMessageStoreConfig().isColdDataFlowControlEnable() || !defaultMessageStore.getMessageStoreConfig().isColdDataScanEnable()) {
+                return;
+            }
+
+            try {
+                log.info("pageCacheMap key size: {}", pageCacheMap.size());
+                clearExpireMappedFile();
+                mappedFileQueue.getMappedFiles().stream().forEach(mappedFile -> {
+                    byte[] pageCacheTable = checkFileInPageCache(mappedFile);
+                    if (sampleSteps > 1) {
+                        pageCacheTable = sampling(pageCacheTable, sampleSteps);
+                    }
+                    pageCacheMap.put(mappedFile.getFileName(), pageCacheTable);
+                });
+            } catch (Exception e) {
+                log.error("scanFilesInPageCache exception", e);
+            }
+        }
+
+        private void clearExpireMappedFile() {
+            Set<String> currentFileSet = mappedFileQueue.getMappedFiles()
+                .stream().map(MappedFile::getFileName).collect(Collectors.toSet());
+            pageCacheMap.entrySet().stream().forEach(entry -> {

Review Comment:
   stream() is not need for forEach



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] zk-drizzle commented on a diff in pull request #6507: [ISSUE #6336] [RIP-62] Cold Read Control

Posted by "zk-drizzle (via GitHub)" <gi...@apache.org>.
zk-drizzle commented on code in PR #6507:
URL: https://github.com/apache/rocketmq/pull/6507#discussion_r1194891440


##########
store/src/main/java/org/apache/rocketmq/store/CommitLog.java:
##########
@@ -1966,4 +2000,196 @@ public void cleanSwappedMap(long forceCleanSwapIntervalMs) {
     public FlushManager getFlushManager() {
         return flushManager;
     }
+
+    public class ColdDataCheckService extends ServiceThread {
+        private final SystemClock systemClock = new SystemClock();
+        private final ConcurrentHashMap<String, byte[]> pageCacheMap = new ConcurrentHashMap<>();
+        private int pageSize = -1;
+        private int sampleSteps = 32;
+
+        public ColdDataCheckService() {
+            sampleSteps = defaultMessageStore.getMessageStoreConfig().getSampleSteps();
+            if (sampleSteps <= 0) {
+                sampleSteps = 32;
+            }
+            initPageSize();
+            scanFilesInPageCache();
+        }
+
+        @Override
+        public String getServiceName() {
+            return ColdDataCheckService.class.getSimpleName();
+        }
+
+        @Override
+        public void run() {
+            log.info("{} service started", this.getServiceName());
+            while (!this.isStopped()) {
+                try {
+                    if (!defaultMessageStore.getMessageStoreConfig().isColdDataFlowControlEnable() || !defaultMessageStore.getMessageStoreConfig().isColdDataScanEnable()) {
+                        pageCacheMap.clear();
+                        this.waitForRunning(180 * 1000);
+                        continue;
+                    } else {
+                        this.waitForRunning(defaultMessageStore.getMessageStoreConfig().getTimerColdDataCheckIntervalMs());
+                    }
+                    long beginClockTimestamp = this.systemClock.now();
+                    scanFilesInPageCache();
+                    long costTime = this.systemClock.now() - beginClockTimestamp;
+                    log.info("[{}] scanFilesInPageCache-cost {} ms.", costTime > 30 * 1000 ? "NOTIFYME" : "OK", costTime);
+                } catch (Throwable e) {
+                    log.warn(this.getServiceName() + " service has e: {}", e);
+                }
+            }
+            log.info("{} service end", this.getServiceName());
+        }
+
+        public boolean isDataInPageCache(final long offset) {
+            if (!defaultMessageStore.getMessageStoreConfig().isColdDataFlowControlEnable()) {
+                return true;
+            }
+            if (pageSize <= 0 || sampleSteps <= 0) {
+                return true;
+            }
+            if (!defaultMessageStore.checkInColdAreaByCommitOffset(offset, getMaxOffset())) {
+                return true;
+            }
+            if (!defaultMessageStore.getMessageStoreConfig().isColdDataScanEnable()) {
+                return false;
+            }
+
+            MappedFile mappedFile = mappedFileQueue.findMappedFileByOffset(offset, offset == 0);
+            if (null == mappedFile) {
+                return true;
+            }
+            byte[] bytes = pageCacheMap.get(mappedFile.getFileName());
+            if (null == bytes) {
+                return true;
+            }
+
+            int pos = (int)(offset % defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog());
+            int realIndex = pos / pageSize / sampleSteps;
+            return bytes.length - 1 >= realIndex && bytes[realIndex] != 0;
+        }
+
+        private void scanFilesInPageCache() {
+            if (!defaultMessageStore.getMessageStoreConfig().isColdDataFlowControlEnable() || !defaultMessageStore.getMessageStoreConfig().isColdDataScanEnable()) {
+                return;
+            }
+
+            try {
+                log.info("pageCacheMap key size: {}", pageCacheMap.size());
+                clearExpireMappedFile();
+                mappedFileQueue.getMappedFiles().stream().forEach(mappedFile -> {
+                    byte[] pageCacheTable = checkFileInPageCache(mappedFile);
+                    if (sampleSteps > 1) {
+                        pageCacheTable = sampling(pageCacheTable, sampleSteps);
+                    }
+                    pageCacheMap.put(mappedFile.getFileName(), pageCacheTable);
+                });
+            } catch (Exception e) {
+                log.error("scanFilesInPageCache exception", e);
+            }
+        }
+
+        private void clearExpireMappedFile() {
+            Set<String> currentFileSet = mappedFileQueue.getMappedFiles()
+                .stream().map(MappedFile::getFileName).collect(Collectors.toSet());
+            pageCacheMap.entrySet().stream().forEach(entry -> {

Review Comment:
   Thank you very much for your review,I'd be happy to take your advice.



##########
store/src/main/java/org/apache/rocketmq/store/CommitLog.java:
##########
@@ -1966,4 +2000,196 @@ public void cleanSwappedMap(long forceCleanSwapIntervalMs) {
     public FlushManager getFlushManager() {
         return flushManager;
     }
+
+    public class ColdDataCheckService extends ServiceThread {
+        private final SystemClock systemClock = new SystemClock();
+        private final ConcurrentHashMap<String, byte[]> pageCacheMap = new ConcurrentHashMap<>();
+        private int pageSize = -1;
+        private int sampleSteps = 32;
+
+        public ColdDataCheckService() {
+            sampleSteps = defaultMessageStore.getMessageStoreConfig().getSampleSteps();
+            if (sampleSteps <= 0) {
+                sampleSteps = 32;
+            }
+            initPageSize();
+            scanFilesInPageCache();
+        }
+
+        @Override
+        public String getServiceName() {
+            return ColdDataCheckService.class.getSimpleName();
+        }
+
+        @Override
+        public void run() {
+            log.info("{} service started", this.getServiceName());
+            while (!this.isStopped()) {
+                try {
+                    if (!defaultMessageStore.getMessageStoreConfig().isColdDataFlowControlEnable() || !defaultMessageStore.getMessageStoreConfig().isColdDataScanEnable()) {
+                        pageCacheMap.clear();
+                        this.waitForRunning(180 * 1000);
+                        continue;
+                    } else {
+                        this.waitForRunning(defaultMessageStore.getMessageStoreConfig().getTimerColdDataCheckIntervalMs());
+                    }
+                    long beginClockTimestamp = this.systemClock.now();
+                    scanFilesInPageCache();
+                    long costTime = this.systemClock.now() - beginClockTimestamp;
+                    log.info("[{}] scanFilesInPageCache-cost {} ms.", costTime > 30 * 1000 ? "NOTIFYME" : "OK", costTime);
+                } catch (Throwable e) {
+                    log.warn(this.getServiceName() + " service has e: {}", e);
+                }
+            }
+            log.info("{} service end", this.getServiceName());
+        }
+
+        public boolean isDataInPageCache(final long offset) {
+            if (!defaultMessageStore.getMessageStoreConfig().isColdDataFlowControlEnable()) {
+                return true;
+            }
+            if (pageSize <= 0 || sampleSteps <= 0) {
+                return true;
+            }
+            if (!defaultMessageStore.checkInColdAreaByCommitOffset(offset, getMaxOffset())) {
+                return true;
+            }
+            if (!defaultMessageStore.getMessageStoreConfig().isColdDataScanEnable()) {
+                return false;
+            }
+
+            MappedFile mappedFile = mappedFileQueue.findMappedFileByOffset(offset, offset == 0);
+            if (null == mappedFile) {
+                return true;
+            }
+            byte[] bytes = pageCacheMap.get(mappedFile.getFileName());
+            if (null == bytes) {
+                return true;
+            }
+
+            int pos = (int)(offset % defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog());
+            int realIndex = pos / pageSize / sampleSteps;
+            return bytes.length - 1 >= realIndex && bytes[realIndex] != 0;
+        }
+
+        private void scanFilesInPageCache() {
+            if (!defaultMessageStore.getMessageStoreConfig().isColdDataFlowControlEnable() || !defaultMessageStore.getMessageStoreConfig().isColdDataScanEnable()) {
+                return;
+            }
+
+            try {
+                log.info("pageCacheMap key size: {}", pageCacheMap.size());
+                clearExpireMappedFile();
+                mappedFileQueue.getMappedFiles().stream().forEach(mappedFile -> {

Review Comment:
   Thank you very much for your review,I'd be happy to take your advice.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] zk-drizzle commented on pull request #6507: [ISSUE #6336] [RIP-62] Cold Read Control

Posted by "zk-drizzle (via GitHub)" <gi...@apache.org>.
zk-drizzle commented on PR #6507:
URL: https://github.com/apache/rocketmq/pull/6507#issuecomment-1550616029

   windows compatibility is ok


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] RongtongJin merged pull request #6507: [ISSUE #6336] [RIP-62] Cold Read Control

Posted by "RongtongJin (via GitHub)" <gi...@apache.org>.
RongtongJin merged PR #6507:
URL: https://github.com/apache/rocketmq/pull/6507


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] ShadowySpirits commented on a diff in pull request #6507: [ISSUE #6336] [RIP-62] Cold Read Control

Posted by "ShadowySpirits (via GitHub)" <gi...@apache.org>.
ShadowySpirits commented on code in PR #6507:
URL: https://github.com/apache/rocketmq/pull/6507#discussion_r1192099476


##########
store/src/main/java/org/apache/rocketmq/store/util/LibC.java:
##########
@@ -50,4 +52,8 @@ public interface LibC extends Library {
     int mlockall(int flags);
 
     int msync(Pointer p, NativeLong length, int flags);
+
+    int mincore(Pointer p, NativeLong length, byte[] vec);

Review Comment:
   In addition, there is already an `isLoaded` method in the MappedFile interface:
   
   ```java
   /**
    * Check mapped file is loaded to memory with given position and size
    * @param position start offset of data
    * @param size data size
    * @return data is resided in memory or not
    */
   boolean isLoaded(long position, int size);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] zk-drizzle commented on a diff in pull request #6507: [ISSUE #6336] [RIP-62] Cold Read Control

Posted by "zk-drizzle (via GitHub)" <gi...@apache.org>.
zk-drizzle commented on code in PR #6507:
URL: https://github.com/apache/rocketmq/pull/6507#discussion_r1173233571


##########
broker/src/main/java/org/apache/rocketmq/broker/coldctr/ColdCtrStrategy.java:
##########
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.coldctr;
+
+public interface ColdCtrStrategy {
+    Double decisionFactor();
+
+    void promote(String consumerGroup, Long currentThreshold);
+
+    void decelerate(String consumerGroup, Long currentThreshold);
+
+    void collect(Long globalAcc);

Review Comment:
   That's a very good idea~



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] github-code-scanning[bot] commented on pull request #6507: [ISSUE #6336] [RIP-62] Cold Read Control

Posted by "github-code-scanning[bot] (via GitHub)" <gi...@apache.org>.
github-code-scanning[bot] commented on PR #6507:
URL: https://github.com/apache/rocketmq/pull/6507#issuecomment-1489748816

   You have successfully added a new CodeQL configuration `.github/workflows/codeql_analysis.yml:CodeQL-Build`. As part of the setup process, we have scanned this repository and found 9 existing alerts. Please check the repository [Security tab](/apache/rocketmq/security/code-scanning?query=pr%3A6507+tool%3ACodeQL+is%3Aopen) to see all alerts.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] lizhiboo commented on a diff in pull request #6507: [ISSUE #6336] [RIP-62] Cold Read Control

Posted by "lizhiboo (via GitHub)" <gi...@apache.org>.
lizhiboo commented on code in PR #6507:
URL: https://github.com/apache/rocketmq/pull/6507#discussion_r1165139952


##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -1686,6 +1690,16 @@ public boolean checkInDiskByCommitOffset(long offsetPy) {
         return offsetPy >= commitLog.getMinOffset();
     }
 
+    /**
+     * The ratio val is estimated by the experiment and experience
+     * so that the result is not high accurate for different business
+     * @return
+     */
+    public boolean checkInColdAreaByCommitOffset(long offsetPy, long maxOffsetPy) {

Review Comment:
   IMO, using linux system call mincore() method to check file been cached or not will achieve higher accuracy, just like pcstat command in https://github.com/tobert/pcstat



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] zk-drizzle closed pull request #6507: [ISSUE #6336] [RIP-62] Cold Read Control

Posted by "zk-drizzle (via GitHub)" <gi...@apache.org>.
zk-drizzle closed pull request #6507: [ISSUE #6336] [RIP-62] Cold Read Control 
URL: https://github.com/apache/rocketmq/pull/6507


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] GenerousMan commented on a diff in pull request #6507: [ISSUE #6336] [RIP-62] Cold Read Control

Posted by "GenerousMan (via GitHub)" <gi...@apache.org>.
GenerousMan commented on code in PR #6507:
URL: https://github.com/apache/rocketmq/pull/6507#discussion_r1163760550


##########
tools/pom.xml:
##########
@@ -60,5 +60,9 @@
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>

Review Comment:
   Using ${project.groupId} will be better.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] dingshuangxi888 commented on a diff in pull request #6507: [ISSUE #6336] [RIP-62] Cold Read Control

Posted by "dingshuangxi888 (via GitHub)" <gi...@apache.org>.
dingshuangxi888 commented on code in PR #6507:
URL: https://github.com/apache/rocketmq/pull/6507#discussion_r1191984035


##########
store/src/main/java/org/apache/rocketmq/store/CommitLog.java:
##########
@@ -1966,4 +2000,196 @@ public void cleanSwappedMap(long forceCleanSwapIntervalMs) {
     public FlushManager getFlushManager() {
         return flushManager;
     }
+
+    public class ColdDataCheckService extends ServiceThread {
+        private final SystemClock systemClock = new SystemClock();
+        private final ConcurrentHashMap<String, byte[]> pageCacheMap = new ConcurrentHashMap<>();
+        private int pageSize = -1;
+        private int sampleSteps = 32;
+
+        public ColdDataCheckService() {
+            sampleSteps = defaultMessageStore.getMessageStoreConfig().getSampleSteps();
+            if (sampleSteps <= 0) {
+                sampleSteps = 32;
+            }
+            initPageSize();
+            scanFilesInPageCache();
+        }
+
+        @Override
+        public String getServiceName() {
+            return ColdDataCheckService.class.getSimpleName();
+        }
+
+        @Override
+        public void run() {
+            log.info("{} service started", this.getServiceName());
+            while (!this.isStopped()) {
+                try {
+                    if (!defaultMessageStore.getMessageStoreConfig().isColdDataFlowControlEnable() || !defaultMessageStore.getMessageStoreConfig().isColdDataScanEnable()) {
+                        pageCacheMap.clear();
+                        this.waitForRunning(180 * 1000);
+                        continue;
+                    } else {
+                        this.waitForRunning(defaultMessageStore.getMessageStoreConfig().getTimerColdDataCheckIntervalMs());
+                    }
+                    long beginClockTimestamp = this.systemClock.now();
+                    scanFilesInPageCache();
+                    long costTime = this.systemClock.now() - beginClockTimestamp;
+                    log.info("[{}] scanFilesInPageCache-cost {} ms.", costTime > 30 * 1000 ? "NOTIFYME" : "OK", costTime);
+                } catch (Throwable e) {
+                    log.warn(this.getServiceName() + " service has e: {}", e);
+                }
+            }
+            log.info("{} service end", this.getServiceName());
+        }
+
+        public boolean isDataInPageCache(final long offset) {
+            if (!defaultMessageStore.getMessageStoreConfig().isColdDataFlowControlEnable()) {
+                return true;
+            }
+            if (pageSize <= 0 || sampleSteps <= 0) {
+                return true;
+            }
+            if (!defaultMessageStore.checkInColdAreaByCommitOffset(offset, getMaxOffset())) {
+                return true;
+            }
+            if (!defaultMessageStore.getMessageStoreConfig().isColdDataScanEnable()) {
+                return false;
+            }
+
+            MappedFile mappedFile = mappedFileQueue.findMappedFileByOffset(offset, offset == 0);
+            if (null == mappedFile) {
+                return true;
+            }
+            byte[] bytes = pageCacheMap.get(mappedFile.getFileName());
+            if (null == bytes) {
+                return true;
+            }
+
+            int pos = (int)(offset % defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog());
+            int realIndex = pos / pageSize / sampleSteps;
+            return bytes.length - 1 >= realIndex && bytes[realIndex] != 0;
+        }
+
+        private void scanFilesInPageCache() {
+            if (!defaultMessageStore.getMessageStoreConfig().isColdDataFlowControlEnable() || !defaultMessageStore.getMessageStoreConfig().isColdDataScanEnable()) {
+                return;
+            }
+
+            try {
+                log.info("pageCacheMap key size: {}", pageCacheMap.size());
+                clearExpireMappedFile();
+                mappedFileQueue.getMappedFiles().stream().forEach(mappedFile -> {

Review Comment:
   stream() is not need for forEach



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] zk-drizzle commented on a diff in pull request #6507: [ISSUE #6336] [RIP-62] Cold Read Control

Posted by "zk-drizzle (via GitHub)" <gi...@apache.org>.
zk-drizzle commented on code in PR #6507:
URL: https://github.com/apache/rocketmq/pull/6507#discussion_r1173232725


##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -872,6 +872,10 @@ public GetMessageResult getMessage(final String group, final String topic, final
                                 continue;
                             }
 
+                            if (messageStoreConfig.isColdDataFlowControlEnable() && !selectResult.isInCache()) {

Review Comment:
   yes, good idea~



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] GenerousMan commented on a diff in pull request #6507: [ISSUE #6336] [RIP-62] Cold Read Control

Posted by "GenerousMan (via GitHub)" <gi...@apache.org>.
GenerousMan commented on code in PR #6507:
URL: https://github.com/apache/rocketmq/pull/6507#discussion_r1163650440


##########
broker/src/main/resources/rmq.broker.logback.xml:
##########
@@ -498,6 +498,34 @@
         <appender-ref ref="RocketmqPopSiftingAppender_inner"/>
     </appender>
 
+    <appender name="RocketmqColdCtrSiftingAppender" class="ch.qos.logback.classic.sift.SiftingAppender">
+        <discriminator>
+            <key>brokerContainerLogDir</key>
+            <defaultValue>${file.separator}</defaultValue>
+        </discriminator>
+        <sift>
+            <appender name="RocketmqPopAppender"

Review Comment:
   The appender's name should be renamed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] GenerousMan commented on a diff in pull request #6507: [ISSUE #6336] [RIP-62] Cold Read Control

Posted by "GenerousMan (via GitHub)" <gi...@apache.org>.
GenerousMan commented on code in PR #6507:
URL: https://github.com/apache/rocketmq/pull/6507#discussion_r1163568199


##########
broker/src/main/java/org/apache/rocketmq/broker/coldctr/ColdDataPullRequestHoldService.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.coldctr;
+
+import java.util.Iterator;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.longpolling.PullRequest;
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.common.SystemClock;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+
+/**
+ * just requests are type of pull have the qualification to be put into this hold queue.
+ * if the pull request is reading cold data and that request will be cold at the first time,
+ * then the pull request will be cold in this @code pullRequestLinkedBlockingQueue,
+ * in @code coldTimeoutMillis later the pull request will be warm and marked holded
+ */
+public class ColdDataPullRequestHoldService extends ServiceThread {
+
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_COLDCTR_LOGGER_NAME);
+    public static final String NO_SUSPEND_KEY = "_noSuspend_";
+
+    private final long coldHoldTimeoutMillis = 3000;
+    private final SystemClock systemClock = new SystemClock();
+    private final BrokerController brokerController;
+    private final LinkedBlockingQueue<PullRequest> pullRequestColdHoldQueue = new LinkedBlockingQueue<>(10000);
+
+    public void suspendColdDataReadRequest(PullRequest pullRequest) {
+        if (this.brokerController.getMessageStoreConfig().isColdDataFlowControlEnable()) {
+            pullRequestColdHoldQueue.offer(pullRequest);
+        }
+    }
+
+    public ColdDataPullRequestHoldService(BrokerController brokerController) {
+        this.brokerController = brokerController;
+    }
+
+    @Override
+    public String getServiceName() {
+        return ColdDataPullRequestHoldService.class.getSimpleName();
+    }
+
+    @Override
+    public void run() {
+        log.info("{} service started", this.getServiceName());
+        while (!this.isStopped()) {
+            try {
+                if (!this.brokerController.getMessageStoreConfig().isColdDataFlowControlEnable()) {
+                    this.waitForRunning(20 * 1000);

Review Comment:
   Is it possible to continue after waiting for 20 seconds here? If cold control is not enabled, can the subsequent process of processing pullRequest be skipped?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] RongtongJin commented on a diff in pull request #6507: [ISSUE #6336] [RIP-62] Cold Read Control

Posted by "RongtongJin (via GitHub)" <gi...@apache.org>.
RongtongJin commented on code in PR #6507:
URL: https://github.com/apache/rocketmq/pull/6507#discussion_r1163824553


##########
broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java:
##########
@@ -1547,6 +1561,16 @@ protected void startBasicService() throws Exception {
             this.brokerPreOnlineService.start();
         }
 
+        if (this.coldDataPullRequestHoldService != null) {
+            this.coldDataPullRequestHoldService.start();
+        }
+
+        if (this.coldDataCgCtrService != null) {
+            this.coldDataCgCtrService.start();
+        }
+
+        //Init state version after messageStore initialized.
+        //this.topicConfigManager.initStateVersion();

Review Comment:
   It would be better to remove comment lines



##########
broker/src/main/resources/rmq.broker.logback.xml:
##########
@@ -595,6 +623,15 @@
         <appender-ref ref="RocketmqPopSiftingAppender"/>
     </logger>
 
+    <logger name="RocketmqColdCtr" additivity="false">
+        <level value="INFO"/>
+        <appender-ref ref="RocketmqColdCtrSiftingAppender"/>
+    </logger>
+
+    <logger name="RocketmqTraffic" additivity="false">
+        <level value="INFO"/>
+    </logger>

Review Comment:
   Why add this logger? It seems to have nothing to do with this PR



##########
tools/pom.xml:
##########
@@ -60,5 +60,9 @@
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-store</artifactId>
+        </dependency>

Review Comment:
   IMO, tools module should not depend on store module.



##########
broker/src/main/java/org/apache/rocketmq/broker/coldctr/ColdCtrStrategy.java:
##########
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.coldctr;
+
+public interface ColdCtrStrategy {
+    Double decisionFactor();
+
+    void promote(String consumerGroup, Long currentThreshold);
+
+    void decelerate(String consumerGroup, Long currentThreshold);
+
+    void collect(Long globalAcc);

Review Comment:
   It would be better to add some comments on interface to explain the meaning of methods and parameters.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] zk-drizzle commented on a diff in pull request #6507: [ISSUE #6336] [RIP-62] Cold Read Control

Posted by "zk-drizzle (via GitHub)" <gi...@apache.org>.
zk-drizzle commented on code in PR #6507:
URL: https://github.com/apache/rocketmq/pull/6507#discussion_r1173232587


##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -1686,6 +1690,16 @@ public boolean checkInDiskByCommitOffset(long offsetPy) {
         return offsetPy >= commitLog.getMinOffset();
     }
 
+    /**
+     * The ratio val is estimated by the experiment and experience
+     * so that the result is not high accurate for different business
+     * @return
+     */
+    public boolean checkInColdAreaByCommitOffset(long offsetPy, long maxOffsetPy) {

Review Comment:
   This method mainily helps us distinguish between logical cold data and not. Sometimes it can prevent normal consumers from being cold controlled.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] zk-drizzle commented on a diff in pull request #6507: [ISSUE #6336] [RIP-62] Cold Read Control

Posted by "zk-drizzle (via GitHub)" <gi...@apache.org>.
zk-drizzle commented on code in PR #6507:
URL: https://github.com/apache/rocketmq/pull/6507#discussion_r1173233294


##########
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java:
##########
@@ -1686,6 +1690,16 @@ public boolean checkInDiskByCommitOffset(long offsetPy) {
         return offsetPy >= commitLog.getMinOffset();
     }
 
+    /**
+     * The ratio val is estimated by the experiment and experience
+     * so that the result is not high accurate for different business
+     * @return
+     */
+    public boolean checkInColdAreaByCommitOffset(long offsetPy, long maxOffsetPy) {

Review Comment:
   Thank you for your review ~



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] ShadowySpirits commented on a diff in pull request #6507: [ISSUE #6336] [RIP-62] Cold Read Control

Posted by "ShadowySpirits (via GitHub)" <gi...@apache.org>.
ShadowySpirits commented on code in PR #6507:
URL: https://github.com/apache/rocketmq/pull/6507#discussion_r1192094556


##########
store/src/main/java/org/apache/rocketmq/store/util/LibC.java:
##########
@@ -50,4 +52,8 @@ public interface LibC extends Library {
     int mlockall(int flags);
 
     int msync(Pointer p, NativeLong length, int flags);
+
+    int mincore(Pointer p, NativeLong length, byte[] vec);

Review Comment:
   isLoaded0
   
   ```java
   private native boolean isLoaded0(long address, long length, int pageCount);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] zk-drizzle commented on pull request #6507: [ISSUE #6336] [RIP-62] Cold Read Control

Posted by "zk-drizzle (via GitHub)" <gi...@apache.org>.
zk-drizzle commented on PR #6507:
URL: https://github.com/apache/rocketmq/pull/6507#issuecomment-1489643061

   The test report of [RIP-62] Cold Read Control](https://docs.google.com/document/d/1Ppz8V_a4piXQ6fnJjBFq6V7epBACpNQfD0NJOjio-4k/edit?usp=sharing)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] GenerousMan commented on pull request #6507: [ISSUE #6336] [RIP-62] Cold Read Control

Posted by "GenerousMan (via GitHub)" <gi...@apache.org>.
GenerousMan commented on PR #6507:
URL: https://github.com/apache/rocketmq/pull/6507#issuecomment-1533952739

   LGTM


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org