Posted to dev@rocketmq.apache.org by Mad <10...@qq.com.INVALID> on 2021/12/24 06:27:27 UTC

[DISCUSS] RIP-27 Auto Batch In Producer

Hello RocketMQ Community:


RIP-26 already plans to implement batchConsumeQueue, but messages still have to be packaged into batches manually on the client side, so I would like to implement an autobatch capability, and I have opened RIP-27 to discuss it.
RIP-27 Auto Batch In Producer


Status

Current State: Implementing
Authors: guyinyou
Shepherds: RongtongJin
Mailing List discussion: dev@rocketmq.apache.org
Pull Request: RIP-27
Released: no


Background & Motivation

What do we need to do


Will we add a new module?


No.


Will we add new APIs?


Yes.


Will we add a new feature?


Yes.

Why should we do that


Are there any problems with our current project?


Sending messages in batches gives very good throughput, but at present the business side has to package messages into batches manually, which is not a good enough experience.




What can we benefit from the proposed changes?


Implement automatic message packaging, so that the business side only needs to call the single-message send interface to get the throughput of batch sending.

Goals


What problem is this proposal designed to solve?


Implement the autobatch function in the producer.

Non-Goals


What problem is this proposal NOT designed to solve?



Are there any limits of this proposal?


Changes

Architecture

ProduceAccumulator


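Instructions for enabling

batchMaxBytes: the maximum accumulated size of each MessageAccumulation; once it is exceeded, the accumulated messages are packaged and sent.

batchMaxDelayMs: the maximum time each message waits in the staging area; once it is exceeded, the accumulated messages are packaged and sent.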
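To make the two thresholds concrete, here is a minimal sketch of how they might be held and checked together. AutoBatchThresholds and shouldFlush are hypothetical names, not the RocketMQ client API, and the sketch uses the ≥ comparison from send condition ① later in this proposal.

```java
// Hypothetical holder for the two autobatch thresholds; not the RocketMQ client API.
final class AutoBatchThresholds {
    private final long batchMaxBytes;   // flush once this many bytes have accumulated
    private final long batchMaxDelayMs; // flush once the batch has waited this long

    AutoBatchThresholds(long batchMaxBytes, long batchMaxDelayMs) {
        if (batchMaxBytes <= 0 || batchMaxDelayMs <= 0) {
            throw new IllegalArgumentException("thresholds must be positive");
        }
        this.batchMaxBytes = batchMaxBytes;
        this.batchMaxDelayMs = batchMaxDelayMs;
    }

    /** True when either threshold is reached, i.e. the batch should be packaged and sent. */
    boolean shouldFlush(long accumulatedBytes, long waitedMs) {
        return accumulatedBytes >= batchMaxBytes || waitedMs >= batchMaxDelayMs;
    }
}
```

Whichever threshold is hit first triggers the send, so batchMaxBytes bounds batch size under heavy load while batchMaxDelayMs bounds latency under light load.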







Producers with the same MQClientId share the same ProduceAccumulator instance.
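One way to get this sharing is a registry keyed by MQClientId that creates the accumulator atomically on first use. This is a sketch under that assumption: AccumulatorRegistry, SharedAccumulator and getOrCreate are illustrative stand-ins for ProduceAccumulator and getOrCreateProduceAccumulator, not their actual code.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical registry: one accumulator per MQClientId, shared by all producers using that id.
final class AccumulatorRegistry {
    /** Placeholder for a ProduceAccumulator; only the client id is tracked here. */
    static final class SharedAccumulator {
        final String clientId;

        SharedAccumulator(String clientId) {
            this.clientId = clientId;
        }
    }

    private final ConcurrentMap<String, SharedAccumulator> byClientId = new ConcurrentHashMap<>();

    /** Returns the accumulator for clientId, creating it atomically on first use. */
    SharedAccumulator getOrCreate(String clientId) {
        return byClientId.computeIfAbsent(clientId, SharedAccumulator::new);
    }
}
```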







Each ProduceAccumulator holds two maps, one for each sending mode (syncSendBatchs and asyncSendBatchs). The map key is the smallest granularity at which messages are aggregated into a batch (for example, a MessageQueue), and the value is the staging buffer for messages waiting to be aggregated.

There are also two services that coordinate the two sending modes (GuardForSyncSendService and GuardForAsyncSendService). Once every batchMaxDelayMs, they wake up waiting threads, send messages, or clean up unused objects.
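The layout above (two maps plus two periodic guards) might be sketched as follows. Everything here is hypothetical: message bodies are plain byte arrays, the guard logic is passed in as Runnables, and a ScheduledExecutorService stands in for GuardForSyncSendService and GuardForAsyncSendService.

```java
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical skeleton of the two-map / two-guard layout; not the RocketMQ class.
final class AccumulatorSkeleton {
    // Key: aggregation granularity (e.g. a topic); value: message bodies staged for that key.
    final Map<String, Queue<byte[]>> syncSendBatchs = new ConcurrentHashMap<>();
    final Map<String, Queue<byte[]>> asyncSendBatchs = new ConcurrentHashMap<>();

    private final long batchMaxDelayMs;
    private final ScheduledExecutorService guards = Executors.newScheduledThreadPool(2, r -> {
        Thread t = new Thread(r, "accumulator-guard");
        t.setDaemon(true); // guard threads should not keep the JVM alive
        return t;
    });

    AccumulatorSkeleton(long batchMaxDelayMs) {
        this.batchMaxDelayMs = batchMaxDelayMs;
    }

    /** Stage a message in the map that matches its sending mode. */
    void stage(boolean async, String key, byte[] body) {
        Map<String, Queue<byte[]>> target = async ? asyncSendBatchs : syncSendBatchs;
        target.computeIfAbsent(key, k -> new ConcurrentLinkedQueue<>()).add(body);
    }

    /** Run each guard once per batchMaxDelayMs, standing in for the two guard services. */
    void start(Runnable syncGuard, Runnable asyncGuard) {
        guards.scheduleAtFixedRate(syncGuard, batchMaxDelayMs, batchMaxDelayMs, TimeUnit.MILLISECONDS);
        guards.scheduleAtFixedRate(asyncGuard, batchMaxDelayMs, batchMaxDelayMs, TimeUnit.MILLISECONDS);
    }

    void shutdown() {
        guards.shutdownNow();
    }
}
```

In this sketch, because the guards run on a fixed period equal to batchMaxDelayMs, a batch created just after a guard run is only picked up by a later run, so its worst-case wait is close to two periods. The proposal does not say whether the real services behave the same way.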




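For the key of Map<String, MessageAccumulation>: when no MessageQueue is specified, it should be the topic (to avoid producing a large number of small batches); when a target queue is specified, it should be topic+queue. It may be worth abstracting this into a concept such as "aggregation granularity".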
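The key choice could be captured in one small function. This sketch assumes a queue is identified by an integer id alone (a real MessageQueue also carries a broker name), and the "@" separator is an arbitrary choice; AggregationKey is a hypothetical name.

```java
// Hypothetical helper for the "aggregation granularity" key; not the RocketMQ API.
final class AggregationKey {
    private AggregationKey() {}

    /**
     * queueId == null means the caller did not choose a MessageQueue, so messages are
     * aggregated per topic (fewer, larger batches); otherwise per topic+queue.
     */
    static String of(String topic, Integer queueId) {
        return queueId == null ? topic : topic + "@" + queueId;
    }
}
```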



The number of MessageAccumulation instances also needs to be limited: the extra memory overhead is roughly batchMaxBytes × the number of MessageAccumulation instances.
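As a worked example of that estimate, with illustrative numbers rather than proposed defaults: if batchMaxBytes is 32 KiB (32,768 bytes) and 1,000 MessageAccumulation instances exist, the worst-case extra memory is about 32,768 × 1,000 = 32,768,000 bytes, roughly 31 MiB. The hypothetical helper below computes that estimate and the largest instance count that fits a chosen memory budget.

```java
// Hypothetical helper for bounding MessageAccumulation memory; not the RocketMQ API.
final class AccumulationBudget {
    private AccumulationBudget() {}

    /** Worst-case extra memory if every accumulation fills up to batchMaxBytes. */
    static long estimatedOverheadBytes(long batchMaxBytes, int accumulationCount) {
        return Math.multiplyExact(batchMaxBytes, (long) accumulationCount); // throws on overflow
    }

    /** Largest accumulation count whose estimate stays within totalBudgetBytes. */
    static long maxAccumulations(long totalBudgetBytes, long batchMaxBytes) {
        if (batchMaxBytes <= 0) {
            throw new IllegalArgumentException("batchMaxBytes must be positive");
        }
        return totalBudgetBytes / batchMaxBytes;
    }
}
```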









Each time a MessageAccumulation instance is created, its topic, waitStoreMsgOK, and createTime are assigned. When the user calls the send interface, getOrCreateProduceAccumulator is called according to the message; if a new MessageAccumulation needs to be created, the mqProducer from the current send call is registered with it. This effectively picks an arbitrary producer under that client instance, making the most of resources that have already been allocated, such as thread pools.
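A minimal sketch of this creation step, assuming a ConcurrentHashMap keyed by the aggregation key: computeIfAbsent runs the factory at most once per key, so only the call that actually creates the MessageAccumulation registers its producer, and later calls reuse it. The class names are hypothetical, and the producer is typed as Object to keep the example self-contained.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch of MessageAccumulation creation; not the RocketMQ classes.
final class AccumulationFactory {
    static final class MessageAccumulation {
        final String topic;
        final boolean waitStoreMsgOK;
        final long createTime;
        final Object producer; // stands in for the mqProducer of the call that created it

        MessageAccumulation(String topic, boolean waitStoreMsgOK, Object producer) {
            this.topic = topic;
            this.waitStoreMsgOK = waitStoreMsgOK;
            this.createTime = System.currentTimeMillis();
            this.producer = producer;
        }
    }

    private final ConcurrentMap<String, MessageAccumulation> accumulations = new ConcurrentHashMap<>();

    /** The producer is registered only when this call creates the accumulation. */
    MessageAccumulation getOrCreate(String key, String topic, boolean waitStoreMsgOK, Object producer) {
        return accumulations.computeIfAbsent(key, k -> new MessageAccumulation(topic, waitStoreMsgOK, producer));
    }
}
```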






When the user sends a message in sync mode, the calling thread checks whether the send condition ① (defined below) is met, and is suspended if it is not. Once every batchMaxDelayMs, GuardForSyncSendService wakes up the threads whose send condition is met and cleans up MessageAccumulation instances that have been unused for a long time.
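The suspend-and-wake behaviour of the sync path can be sketched with Java's built-in monitor methods (wait and notifyAll). This is an assumption about the mechanism, not necessarily what the implementation uses; SyncAccumulation, awaitReady and guardTick are hypothetical names, and the actual packaging and sending of the batch is omitted.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the sync path; not the RocketMQ implementation.
final class SyncAccumulation {
    private final long batchMaxBytes;
    private final long batchMaxDelayMs;
    private final long createTime = System.currentTimeMillis();
    private final List<byte[]> messages = new ArrayList<>();
    private long totalBytes;

    SyncAccumulation(long batchMaxBytes, long batchMaxDelayMs) {
        this.batchMaxBytes = batchMaxBytes;
        this.batchMaxDelayMs = batchMaxDelayMs;
    }

    synchronized void add(byte[] body) {
        messages.add(body);
        totalBytes += body.length;
    }

    /** Send condition: old enough or large enough. */
    synchronized boolean readyToSend() {
        return totalBytes >= batchMaxBytes
            || System.currentTimeMillis() - createTime >= batchMaxDelayMs;
    }

    /** Called by the sending thread: suspend until ready, or give up after timeoutMs. */
    synchronized boolean awaitReady(long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!readyToSend()) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                return false;
            }
            try {
                wait(remaining); // releases the monitor until notified or timed out
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    /** Called once per batchMaxDelayMs by a GuardForSyncSendService-like task. */
    synchronized void guardTick() {
        if (readyToSend()) {
            notifyAll(); // wake every caller parked in awaitReady
        }
    }
}
```

awaitReady rechecks the condition whenever its timed wait ends, so a missed notification only delays a caller rather than blocking it forever.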








When the user sends a message in async mode, the message and its sendCallBack are appended to the two lists of the MessageAccumulation, and the call returns immediately. Once every batchMaxDelayMs, GuardForAsyncSendService uses the registered mqProducer to send asynchronously when the send condition ① is met, and cleans up MessageAccumulation instances that have been unused for a long time.

① Send condition: the time elapsed since the MessageAccumulation was created is ≥ batchMaxDelayMs, or the size of the messages accumulated in the MessageAccumulation is ≥ batchMaxBytes.
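The async path and send condition ① could be sketched as two parallel lists that a periodic guard swaps out and sends. To stay self-contained, the sketch assumes a synchronous batchSender function in place of the registered mqProducer's asynchronous send, callbacks receive a plain String result, and resetting createTime after a flush is a choice of this sketch; all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical sketch of the async path; not the RocketMQ implementation.
final class AsyncAccumulation {
    private final long batchMaxBytes;
    private final long batchMaxDelayMs;
    private long createTime = System.currentTimeMillis();
    private long totalBytes;
    private List<byte[]> messages = new ArrayList<>();
    private List<Consumer<String>> callbacks = new ArrayList<>();

    AsyncAccumulation(long batchMaxBytes, long batchMaxDelayMs) {
        this.batchMaxBytes = batchMaxBytes;
        this.batchMaxDelayMs = batchMaxDelayMs;
    }

    /** The send call: stage the message and its callback, then return immediately. */
    synchronized void add(byte[] body, Consumer<String> callback) {
        messages.add(body);
        callbacks.add(callback);
        totalBytes += body.length;
    }

    /** Send condition ①: age >= batchMaxDelayMs or accumulated size >= batchMaxBytes. */
    private boolean readyToSend() {
        return totalBytes >= batchMaxBytes
            || System.currentTimeMillis() - createTime >= batchMaxDelayMs;
    }

    /** One guard cycle; batchSender returns one result per message, in order. */
    void guardTick(Function<List<byte[]>, List<String>> batchSender) {
        List<byte[]> batch;
        List<Consumer<String>> toNotify;
        synchronized (this) {
            if (messages.isEmpty() || !readyToSend()) {
                return;
            }
            batch = messages; // swap out the staged lists under the lock
            toNotify = callbacks;
            messages = new ArrayList<>();
            callbacks = new ArrayList<>();
            totalBytes = 0;
            createTime = System.currentTimeMillis();
        }
        List<String> results = batchSender.apply(batch); // send outside the lock
        for (int i = 0; i < toNotify.size(); i++) {
            toNotify.get(i).accept(results.get(i));
        }
    }
}
```

Swapping the lists while holding the lock and sending outside it keeps new add calls from blocking behind network I/O.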


Interface Design/Change


Method signature changes


Nothing specific.


Method behavior changes


When the send interface is called, it decides whether the message should be sent in a batch: messages that should be batched go into a MessageAccumulation, and all others are sent as usual.
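The routing decision might look like the predicate below. The rules come from elsewhere in this proposal (user-built MessageBatch objects are sent directly; retry and scheduled messages cannot be packaged), but the OutgoingMessage type and the autoBatchEnabled switch are hypothetical, and recognising retry messages by the "%RETRY%" topic prefix is an assumption.

```java
// Hypothetical routing check for the send interface; not the RocketMQ API.
final class AutoBatchRouting {
    /** Minimal stand-in for an outgoing message. */
    static final class OutgoingMessage {
        final String topic;
        final int delayTimeLevel;     // > 0 marks a scheduled (delayed) message
        final boolean alreadyBatched; // true for a MessageBatch assembled by the user

        OutgoingMessage(String topic, int delayTimeLevel, boolean alreadyBatched) {
            this.topic = topic;
            this.delayTimeLevel = delayTimeLevel;
            this.alreadyBatched = alreadyBatched;
        }
    }

    // Assumption: retry messages are recognised by the "%RETRY%" topic prefix.
    private static final String RETRY_TOPIC_PREFIX = "%RETRY%";

    /** True: stage in a MessageAccumulation. False: send as usual. */
    static boolean shouldAccumulate(boolean autoBatchEnabled, OutgoingMessage msg) {
        if (!autoBatchEnabled || msg.alreadyBatched) {
            return false; // feature off, or a user-built MessageBatch that is sent directly
        }
        if (msg.topic.startsWith(RETRY_TOPIC_PREFIX)) {
            return false; // retry messages cannot be packaged
        }
        return msg.delayTimeLevel <= 0; // scheduled messages cannot be packaged
    }
}
```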


CLI command changes


Nothing specific.


Log format or content changes


Nothing specific.

Compatibility, Deprecation, and Migration Plan


Are backward and forward compatibility taken into consideration?


Autobatch reuses the existing MessageBatch capability and is purely an enhancement.


Are there deprecated APIs?


Nothing specific.


How do we do migration?


Upgrade on the client side.

Implementation Outline

We will implement the proposed changes in three phases.

Phase 1


Implement autobatch on the client side.






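This phase is completely transparent to users. When a MessageBatch is sent, the returned sendResult contains the msgIds of all messages joined with commas, so splitting that string yields each message's own msgId.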
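Recovering the individual msgIds is then a string split. A minimal sketch, assuming the ids appear in the same order as the messages in the batch; BatchMsgIds.split is a hypothetical helper, and the count check is an extra safeguard that the proposal does not mention.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for Phase 1: per-message msgIds from a batch send result.
final class BatchMsgIds {
    private BatchMsgIds() {}

    /**
     * joinedMsgId is the comma-joined msgId string returned for a MessageBatch;
     * expectedCount is the number of messages packaged into that batch.
     */
    static List<String> split(String joinedMsgId, int expectedCount) {
        List<String> ids = Arrays.asList(joinedMsgId.split(","));
        if (ids.size() != expectedCount) {
            throw new IllegalStateException("expected " + expectedCount + " msgIds but got " + ids.size());
        }
        return ids; // ids.get(i) is assumed to belong to the i-th message of the batch
    }
}
```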

Phase 2


Enhance MessageBatch so that messages for different partitions can be packaged together.






This phase is well suited to sending ordinary business messages and can greatly improve throughput; msgIds behave the same as in Phase 1.


Phase 3


Add BatchConsumeQueue so that messages sent in the same batch share one msgId.






This phase is well suited to streaming scenarios that send messages to a specific partition; all messages sent in the same batch share the same msgId.

Rejected Alternatives

How do the alternatives solve the issue you proposed?

Manual batch sending by users.

Pros and Cons of alternatives

Pros: higher throughput.

Cons: it is not convenient to use, because users need to understand the details (for example, retry messages and scheduled messages cannot be packaged, and all messages in a batch must share the same topic and waitStoreMsgOK).

Why should we reject the above alternatives?

Users can still send messages in batches manually: when the send interface is called with a MessageBatch, it is sent directly and is unaffected by this feature.

Re: [DISCUSS] RIP-27 Auto Batch In Producer

Posted by 金融通 <ji...@mails.ucas.ac.cn>.







I found that the pictures and links were lost in the formatting. Click the link below for details:

https://docs.google.com/document/d/1ydZrA-FNrjM52I4H72EM2yaNAEYpRHV5uUSmFgxhWUQ/edit?usp=sharing
