You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/09/02 05:54:13 UTC

[rocketmq] branch develop updated: [ISSUE #4962] Polish the documents for consume queue and controller

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 9104d0e3d [ISSUE #4962] Polish the documents for consume queue and controller
9104d0e3d is described below

commit 9104d0e3dae21c397479aaaf8188b64e5b537fc7
Author: rongtong <ji...@163.com>
AuthorDate: Fri Sep 2 13:54:06 2022 +0800

    [ISSUE #4962] Polish the documents for consume queue and controller
---
 docs/cn/controller/deploy.md | 2 +-
 docs/cn/design.md            | 5 ++---
 2 files changed, 3 insertions(+), 4 deletions(-)

diff --git a/docs/cn/controller/deploy.md b/docs/cn/controller/deploy.md
index 8b93a1861..00aa03d64 100644
--- a/docs/cn/controller/deploy.md
+++ b/docs/cn/controller/deploy.md
@@ -15,7 +15,7 @@ Controller部署有两种方式。一种是嵌入于NameServer进行部署,可
 ```
 enableControllerInNamesrv = true
 controllerDLegerGroup = group1
-controllerDLegerPeers = n0-127.0.0.1:9877;n1-127.0.0.1:9878;n1-127.0.0.1:9879
+controllerDLegerPeers = n0-127.0.0.1:9877;n1-127.0.0.1:9878;n2-127.0.0.1:9879
 controllerDLegerSelfId = n0
 controllerStorePath = /home/admin/DledgerController
 enableElectUncleanMaster = false
diff --git a/docs/cn/design.md b/docs/cn/design.md
index 82e2114d0..94d06a2ae 100644
--- a/docs/cn/design.md
+++ b/docs/cn/design.md
@@ -12,7 +12,7 @@
 
 (1) CommitLog:消息主体以及元数据的存储主体,存储Producer端写入的消息主体内容,消息内容不是定长的。单个文件大小默认1G, 文件名长度为20位,左边补零,剩余为起始偏移量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文件;
 
-(2) ConsumeQueue:消息消费队列,引入的目的主要是提高消息消费的性能。由于RocketMQ是基于主题topic的订阅模式,消息消费是针对主题进行的,如果要遍历commitlog文件,根据topic检索消息是非常低效的。Consumer可根据ConsumeQueue来查找待消费的消息。其中,ConsumeQueue(逻辑消费队列)作为消费消息的索引,保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。consumequeue文件可以看成是基于topic的commitlog索引文件,故consumequeue文件夹的组织方式如下:topic/queue/file三层组织结构,具体存储路径为:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。同样consumequeue文件采取定长设计,每一个条目共20个字节,分别为8字节的commitlog物理偏移量、4字节的消息长度、8字节tag hashcode,单个文件由30W个条目 [...]
+(2) ConsumeQueue:消息消费索引,引入的目的主要是提高消息消费的性能。由于RocketMQ是基于主题topic的订阅模式,消息消费是针对主题进行的,如果要遍历commitlog文件,根据topic检索消息是非常低效的。Consumer可根据ConsumeQueue来查找待消费的消息。其中,ConsumeQueue作为消费消息的索引,保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。consumequeue文件可以看成是基于topic的commitlog索引文件,故consumequeue文件夹的组织方式如下:topic/queue/file三层组织结构,具体存储路径为:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。同样consumequeue文件采取定长设计,每一个条目共20个字节,分别为8字节的commitlog物理偏移量、4字节的消息长度、8字节tag hashcode,单个文件由30W个条目组成,可以像数组 [...]
 
 (3) IndexFile:IndexFile(索引文件)提供了一种可以通过key或时间区间来查询消息的方法。Index文件的存储位置是:$HOME/store/index/{fileName},文件名fileName是以创建时的时间戳命名的,固定的单个IndexFile文件大小约为400M,一个IndexFile可以保存 2000W个索引,IndexFile的底层存储设计为在文件系统中实现HashMap结构,故RocketMQ的索引文件其底层实现为hash索引。
 
@@ -32,6 +32,7 @@
 (2) 异步刷盘:能够充分利用OS的PageCache的优势,只要消息写入PageCache即可将成功的ACK返回给Producer端。消息刷盘采用后台异步线程提交的方式进行,降低了读写延迟,提高了MQ的性能和吞吐量。
 
 ### 2 通信机制
+
 RocketMQ消息队列集群主要包括NameServer、Broker(Master/Slave)、Producer、Consumer4个角色,基本通讯流程如下:
 
 (1) Broker启动后需要完成一次将自己注册至NameServer的操作;随后每隔30s时间定时向NameServer上报Topic路由信息。
@@ -131,8 +132,6 @@ Producer端在发送消息的时候,会先根据Topic找到指定的TopicPubli
 
 ![](image/rocketmq_design_9.png)
 
-
-
 - 上图中processQueueTable标注的红色部分,表示与分配到的消息队列集合mqSet互不包含。将这些队列设置Dropped属性为true,然后查看这些队列是否可以移除出processQueueTable缓存变量,这里具体执行removeUnnecessaryMessageQueue()方法,即每隔1s 查看是否可以获取当前消费处理队列的锁,拿到的话返回true。如果等待1s后,仍然拿不到当前消费处理队列的锁则返回false。如果返回true,则从processQueueTable缓存变量中移除对应的Entry;
 
 - 上图中processQueueTable的绿色部分,表示与分配到的消息队列集合mqSet的交集。判断该ProcessQueue是否已经过期了,在Pull模式的不用管,如果是Push模式的,设置Dropped属性为true,并且调用removeUnnecessaryMessageQueue()方法,像上面一样尝试移除Entry;