You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by "dingshuangxi888 (via GitHub)" <gi...@apache.org> on 2023/05/12 07:01:02 UTC

[GitHub] [rocketmq] dingshuangxi888 commented on a diff in pull request #6507: [ISSUE #6336] [RIP-62] Cold Read Control

dingshuangxi888 commented on code in PR #6507:
URL: https://github.com/apache/rocketmq/pull/6507#discussion_r1191984035


##########
store/src/main/java/org/apache/rocketmq/store/CommitLog.java:
##########
@@ -1966,4 +2000,196 @@ public void cleanSwappedMap(long forceCleanSwapIntervalMs) {
     public FlushManager getFlushManager() {
         return flushManager;
     }
+
+    public class ColdDataCheckService extends ServiceThread {
+        private final SystemClock systemClock = new SystemClock();
+        private final ConcurrentHashMap<String, byte[]> pageCacheMap = new ConcurrentHashMap<>();
+        private int pageSize = -1;
+        private int sampleSteps = 32;
+
+        public ColdDataCheckService() {
+            sampleSteps = defaultMessageStore.getMessageStoreConfig().getSampleSteps();
+            if (sampleSteps <= 0) {
+                sampleSteps = 32;
+            }
+            initPageSize();
+            scanFilesInPageCache();
+        }
+
+        @Override
+        public String getServiceName() {
+            return ColdDataCheckService.class.getSimpleName();
+        }
+
+        @Override
+        public void run() {
+            log.info("{} service started", this.getServiceName());
+            while (!this.isStopped()) {
+                try {
+                    if (!defaultMessageStore.getMessageStoreConfig().isColdDataFlowControlEnable() || !defaultMessageStore.getMessageStoreConfig().isColdDataScanEnable()) {
+                        pageCacheMap.clear();
+                        this.waitForRunning(180 * 1000);
+                        continue;
+                    } else {
+                        this.waitForRunning(defaultMessageStore.getMessageStoreConfig().getTimerColdDataCheckIntervalMs());
+                    }
+                    long beginClockTimestamp = this.systemClock.now();
+                    scanFilesInPageCache();
+                    long costTime = this.systemClock.now() - beginClockTimestamp;
+                    log.info("[{}] scanFilesInPageCache-cost {} ms.", costTime > 30 * 1000 ? "NOTIFYME" : "OK", costTime);
+                } catch (Throwable e) {
+                    log.warn(this.getServiceName() + " service has e: {}", e);
+                }
+            }
+            log.info("{} service end", this.getServiceName());
+        }
+
+        public boolean isDataInPageCache(final long offset) {
+            if (!defaultMessageStore.getMessageStoreConfig().isColdDataFlowControlEnable()) {
+                return true;
+            }
+            if (pageSize <= 0 || sampleSteps <= 0) {
+                return true;
+            }
+            if (!defaultMessageStore.checkInColdAreaByCommitOffset(offset, getMaxOffset())) {
+                return true;
+            }
+            if (!defaultMessageStore.getMessageStoreConfig().isColdDataScanEnable()) {
+                return false;
+            }
+
+            MappedFile mappedFile = mappedFileQueue.findMappedFileByOffset(offset, offset == 0);
+            if (null == mappedFile) {
+                return true;
+            }
+            byte[] bytes = pageCacheMap.get(mappedFile.getFileName());
+            if (null == bytes) {
+                return true;
+            }
+
+            int pos = (int)(offset % defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog());
+            int realIndex = pos / pageSize / sampleSteps;
+            return bytes.length - 1 >= realIndex && bytes[realIndex] != 0;
+        }
+
+        private void scanFilesInPageCache() {
+            if (!defaultMessageStore.getMessageStoreConfig().isColdDataFlowControlEnable() || !defaultMessageStore.getMessageStoreConfig().isColdDataScanEnable()) {
+                return;
+            }
+
+            try {
+                log.info("pageCacheMap key size: {}", pageCacheMap.size());
+                clearExpireMappedFile();
+                mappedFileQueue.getMappedFiles().stream().forEach(mappedFile -> {

Review Comment:
   stream() is not need for forEach



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org