You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by 邪龙 <19...@qq.com.INVALID> on 2021/10/10 03:59:39 UTC

[VOTE] RIP-22 RocketMQ Stage Message

Hi RocketMQ Community,

This is the vote for the kickoff of RIP-22 RocketMQ Stage Message.

In order to improve the performance of order-message, the concept of staged concurrency is proposed in this RIP, which enables RocketMQ to have the ability to consume order-message concurrently under certain conditions.

You can jump to GitHub:https://github.com/apache/rocketmq/issues/2937#issuecomment-855209200 to get a better reading experience.

The vote will be open for at least 72 hours or until a necessary number of votes are reached.

Please vote accordingly:

[ ] +1 approve
[ ] +0 no opinion
[ ] -1 disapprove with the reason


------------------ 原始邮件 ------------------
发件人:                                                                                                                        "炼龙"                                                                                    <1936978077@qq.com&gt;;
发送时间:&nbsp;2021年6月5日(星期六) 下午5:02
收件人:&nbsp;"dev"<dev@rocketmq.apache.org&gt;;

主题:&nbsp;RIP 22 RocketMQ Stage Message



RIP 22 RocketMQ Stage Message

Status
* Current State: Proposed
* Authors: 1936978077@qq.com
* Shepherds: 1936978077@qq.com
* Mailing List discussion: dev@rocketmq.apache.org
* Pull Request: #PR_NUMBER
* Released: 4.9.2

Background &amp; Motivation
What do we need to do
* Will we add a new module?
&nbsp; No
* Will we add new APIs?
&nbsp; Yes
* Will we add new feature?
&nbsp; Yes
Why should we do that
* What can we benefit proposed changes?
EN
&nbsp; Let me give an example to illustrate the waste of performance caused by `MessageListenerOrderly`. For example, in the e-commerce scenario, we use `MessageListenerOrderly` to consume the order MQ. The first 100 consumed MQ (the "consumed MQ" hereinafter referred to as "MQ") can get different extra rewards, while the MQ after 100 can get no extra rewards. At this point, `MessageListenerOrderly` can guarantee the order of the first 100 MQS and the order of the after 100 MQS, but it is meaningless to guarantee the order of the after 100 MQS. Therefore, for MQ after 100, we should use `MessageListenerConcurrently` to consume concurrently, so as to improve the performance.
&nbsp; Furthermore, suppose that the first 10 MQS can get an additional laptop (called "stage 1"), the first 10-30 MQS can get an additional tablet (called "stage 2"), the first 30-100 MQS can get an additional mobile phone (called "stage 3"), and the after 100 MQS are called "stage 4". Because the reward of each stage is the same, we can use `MessageListenerConcurrently` to consume concurrently in the stages (for example, 20 MQ concurrent consumptions in stage 2). We only need to ensure the order between stages (only when all MQS in stage 1 is consumed can MQS in stage 2 be consumed).
&nbsp; Hard to understand? It doesn't matter. Let's give a more specific example. Suppose that consuming one MQ takes 1s, single read queue and thread pool size of `MessageListenerConcurrently` is 20, then for this example, consuming 100 MQ using `MessageListenerOrderly` takes `1 * 100 = 100s` The time required to consume 100 MQS by using `MessageListenerConcurrently` periodically is as follows:
&nbsp;&nbsp; Stage 1 (1-10): 10 / 20 ≈ 1s
&nbsp;&nbsp; Stage 2 (11-30): 20 / 20 ≈ 1s
&nbsp;&nbsp; Stage 3 (31-100): 70 / 20 ≈ 4S
&nbsp;&nbsp; Total time: 1 + 1 + 4 = 6S
&nbsp;&nbsp; Compared with 100s, it is ten times faster!
&nbsp;&nbsp; To sum up, I propose `MessageListenerStagedConcurrently`, which can ensure the sequence of phases and concurrent consumption within phases. Compared with `MessageListenerOrderly`, its performance is greatly improved; Compared with `MessageListenerConcurrently`, it ensures the necessary order. Of course, this idea of `staged concurrency` can also be used in producer, brush disk and other places, and please free your imagination.
CN
&nbsp;&nbsp; 请让我举一个例子,来说明`MessageListenerOrderly`对性能的浪费。比如电商场景下,我们使用`MessageListenerOrderly`来消费订单MQ,前100条被消费的MQ(“被消费的MQ”以下简称“MQ”)可以获得不同的额外奖励,100条之后的MQ则无额外奖励。此时`MessageListenerOrderly`能够保证前100条MQ的顺序,也会保证100条之后的MQ的顺序,但是保证100条之后的MQ的顺序其实是无意义的。因此对于100条之后的MQ应该使用`MessageListenerConcurrently`来并发消费,以此提升性能。
&nbsp;&nbsp; 更进一步地说,假设前10条MQ可以额外获得一台笔记本电脑(称为“阶段1”),前10-30条MQ可以额外获得一台平板电脑(称为“阶段2”),前30-100条MQ可以额外获得一台手机(称为“阶段3”),100条之后的MQ称为“阶段4”。由于每个阶段的奖励都相同,因此阶段内是可以使用`MessageListenerConcurrently`来并发消费的(比如阶段2的20条MQ并发消费),我们只需要保证阶段之间顺序即可(阶段1的MQ全部消费完,阶段2的MQ才能开始消费)。
&nbsp;&nbsp; 难理解?没关系,我们再举一个更具体的例子。假设消费1条MQ需要耗费1s、单读队列、`MessageListenerConcurrently`所用线程池大小为20,那么对于这个例子来说,使用`MessageListenerOrderly`消费100条MQ需要耗时`1*100=100s`;“阶段性”地使用`MessageListenerConcurrently`消费100条MQ需要的耗时如下:
&nbsp;&nbsp; 阶段1(1-10):10 / 20 ≈ 1s
&nbsp;&nbsp; 阶段2(11-30):20 / 20 ≈ 1s
&nbsp;&nbsp; 阶段3(31-100):70 / 20 ≈ 4s
&nbsp;&nbsp; 总耗时:1 + 1 + 4 = 6s
&nbsp;&nbsp; 相较100s快了整整十几倍!
&nbsp;&nbsp; 综上,我在此提出`MessageListenerStagedConcurrently`,它能保证阶段间的顺序性,同时在阶段内并发消费。相较于`MessageListenerOrderly`,它的性能有极大提升;相较于`MessageListenerConcurrently`,它又保证了必要的顺序。当然,这种`阶段性并发`的思想,同样可用于producer、刷盘等其他地方,还请诸君自由发挥想象的空间。

Goals
&nbsp;&nbsp; EN: Greatly improve the performance of order-message.
&nbsp;&nbsp; CN: 大幅度提升顺序消息的性能。

Non-Goals
* What problem is this proposal NOT designed to solve?
&nbsp; EN: Not change the original order-message interface.
&nbsp; CN: 不改变原来顺序消息的接口及实现。
* Are there any limits of this proposal?
EN
&nbsp; 1.In order to maximize the performance of order-message, users need to define stages reasonably.
&nbsp; 2.Just like `MessageListenerOrderly`, the `MessageListenerStagedConcurrently` only ensures that each queue (partition) is orderly.
CN
&nbsp; 1.为了最大限度地提高顺序消息的性能,用户需要合理地定义阶段。
&nbsp; 2.与`MessageListenerOrderly`一样,`MessageListenerStagedConcurrently`只确保每个队列(分区)是有序的。

Changes
&nbsp;&nbsp; We need add some codes in common, client and broker component which include adding priority concurrency frameworks and consumer choices.Read below sections to get more details about the Stage Message for RocketMQ.

Interface Design/Change
```java
public interface MessageListenerStagedConcurrently extends MessageListener {

&nbsp; &nbsp; ConsumeOrderlyStatus consumeMessage(final List<MessageExt&gt; msgs,
&nbsp; &nbsp; &nbsp; &nbsp; final ConsumeStagedConcurrentlyContext context);

&nbsp; &nbsp; /**
&nbsp;&nbsp; &nbsp; * If returns empty collection, {@link MessageListenerStagedConcurrently} will degenerate into {@link
&nbsp;&nbsp; &nbsp; * MessageListenerConcurrently}; If returns a collection whose elements are all 1, {@link
&nbsp;&nbsp; &nbsp; * MessageListenerStagedConcurrently} will temporarily evolve into {@link MessageListenerOrderly};
&nbsp;&nbsp; &nbsp; */
&nbsp; &nbsp; List<Integer&gt; getStageDefinitions();

&nbsp; &nbsp; /**
&nbsp;&nbsp; &nbsp; * can be used to reset the current stage by CAS
&nbsp;&nbsp; &nbsp; */
&nbsp; &nbsp; void resetCurrentStageOffsetIfNeed(final String topic, final AtomicInteger currentStageOffset);
}
```
```java
/**
&nbsp;* Refer to {@link org.apache.rocketmq.client.consumer.store.OffsetStore}, manage the stage consumption progress.
&nbsp;*/
public interface StageOffsetStore {

&nbsp; &nbsp; void load() throws MQClientException;

&nbsp; &nbsp; void updateStageOffset(final MessageQueue mq, final int stageOffset, final boolean increaseOnly);

&nbsp; &nbsp; int readStageOffset(final MessageQueue mq, final ReadOffsetType type);

&nbsp; &nbsp; void persistAll(final Set<MessageQueue&gt; mqs);

&nbsp; &nbsp; void persist(final MessageQueue mq);

&nbsp; &nbsp; void removeStageOffset(MessageQueue mq);

&nbsp; &nbsp; Map<MessageQueue, Integer&gt; cloneStageOffsetTable(String topic);

&nbsp; &nbsp; void updateConsumeStageOffsetToBroker(MessageQueue mq, int stageOffset, boolean isOneway) throws RemotingException,
&nbsp; &nbsp; &nbsp; &nbsp; MQBrokerException, InterruptedException, MQClientException;
}
```
* Method behavior changes
&nbsp; Nothing specific.
* CLI command changes
&nbsp; Nothing specific.
* Log format or content changes
&nbsp; Nothing specific.

Compatibility, Deprecation, and Migration Plan
* Are backward and forward compatibility taken into consideration?
&nbsp; EN: Not change the original order-message interface.
&nbsp; CN: 没有改变原来顺序消息的接口及实现。
* Are there deprecated APIs?
&nbsp; Nothing specific.
* How do we do migration?
&nbsp; Nothing specific.

Implementation Outline
We will implement the proposed changes by 3 phases. 
Phase 1
&nbsp;&nbsp; Implement Stage Message feature in Consumer
Phase 2
&nbsp;&nbsp; Implement Stage Message feature in Producer(To be honest, I haven't thought about it yet. I'm looking forward to your idea, ha ha)
Phase 3
&nbsp;&nbsp; Implement Stage Message feature in Broker(Looking forward to your idea, too)

Rejected Alternatives 
* How does alternatives solve the issue you proposed?
&nbsp; Nothing specific.
* Pros and Cons of alternatives
&nbsp; Nothing specific.
* Why should we reject above alternatives