You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@rocketmq.apache.org by 刘春龙 <63...@qq.com> on 2018/12/04 13:16:00 UTC

关于RocketMQ消息存储的几点问题

各位RocketMQ社区朋友好,

最近将RocketMQ的底层存储源码逐行看了一遍,发现了如下几个问题。希望各位能抽出时间逐个解答一下。如果是BUG的话,还请修复该问题。谢谢💕。

github的issue地址:https://github.com/apache/rocketmq/issues/574 <https://github.com/apache/rocketmq/issues/574>

————————————————————————————————————————————————————————

下面我把内容贴出来:

1.关于锁synchronized的使用问题;

rocketmq/store/src/main/java/org/apache/rocketmq/store/CommitLog.java <https://github.com/apache/rocketmq/blob/1bedba8cbcef32554d2dc80f2a340ea686d68ba9/store/src/main/java/org/apache/rocketmq/store/CommitLog.java#L1069>
Line 1069 in 1bedba8 <https://github.com/apache/rocketmq/commit/1bedba8cbcef32554d2dc80f2a340ea686d68ba9>
 public synchronized void putRequest(final GroupCommitRequest request) { 
rocketmq/store/src/main/java/org/apache/rocketmq/store/CommitLog.java <https://github.com/apache/rocketmq/blob/1bedba8cbcef32554d2dc80f2a340ea686d68ba9/store/src/main/java/org/apache/rocketmq/store/CommitLog.java#L1136>
Line 1136 in 1bedba8 <https://github.com/apache/rocketmq/commit/1bedba8cbcef32554d2dc80f2a340ea686d68ba9>
 synchronized (this) { 

我认为上述两处的synchronized是完全没有必要的。减少没必要的锁的使用会增加性能😄。


2. 睡眠时间不合理,可能会造成正在处理的消息响应失败;

rocketmq/store/src/main/java/org/apache/rocketmq/store/CommitLog.java <https://github.com/apache/rocketmq/blob/1bedba8cbcef32554d2dc80f2a340ea686d68ba9/store/src/main/java/org/apache/rocketmq/store/CommitLog.java#L1131>
Line 1131 in 1bedba8 <https://github.com/apache/rocketmq/commit/1bedba8cbcef32554d2dc80f2a340ea686d68ba9>
 Thread.sleep(10); 
GroupCommitService此处的睡眠时间是否合理?

因为是同步刷盘策略,设置时间过短,在shutdown时,可能会造成正在处理的消息(eg,正在执行appendMessages的消息)来不及刷盘,进而响应失败(FLUSH_DISK_TIMEOUT)。

是否可以采用其它方式?或者适当延长下时间?10毫秒是不是太短了。


3. 可能会造成消息丢失;

rocketmq/store/src/main/java/org/apache/rocketmq/store/CommitLog.java <https://github.com/apache/rocketmq/blob/1bedba8cbcef32554d2dc80f2a340ea686d68ba9/store/src/main/java/org/apache/rocketmq/store/CommitLog.java#L943-L947>
Lines 943 to 947 in 1bedba8 <https://github.com/apache/rocketmq/commit/1bedba8cbcef32554d2dc80f2a340ea686d68ba9>
 boolean result = false; 
 for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) { 
     result = CommitLog.this.mappedFileQueue.commit(0); 
     CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK")); 
 } 
CommitRealTimeService此处没有第2点中提到的sleep机制。这里是采用尝试10次commit。

举个简单的场景:

A 在执行appendMessages;
B 在执行appendMessages;
在B执行完appendMessages,还未来得及通知CommitRealTimeService,与此同时shutdown;
注意,A还未执行完appendMessages;
按照上述代码逻辑,会尝试10次commit,在第1次时就能将B消息commit,并结束循环;
A 执行完appendMessages;
问题就在于A消息不会被commit了,A消息丢失。但给客户端返回的响应就是OK💢。

相同的,FlushRealTimeService异步刷盘也有类似问题。


4. sleep(0)的问题

rocketmq/store/src/main/java/org/apache/rocketmq/store/MappedFile.java <https://github.com/apache/rocketmq/blob/1bedba8cbcef32554d2dc80f2a340ea686d68ba9/store/src/main/java/org/apache/rocketmq/store/MappedFile.java#L507>
Line 507 in 1bedba8 <https://github.com/apache/rocketmq/commit/1bedba8cbcef32554d2dc80f2a340ea686d68ba9>
 Thread.sleep(0); 
-XX:+ConvertSleepToYield 这个参数默认就是 true,如果 sleep 0 的话会转换成 yield

但是 ConvertSleepToYield 设为 false 时,会进行 sleep,且 sleep 的最小时间是 1 ms,也就是说 ConvertSleepToYield 为 false 时,哪怕是 sleep 0 实际上跟 sleep 1 是一样的。这样的话,并不会让出线程的时间片。

我个人认为 RocketMQ 应改为 Thread.yield 处理。或许sleep(0)有其它的意图?

5.commitLog写满时,消息的处理问题

rocketmq/store/src/main/java/org/apache/rocketmq/store/CommitLog.java <https://github.com/apache/rocketmq/blob/1bedba8cbcef32554d2dc80f2a340ea686d68ba9/store/src/main/java/org/apache/rocketmq/store/CommitLog.java#L1091-L1096>
Lines 1091 to 1096 in 1bedba8 <https://github.com/apache/rocketmq/commit/1bedba8cbcef32554d2dc80f2a340ea686d68ba9>
 for (int i = 0; i < 2 && !flushOK; i++) { 
     flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset(); 
  
     if (!flushOK) { 
         CommitLog.this.mappedFileQueue.flush(0); 
     } 
同步刷盘策略下,正常消息刷盘时,依赖第二次循环将flushOK置为true。但是如果当消息A写入时发现commitLog已经写满,则消息A会被写入下一个commitLog。当前待刷盘的commitLog会有两个。

A消息通知刷盘服务执行刷盘,第一次循环刷盘(刷入第一个commitLog的末尾数据)时flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();肯定为false,第二次刷盘(刷入消息A)时flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();也肯定是false,需要第三次循环来将flushOK置为true。

所以上述场景下,flushOK为false。但实际消息A已经刷盘成功了。

Re: 关于RocketMQ消息存储的几点问题

Posted by heng du <du...@gmail.com>.
另外,commitLog写满时,消息的处理问题 这个问题确实存在,@刘春龙,欢迎提个PR解决下呀。

heng du <du...@gmail.com> 于2018年12月20日周四 下午5:10写道:

> 确实如Zhanhui所说,此处实现了经典的read-write dual-buffer机制,因此最大的问题就在于保障read buffer
> write buffer的写入以及swap安全问题,所以在
>
> public synchronized void putRequest(final GroupCommitRequest request) {
>     synchronized (this.requestsWrite) {
>         this.requestsWrite.add(request);
>     }
>     if (hasNotified.compareAndSet(false, true)) {
>         waitPoint.countDown(); // notify
>     }
> }
>
> 中的第一个 *synchronized *是没有必要的,只要对requestWrite以及requestRead
> 两个buffer做好保护即可。另外,在下面进行swap时即使没有加*synchronized*也是可以的,
>
> synchronized (this) {
>     this.swapRequests();
> }
>
>
> 但是此处也存在问题,猜测在写这段代码都本意是想在commit线程退出时,将最后的readbuffer中的数据swap到writebuffer中,然后刷盘,避免readbuffer数据丢失,但是加
> *synchronized*
> 并不能禁止掉readbuffer的写入,所以如果commit线程的退出在sendmessage线程前面,还是有可能继续写到readbuffer中,导致没有刷盘,所以此处线程退出需要有序退出,但是也可以优化一下。
>
>
> Zhanhui Li <li...@apache.org> 于2018年12月20日周四 下午1:19写道:
>
>> 这个Thread讨论了很多话题, 囿于时间, 回复下第一个问题.
>> 帖子认为这两个synchronize都没必要.  很显然是没理解这个地方的设计和并发情况.  简单解释下, 这里实现了经典read-write
>> dual-buffer机制, 以希望达到类似"Group Commit"效果, 调用函数不用等后端IO操作完成.
>> putRequest并没有在putMessage的同步块里面, 因此本身是存在并发调用的; 事实上, 即便放到put message lock里面,
>> 考虑到OS进程调度, 也会存在和flush线程并发访问write ArrayList的情况.  另外,
>> 简单把synchronize替换成一把ReentrantLock在这里没有任何收益.
>>
>> On Wed, Dec 19, 2018 at 10:10 PM 叶茂伟 <ym...@126.com> wrote:
>>
>>> 同意金融通的看法,另外补充几点个人看法:
>>> 1.
>>> 在RocketMQ中,类似问题中提到的sleep时间、尝试次数等设置,许多地方都写死了一个值。如果细挖的话,很多值都需要仔细斟酌或根据实际场景设置。但是否有必要把所有值都提供给用户配置,这又是一个值得思考的问题。
>>> 2. 最后一个commitlog写满时,消息处理问题。我认为确实存在,在刷盘之后,要下一次循环才能更新flushOK的值,这种做法本身就存在问题。
>>>
>>> > 在 2018年12月19日,下午9:39,金融通 <ji...@mails.ucas.ac.cn> 写道:
>>> >
>>> >
>>> > hello,我也来说一下针对其中两个问题我的想法。
>>> >
>>> > 春龙的第一个加锁同意他的看法。他说的这两个synchronized之间并没有看出需要串行的理由。
>>> >
>>> >
>>> 第三个消息丢失的问题,举的场景可能不成立,因为putMessageLock存在,应该不存在两个线程同时appendMessage的情况。另外对于FlushRealTimeService因为消息是直接写入pagecache,所以只要操作系统不崩溃,内存不断电,即使不强制刷盘,还是能保证消息不丢失。但是CommitRealTimeService由于是先写到程序的内存,程序的内存再写入FileChannel,在shutdown后我觉得有极小的理论可能(当appendMessage的线程更新writePostion迟于CommitRealTimeService刷盘时的读writePostion,导致CommitRealTimeService认为已经将消息全部刷盘)导致消息没有写入文件。
>>> >
>>> >> -----原始邮件-----
>>> >> 发件人: "Gosling Von" <fe...@gmail.com>
>>> >> 发送时间: 2018-12-19 18:00:07 (星期三)
>>> >> 收件人: users@rocketmq.apache.org
>>> >> 抄送: "dev@rocketmq.apache.org" <de...@rocketmq.apache.org>
>>> >> 主题: Re: 关于RocketMQ消息存储的几点问题
>>> >>
>>> >> Hi,
>>> >>
>>> >> 这几个问题挺有意思的,我不清楚为什么大家没有人做出回应。Apache
>>> Way鼓励大家多在邮件里列表里讨论与交流,我们RocketMQ社区的同学看来还是有些腼腆,好吧,我来抛砖引玉一下 :-)
>>> >>
>>> >>
>>> >> 首先明确一点,如果对代码有疑问,建议大家去实测下,然后给出自己的测试数据这样讨论会较好。
>>> >>
>>> >> 1.
>>> 关于锁的问题,可以看到volatile变量修饰的读和写请求,通过synchronized把它们组成原子操作,保证了同步操作原语。如果换成ReentrantLock会更好一些。
>>> >>
>>> >> 2. sleep的时间这块可以斟酌,但基本设计思路是希望IO一直忙,不要被CPU
>>> slot换出,所以你会看到它的时间被设置的很小。而有可能触发问题的shutdown此时就非常讲究了,这块的代码我们都需要再测试下。另外关于sleep(0)和yield的争议一致以来都有,推荐看下OpenJDK[1],在不同平台上会有不通的表现,这个问题值得大家再深入讨论下。
>>> >>
>>> >> 3. 消息丢失与CommitLog数据写满的问题,建议你再测试下,我们可以继续交流。
>>> >>
>>> >> 4. RocketMQ 5.0会重构、剥离数据存储这块,也欢迎大家实测,然后把自己的一些测试结论发出来,我们可以一起交流下。
>>> >>
>>> >>
>>> >>  if (millis == 0) {
>>> >>    // When ConvertSleepToYield is on, this matches the classic VM
>>> implementation of
>>> >>    // JVM_Sleep. Critical for similar threading behaviour (Win32)
>>> >>    // It appears that in certain GUI contexts, it may be beneficial
>>> to do a short sleep
>>> >>    // for SOLARIS
>>> >>    if (ConvertSleepToYield) {
>>> >>      os::yield();
>>> >>    } else {
>>> >>      ThreadState old_state = thread->osthread()->get_state();
>>> >>      thread->osthread()->set_state(SLEEPING);
>>> >>      os::sleep(thread, MinSleepInterval, false);
>>> >>      thread->osthread()->set_state(old_state);
>>> >>    }
>>> >>  } else {
>>> >>    ThreadState old_state = thread->osthread()->get_state();
>>> >>    thread->osthread()->set_state(SLEEPING);
>>> >>    if (os::sleep(thread, millis, true) == OS_INTRPT) {
>>> >>      // An asynchronous exception (e.g., ThreadDeathException) could
>>> have been thrown on
>>> >>      // us while we were sleeping. We do not overwrite those.
>>> >>      if (!HAS_PENDING_EXCEPTION) {
>>> >>        HS_DTRACE_PROBE1(hotspot, thread__sleep__end,1);
>>> >>        // TODO-FIXME: THROW_MSG returns which means we will not call
>>> set_state()
>>> >>        // to properly restore the thread state.  That's likely wrong.
>>> >>        THROW_MSG(vmSymbols::java_lang_InterruptedException(), "sleep
>>> interrupted");
>>> >>      }
>>> >>    }
>>> >>    thread->osthread()->set_state(old_state);
>>> >>  }
>>> >>  HS_DTRACE_PROBE1(hotspot, thread__sleep__end,0);
>>> >> JVM_END
>>> >>
>>> >>
>>> >>
>>> >> [1]
>>> http://hg.openjdk.java.net/jdk7/jdk7/hotspot/file/tip/src/share/vm/prims/jvm.cpp
>>> <
>>> http://hg.openjdk.java.net/jdk7/jdk7/hotspot/file/tip/src/share/vm/prims/jvm.cpp
>>> >
>>> >>
>>> >> Best Regards,
>>> >> Von Gosling
>>> >>
>>> >>
>>> >>> 在 2018年12月4日,下午9:16,刘春龙 <63...@qq.com> 写道:
>>> >>>
>>> >>> 各位RocketMQ社区朋友好,
>>> >>>
>>> >>>
>>> 最近将RocketMQ的底层存储源码逐行看了一遍,发现了如下几个问题。希望各位能抽出时间逐个解答一下。如果是BUG的话,还请修复该问题。谢谢💕。
>>> >>>
>>> >>> github的issue地址:https://github.com/apache/rocketmq/issues/574 <
>>> https://github.com/apache/rocketmq/issues/574>
>>> >>>
>>> >>> ————————————————————————————————————————————————————————
>>> >>>
>>> >>> 下面我把内容贴出来:
>>> >>>
>>> >>> 1.关于锁synchronized的使用问题;
>>> >>>
>>> >>>
>>> rocketmq/store/src/main/java/org/apache/rocketmq/store/CommitLog.java <
>>> https://github.com/apache/rocketmq/blob/1bedba8cbcef32554d2dc80f2a340ea686d68ba9/store/src/main/java/org/apache/rocketmq/store/CommitLog.java#L1069
>>> >
>>> >>> Line 1069 in 1bedba8 <
>>> https://github.com/apache/rocketmq/commit/1bedba8cbcef32554d2dc80f2a340ea686d68ba9
>>> >
>>> >>> public synchronized void putRequest(final GroupCommitRequest
>>> request) {
>>> >>>
>>> rocketmq/store/src/main/java/org/apache/rocketmq/store/CommitLog.java <
>>> https://github.com/apache/rocketmq/blob/1bedba8cbcef32554d2dc80f2a340ea686d68ba9/store/src/main/java/org/apache/rocketmq/store/CommitLog.java#L1136
>>> >
>>> >>> Line 1136 in 1bedba8 <
>>> https://github.com/apache/rocketmq/commit/1bedba8cbcef32554d2dc80f2a340ea686d68ba9
>>> >
>>> >>> synchronized (this) {
>>> >>>
>>> >>> 我认为上述两处的synchronized是完全没有必要的。减少没必要的锁的使用会增加性能😄。
>>> >>>
>>> >>>
>>> >>> 2. 睡眠时间不合理,可能会造成正在处理的消息响应失败;
>>> >>>
>>> >>>
>>> rocketmq/store/src/main/java/org/apache/rocketmq/store/CommitLog.java <
>>> https://github.com/apache/rocketmq/blob/1bedba8cbcef32554d2dc80f2a340ea686d68ba9/store/src/main/java/org/apache/rocketmq/store/CommitLog.java#L1131
>>> >
>>> >>> Line 1131 in 1bedba8 <
>>> https://github.com/apache/rocketmq/commit/1bedba8cbcef32554d2dc80f2a340ea686d68ba9
>>> >
>>> >>> Thread.sleep(10);
>>> >>> GroupCommitService此处的睡眠时间是否合理?
>>> >>>
>>> >>>
>>> 因为是同步刷盘策略,设置时间过短,在shutdown时,可能会造成正在处理的消息(eg,正在执行appendMessages的消息)来不及刷盘,进而响应失败(FLUSH_DISK_TIMEOUT)。
>>> >>>
>>> >>> 是否可以采用其它方式?或者适当延长下时间?10毫秒是不是太短了。
>>> >>>
>>> >>>
>>> >>> 3. 可能会造成消息丢失;
>>> >>>
>>> >>>
>>> rocketmq/store/src/main/java/org/apache/rocketmq/store/CommitLog.java <
>>> https://github.com/apache/rocketmq/blob/1bedba8cbcef32554d2dc80f2a340ea686d68ba9/store/src/main/java/org/apache/rocketmq/store/CommitLog.java#L943-L947
>>> >
>>> >>> Lines 943 to 947 in 1bedba8 <
>>> https://github.com/apache/rocketmq/commit/1bedba8cbcef32554d2dc80f2a340ea686d68ba9
>>> >
>>> >>> boolean result = false;
>>> >>> for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
>>> >>>     result = CommitLog.this.mappedFileQueue.commit(0);
>>> >>>     CommitLog.log.info(this.getServiceName() + " service shutdown,
>>> retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
>>> >>> }
>>> >>> CommitRealTimeService此处没有第2点中提到的sleep机制。这里是采用尝试10次commit。
>>> >>>
>>> >>> 举个简单的场景:
>>> >>>
>>> >>> A 在执行appendMessages;
>>> >>> B 在执行appendMessages;
>>> >>> 在B执行完appendMessages,还未来得及通知CommitRealTimeService,与此同时shutdown;
>>> >>> 注意,A还未执行完appendMessages;
>>> >>> 按照上述代码逻辑,会尝试10次commit,在第1次时就能将B消息commit,并结束循环;
>>> >>> A 执行完appendMessages;
>>> >>> 问题就在于A消息不会被commit了,A消息丢失。但给客户端返回的响应就是OK💢。
>>> >>>
>>> >>> 相同的,FlushRealTimeService异步刷盘也有类似问题。
>>> >>>
>>> >>>
>>> >>> 4. sleep(0)的问题
>>> >>>
>>> >>>
>>> rocketmq/store/src/main/java/org/apache/rocketmq/store/MappedFile.java <
>>> https://github.com/apache/rocketmq/blob/1bedba8cbcef32554d2dc80f2a340ea686d68ba9/store/src/main/java/org/apache/rocketmq/store/MappedFile.java#L507
>>> >
>>> >>> Line 507 in 1bedba8 <
>>> https://github.com/apache/rocketmq/commit/1bedba8cbcef32554d2dc80f2a340ea686d68ba9
>>> >
>>> >>> Thread.sleep(0);
>>> >>> -XX:+ConvertSleepToYield 这个参数默认就是 true,如果 sleep 0 的话会转换成 yield
>>> >>>
>>> >>> 但是 ConvertSleepToYield 设为 false 时,会进行 sleep,且 sleep 的最小时间是 1 ms,也就是说
>>> ConvertSleepToYield 为 false 时,哪怕是 sleep 0 实际上跟 sleep 1
>>> 是一样的。这样的话,并不会让出线程的时间片。
>>> >>>
>>> >>> 我个人认为 RocketMQ 应改为 Thread.yield 处理。或许sleep(0)有其它的意图?
>>> >>>
>>> >>> 5.commitLog写满时,消息的处理问题
>>> >>>
>>> >>>
>>> rocketmq/store/src/main/java/org/apache/rocketmq/store/CommitLog.java <
>>> https://github.com/apache/rocketmq/blob/1bedba8cbcef32554d2dc80f2a340ea686d68ba9/store/src/main/java/org/apache/rocketmq/store/CommitLog.java#L1091-L1096
>>> >
>>> >>> Lines 1091 to 1096 in 1bedba8 <
>>> https://github.com/apache/rocketmq/commit/1bedba8cbcef32554d2dc80f2a340ea686d68ba9
>>> >
>>> >>> for (int i = 0; i < 2 && !flushOK; i++) {
>>> >>>     flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >=
>>> req.getNextOffset();
>>> >>>
>>> >>>     if (!flushOK) {
>>> >>>         CommitLog.this.mappedFileQueue.flush(0);
>>> >>>     }
>>> >>>
>>> 同步刷盘策略下,正常消息刷盘时,依赖第二次循环将flushOK置为true。但是如果当消息A写入时发现commitLog已经写满,则消息A会被写入下一个commitLog。当前待刷盘的commitLog会有两个。
>>> >>>
>>> >>> A消息通知刷盘服务执行刷盘,第一次循环刷盘(刷入第一个commitLog的末尾数据)时flushOK =
>>> CommitLog.this.mappedFileQueue.getFlushedWhere() >=
>>> req.getNextOffset();肯定为false,第二次刷盘(刷入消息A)时flushOK =
>>> CommitLog.this.mappedFileQueue.getFlushedWhere() >=
>>> req.getNextOffset();也肯定是false,需要第三次循环来将flushOK置为true。
>>> >>>
>>> >>> 所以上述场景下,flushOK为false。但实际消息A已经刷盘成功了。
>>> >>
>>>
>>>

Re: 关于RocketMQ消息存储的几点问题

Posted by heng du <du...@gmail.com>.
确实如Zhanhui所说,此处实现了经典的read-write dual-buffer机制,因此最大的问题就在于保障read buffer write
buffer的写入以及swap安全问题,所以在

public synchronized void putRequest(final GroupCommitRequest request) {
    synchronized (this.requestsWrite) {
        this.requestsWrite.add(request);
    }
    if (hasNotified.compareAndSet(false, true)) {
        waitPoint.countDown(); // notify
    }
}

中的第一个 *synchronized *是没有必要的,只要对requestWrite以及requestRead
两个buffer做好保护即可。另外,在下面进行swap时即使没有加*synchronized*也是可以的,

synchronized (this) {
    this.swapRequests();
}

但是此处也存在问题,猜测在写这段代码都本意是想在commit线程退出时,将最后的readbuffer中的数据swap到writebuffer中,然后刷盘,避免readbuffer数据丢失,但是加
*synchronized*
并不能禁止掉readbuffer的写入,所以如果commit线程的退出在sendmessage线程前面,还是有可能继续写到readbuffer中,导致没有刷盘,所以此处线程退出需要有序退出,但是也可以优化一下。


Zhanhui Li <li...@apache.org> 于2018年12月20日周四 下午1:19写道:

> 这个Thread讨论了很多话题, 囿于时间, 回复下第一个问题.
> 帖子认为这两个synchronize都没必要.  很显然是没理解这个地方的设计和并发情况.  简单解释下, 这里实现了经典read-write
> dual-buffer机制, 以希望达到类似"Group Commit"效果, 调用函数不用等后端IO操作完成.
> putRequest并没有在putMessage的同步块里面, 因此本身是存在并发调用的; 事实上, 即便放到put message lock里面,
> 考虑到OS进程调度, 也会存在和flush线程并发访问write ArrayList的情况.  另外,
> 简单把synchronize替换成一把ReentrantLock在这里没有任何收益.
>
> On Wed, Dec 19, 2018 at 10:10 PM 叶茂伟 <ym...@126.com> wrote:
>
>> 同意金融通的看法,另外补充几点个人看法:
>> 1.
>> 在RocketMQ中,类似问题中提到的sleep时间、尝试次数等设置,许多地方都写死了一个值。如果细挖的话,很多值都需要仔细斟酌或根据实际场景设置。但是否有必要把所有值都提供给用户配置,这又是一个值得思考的问题。
>> 2. 最后一个commitlog写满时,消息处理问题。我认为确实存在,在刷盘之后,要下一次循环才能更新flushOK的值,这种做法本身就存在问题。
>>
>> > 在 2018年12月19日,下午9:39,金融通 <ji...@mails.ucas.ac.cn> 写道:
>> >
>> >
>> > hello,我也来说一下针对其中两个问题我的想法。
>> >
>> > 春龙的第一个加锁同意他的看法。他说的这两个synchronized之间并没有看出需要串行的理由。
>> >
>> >
>> 第三个消息丢失的问题,举的场景可能不成立,因为putMessageLock存在,应该不存在两个线程同时appendMessage的情况。另外对于FlushRealTimeService因为消息是直接写入pagecache,所以只要操作系统不崩溃,内存不断电,即使不强制刷盘,还是能保证消息不丢失。但是CommitRealTimeService由于是先写到程序的内存,程序的内存再写入FileChannel,在shutdown后我觉得有极小的理论可能(当appendMessage的线程更新writePostion迟于CommitRealTimeService刷盘时的读writePostion,导致CommitRealTimeService认为已经将消息全部刷盘)导致消息没有写入文件。
>> >
>> >> -----原始邮件-----
>> >> 发件人: "Gosling Von" <fe...@gmail.com>
>> >> 发送时间: 2018-12-19 18:00:07 (星期三)
>> >> 收件人: users@rocketmq.apache.org
>> >> 抄送: "dev@rocketmq.apache.org" <de...@rocketmq.apache.org>
>> >> 主题: Re: 关于RocketMQ消息存储的几点问题
>> >>
>> >> Hi,
>> >>
>> >> 这几个问题挺有意思的,我不清楚为什么大家没有人做出回应。Apache
>> Way鼓励大家多在邮件里列表里讨论与交流,我们RocketMQ社区的同学看来还是有些腼腆,好吧,我来抛砖引玉一下 :-)
>> >>
>> >>
>> >> 首先明确一点,如果对代码有疑问,建议大家去实测下,然后给出自己的测试数据这样讨论会较好。
>> >>
>> >> 1.
>> 关于锁的问题,可以看到volatile变量修饰的读和写请求,通过synchronized把它们组成原子操作,保证了同步操作原语。如果换成ReentrantLock会更好一些。
>> >>
>> >> 2. sleep的时间这块可以斟酌,但基本设计思路是希望IO一直忙,不要被CPU
>> slot换出,所以你会看到它的时间被设置的很小。而有可能触发问题的shutdown此时就非常讲究了,这块的代码我们都需要再测试下。另外关于sleep(0)和yield的争议一致以来都有,推荐看下OpenJDK[1],在不同平台上会有不通的表现,这个问题值得大家再深入讨论下。
>> >>
>> >> 3. 消息丢失与CommitLog数据写满的问题,建议你再测试下,我们可以继续交流。
>> >>
>> >> 4. RocketMQ 5.0会重构、剥离数据存储这块,也欢迎大家实测,然后把自己的一些测试结论发出来,我们可以一起交流下。
>> >>
>> >>
>> >>  if (millis == 0) {
>> >>    // When ConvertSleepToYield is on, this matches the classic VM
>> implementation of
>> >>    // JVM_Sleep. Critical for similar threading behaviour (Win32)
>> >>    // It appears that in certain GUI contexts, it may be beneficial to
>> do a short sleep
>> >>    // for SOLARIS
>> >>    if (ConvertSleepToYield) {
>> >>      os::yield();
>> >>    } else {
>> >>      ThreadState old_state = thread->osthread()->get_state();
>> >>      thread->osthread()->set_state(SLEEPING);
>> >>      os::sleep(thread, MinSleepInterval, false);
>> >>      thread->osthread()->set_state(old_state);
>> >>    }
>> >>  } else {
>> >>    ThreadState old_state = thread->osthread()->get_state();
>> >>    thread->osthread()->set_state(SLEEPING);
>> >>    if (os::sleep(thread, millis, true) == OS_INTRPT) {
>> >>      // An asynchronous exception (e.g., ThreadDeathException) could
>> have been thrown on
>> >>      // us while we were sleeping. We do not overwrite those.
>> >>      if (!HAS_PENDING_EXCEPTION) {
>> >>        HS_DTRACE_PROBE1(hotspot, thread__sleep__end,1);
>> >>        // TODO-FIXME: THROW_MSG returns which means we will not call
>> set_state()
>> >>        // to properly restore the thread state.  That's likely wrong.
>> >>        THROW_MSG(vmSymbols::java_lang_InterruptedException(), "sleep
>> interrupted");
>> >>      }
>> >>    }
>> >>    thread->osthread()->set_state(old_state);
>> >>  }
>> >>  HS_DTRACE_PROBE1(hotspot, thread__sleep__end,0);
>> >> JVM_END
>> >>
>> >>
>> >>
>> >> [1]
>> http://hg.openjdk.java.net/jdk7/jdk7/hotspot/file/tip/src/share/vm/prims/jvm.cpp
>> <
>> http://hg.openjdk.java.net/jdk7/jdk7/hotspot/file/tip/src/share/vm/prims/jvm.cpp
>> >
>> >>
>> >> Best Regards,
>> >> Von Gosling
>> >>
>> >>
>> >>> 在 2018年12月4日,下午9:16,刘春龙 <63...@qq.com> 写道:
>> >>>
>> >>> 各位RocketMQ社区朋友好,
>> >>>
>> >>>
>> 最近将RocketMQ的底层存储源码逐行看了一遍,发现了如下几个问题。希望各位能抽出时间逐个解答一下。如果是BUG的话,还请修复该问题。谢谢💕。
>> >>>
>> >>> github的issue地址:https://github.com/apache/rocketmq/issues/574 <
>> https://github.com/apache/rocketmq/issues/574>
>> >>>
>> >>> ————————————————————————————————————————————————————————
>> >>>
>> >>> 下面我把内容贴出来:
>> >>>
>> >>> 1.关于锁synchronized的使用问题;
>> >>>
>> >>> rocketmq/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
>> <
>> https://github.com/apache/rocketmq/blob/1bedba8cbcef32554d2dc80f2a340ea686d68ba9/store/src/main/java/org/apache/rocketmq/store/CommitLog.java#L1069
>> >
>> >>> Line 1069 in 1bedba8 <
>> https://github.com/apache/rocketmq/commit/1bedba8cbcef32554d2dc80f2a340ea686d68ba9
>> >
>> >>> public synchronized void putRequest(final GroupCommitRequest request)
>> {
>> >>> rocketmq/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
>> <
>> https://github.com/apache/rocketmq/blob/1bedba8cbcef32554d2dc80f2a340ea686d68ba9/store/src/main/java/org/apache/rocketmq/store/CommitLog.java#L1136
>> >
>> >>> Line 1136 in 1bedba8 <
>> https://github.com/apache/rocketmq/commit/1bedba8cbcef32554d2dc80f2a340ea686d68ba9
>> >
>> >>> synchronized (this) {
>> >>>
>> >>> 我认为上述两处的synchronized是完全没有必要的。减少没必要的锁的使用会增加性能😄。
>> >>>
>> >>>
>> >>> 2. 睡眠时间不合理,可能会造成正在处理的消息响应失败;
>> >>>
>> >>> rocketmq/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
>> <
>> https://github.com/apache/rocketmq/blob/1bedba8cbcef32554d2dc80f2a340ea686d68ba9/store/src/main/java/org/apache/rocketmq/store/CommitLog.java#L1131
>> >
>> >>> Line 1131 in 1bedba8 <
>> https://github.com/apache/rocketmq/commit/1bedba8cbcef32554d2dc80f2a340ea686d68ba9
>> >
>> >>> Thread.sleep(10);
>> >>> GroupCommitService此处的睡眠时间是否合理?
>> >>>
>> >>>
>> 因为是同步刷盘策略,设置时间过短,在shutdown时,可能会造成正在处理的消息(eg,正在执行appendMessages的消息)来不及刷盘,进而响应失败(FLUSH_DISK_TIMEOUT)。
>> >>>
>> >>> 是否可以采用其它方式?或者适当延长下时间?10毫秒是不是太短了。
>> >>>
>> >>>
>> >>> 3. 可能会造成消息丢失;
>> >>>
>> >>> rocketmq/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
>> <
>> https://github.com/apache/rocketmq/blob/1bedba8cbcef32554d2dc80f2a340ea686d68ba9/store/src/main/java/org/apache/rocketmq/store/CommitLog.java#L943-L947
>> >
>> >>> Lines 943 to 947 in 1bedba8 <
>> https://github.com/apache/rocketmq/commit/1bedba8cbcef32554d2dc80f2a340ea686d68ba9
>> >
>> >>> boolean result = false;
>> >>> for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
>> >>>     result = CommitLog.this.mappedFileQueue.commit(0);
>> >>>     CommitLog.log.info(this.getServiceName() + " service shutdown,
>> retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
>> >>> }
>> >>> CommitRealTimeService此处没有第2点中提到的sleep机制。这里是采用尝试10次commit。
>> >>>
>> >>> 举个简单的场景:
>> >>>
>> >>> A 在执行appendMessages;
>> >>> B 在执行appendMessages;
>> >>> 在B执行完appendMessages,还未来得及通知CommitRealTimeService,与此同时shutdown;
>> >>> 注意,A还未执行完appendMessages;
>> >>> 按照上述代码逻辑,会尝试10次commit,在第1次时就能将B消息commit,并结束循环;
>> >>> A 执行完appendMessages;
>> >>> 问题就在于A消息不会被commit了,A消息丢失。但给客户端返回的响应就是OK💢。
>> >>>
>> >>> 相同的,FlushRealTimeService异步刷盘也有类似问题。
>> >>>
>> >>>
>> >>> 4. sleep(0)的问题
>> >>>
>> >>>
>> rocketmq/store/src/main/java/org/apache/rocketmq/store/MappedFile.java <
>> https://github.com/apache/rocketmq/blob/1bedba8cbcef32554d2dc80f2a340ea686d68ba9/store/src/main/java/org/apache/rocketmq/store/MappedFile.java#L507
>> >
>> >>> Line 507 in 1bedba8 <
>> https://github.com/apache/rocketmq/commit/1bedba8cbcef32554d2dc80f2a340ea686d68ba9
>> >
>> >>> Thread.sleep(0);
>> >>> -XX:+ConvertSleepToYield 这个参数默认就是 true,如果 sleep 0 的话会转换成 yield
>> >>>
>> >>> 但是 ConvertSleepToYield 设为 false 时,会进行 sleep,且 sleep 的最小时间是 1 ms,也就是说
>> ConvertSleepToYield 为 false 时,哪怕是 sleep 0 实际上跟 sleep 1
>> 是一样的。这样的话,并不会让出线程的时间片。
>> >>>
>> >>> 我个人认为 RocketMQ 应改为 Thread.yield 处理。或许sleep(0)有其它的意图?
>> >>>
>> >>> 5.commitLog写满时,消息的处理问题
>> >>>
>> >>> rocketmq/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
>> <
>> https://github.com/apache/rocketmq/blob/1bedba8cbcef32554d2dc80f2a340ea686d68ba9/store/src/main/java/org/apache/rocketmq/store/CommitLog.java#L1091-L1096
>> >
>> >>> Lines 1091 to 1096 in 1bedba8 <
>> https://github.com/apache/rocketmq/commit/1bedba8cbcef32554d2dc80f2a340ea686d68ba9
>> >
>> >>> for (int i = 0; i < 2 && !flushOK; i++) {
>> >>>     flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >=
>> req.getNextOffset();
>> >>>
>> >>>     if (!flushOK) {
>> >>>         CommitLog.this.mappedFileQueue.flush(0);
>> >>>     }
>> >>>
>> 同步刷盘策略下,正常消息刷盘时,依赖第二次循环将flushOK置为true。但是如果当消息A写入时发现commitLog已经写满,则消息A会被写入下一个commitLog。当前待刷盘的commitLog会有两个。
>> >>>
>> >>> A消息通知刷盘服务执行刷盘,第一次循环刷盘(刷入第一个commitLog的末尾数据)时flushOK =
>> CommitLog.this.mappedFileQueue.getFlushedWhere() >=
>> req.getNextOffset();肯定为false,第二次刷盘(刷入消息A)时flushOK =
>> CommitLog.this.mappedFileQueue.getFlushedWhere() >=
>> req.getNextOffset();也肯定是false,需要第三次循环来将flushOK置为true。
>> >>>
>> >>> 所以上述场景下,flushOK为false。但实际消息A已经刷盘成功了。
>> >>
>>
>>

Re: 关于RocketMQ消息存储的几点问题

Posted by Zhanhui Li <li...@apache.org>.
这个Thread讨论了很多话题, 囿于时间, 回复下第一个问题.
帖子认为这两个synchronize都没必要.  很显然是没理解这个地方的设计和并发情况.  简单解释下, 这里实现了经典read-write
dual-buffer机制, 以希望达到类似"Group Commit"效果, 调用函数不用等后端IO操作完成.
putRequest并没有在putMessage的同步块里面, 因此本身是存在并发调用的; 事实上, 即便放到put message lock里面,
考虑到OS进程调度, 也会存在和flush线程并发访问write ArrayList的情况.  另外,
简单把synchronize替换成一把ReentrantLock在这里没有任何收益.

On Wed, Dec 19, 2018 at 10:10 PM 叶茂伟 <ym...@126.com> wrote:

> 同意金融通的看法,另外补充几点个人看法:
> 1.
> 在RocketMQ中,类似问题中提到的sleep时间、尝试次数等设置,许多地方都写死了一个值。如果细挖的话,很多值都需要仔细斟酌或根据实际场景设置。但是否有必要把所有值都提供给用户配置,这又是一个值得思考的问题。
> 2. 最后一个commitlog写满时,消息处理问题。我认为确实存在,在刷盘之后,要下一次循环才能更新flushOK的值,这种做法本身就存在问题。
>
> > 在 2018年12月19日,下午9:39,金融通 <ji...@mails.ucas.ac.cn> 写道:
> >
> >
> > hello,我也来说一下针对其中两个问题我的想法。
> >
> > 春龙的第一个加锁同意他的看法。他说的这两个synchronized之间并没有看出需要串行的理由。
> >
> >
> 第三个消息丢失的问题,举的场景可能不成立,因为putMessageLock存在,应该不存在两个线程同时appendMessage的情况。另外对于FlushRealTimeService因为消息是直接写入pagecache,所以只要操作系统不崩溃,内存不断电,即使不强制刷盘,还是能保证消息不丢失。但是CommitRealTimeService由于是先写到程序的内存,程序的内存再写入FileChannel,在shutdown后我觉得有极小的理论可能(当appendMessage的线程更新writePostion迟于CommitRealTimeService刷盘时的读writePostion,导致CommitRealTimeService认为已经将消息全部刷盘)导致消息没有写入文件。
> >
> >> -----原始邮件-----
> >> 发件人: "Gosling Von" <fe...@gmail.com>
> >> 发送时间: 2018-12-19 18:00:07 (星期三)
> >> 收件人: users@rocketmq.apache.org
> >> 抄送: "dev@rocketmq.apache.org" <de...@rocketmq.apache.org>
> >> 主题: Re: 关于RocketMQ消息存储的几点问题
> >>
> >> Hi,
> >>
> >> 这几个问题挺有意思的,我不清楚为什么大家没有人做出回应。Apache
> Way鼓励大家多在邮件里列表里讨论与交流,我们RocketMQ社区的同学看来还是有些腼腆,好吧,我来抛砖引玉一下 :-)
> >>
> >>
> >> 首先明确一点,如果对代码有疑问,建议大家去实测下,然后给出自己的测试数据这样讨论会较好。
> >>
> >> 1.
> 关于锁的问题,可以看到volatile变量修饰的读和写请求,通过synchronized把它们组成原子操作,保证了同步操作原语。如果换成ReentrantLock会更好一些。
> >>
> >> 2. sleep的时间这块可以斟酌,但基本设计思路是希望IO一直忙,不要被CPU
> slot换出,所以你会看到它的时间被设置的很小。而有可能触发问题的shutdown此时就非常讲究了,这块的代码我们都需要再测试下。另外关于sleep(0)和yield的争议一致以来都有,推荐看下OpenJDK[1],在不同平台上会有不通的表现,这个问题值得大家再深入讨论下。
> >>
> >> 3. 消息丢失与CommitLog数据写满的问题,建议你再测试下,我们可以继续交流。
> >>
> >> 4. RocketMQ 5.0会重构、剥离数据存储这块,也欢迎大家实测,然后把自己的一些测试结论发出来,我们可以一起交流下。
> >>
> >>
> >>  if (millis == 0) {
> >>    // When ConvertSleepToYield is on, this matches the classic VM
> implementation of
> >>    // JVM_Sleep. Critical for similar threading behaviour (Win32)
> >>    // It appears that in certain GUI contexts, it may be beneficial to
> do a short sleep
> >>    // for SOLARIS
> >>    if (ConvertSleepToYield) {
> >>      os::yield();
> >>    } else {
> >>      ThreadState old_state = thread->osthread()->get_state();
> >>      thread->osthread()->set_state(SLEEPING);
> >>      os::sleep(thread, MinSleepInterval, false);
> >>      thread->osthread()->set_state(old_state);
> >>    }
> >>  } else {
> >>    ThreadState old_state = thread->osthread()->get_state();
> >>    thread->osthread()->set_state(SLEEPING);
> >>    if (os::sleep(thread, millis, true) == OS_INTRPT) {
> >>      // An asynchronous exception (e.g., ThreadDeathException) could
> have been thrown on
> >>      // us while we were sleeping. We do not overwrite those.
> >>      if (!HAS_PENDING_EXCEPTION) {
> >>        HS_DTRACE_PROBE1(hotspot, thread__sleep__end,1);
> >>        // TODO-FIXME: THROW_MSG returns which means we will not call
> set_state()
> >>        // to properly restore the thread state.  That's likely wrong.
> >>        THROW_MSG(vmSymbols::java_lang_InterruptedException(), "sleep
> interrupted");
> >>      }
> >>    }
> >>    thread->osthread()->set_state(old_state);
> >>  }
> >>  HS_DTRACE_PROBE1(hotspot, thread__sleep__end,0);
> >> JVM_END
> >>
> >>
> >>
> >> [1]
> http://hg.openjdk.java.net/jdk7/jdk7/hotspot/file/tip/src/share/vm/prims/jvm.cpp
> <
> http://hg.openjdk.java.net/jdk7/jdk7/hotspot/file/tip/src/share/vm/prims/jvm.cpp
> >
> >>
> >> Best Regards,
> >> Von Gosling
> >>
> >>
> >>> 在 2018年12月4日,下午9:16,刘春龙 <63...@qq.com> 写道:
> >>>
> >>> 各位RocketMQ社区朋友好,
> >>>
> >>>
> 最近将RocketMQ的底层存储源码逐行看了一遍,发现了如下几个问题。希望各位能抽出时间逐个解答一下。如果是BUG的话,还请修复该问题。谢谢💕。
> >>>
> >>> github的issue地址:https://github.com/apache/rocketmq/issues/574 <
> https://github.com/apache/rocketmq/issues/574>
> >>>
> >>> ————————————————————————————————————————————————————————
> >>>
> >>> 下面我把内容贴出来:
> >>>
> >>> 1.关于锁synchronized的使用问题;
> >>>
> >>> rocketmq/store/src/main/java/org/apache/rocketmq/store/CommitLog.java <
> https://github.com/apache/rocketmq/blob/1bedba8cbcef32554d2dc80f2a340ea686d68ba9/store/src/main/java/org/apache/rocketmq/store/CommitLog.java#L1069
> >
> >>> Line 1069 in 1bedba8 <
> https://github.com/apache/rocketmq/commit/1bedba8cbcef32554d2dc80f2a340ea686d68ba9
> >
> >>> public synchronized void putRequest(final GroupCommitRequest request)
> {
> >>> rocketmq/store/src/main/java/org/apache/rocketmq/store/CommitLog.java <
> https://github.com/apache/rocketmq/blob/1bedba8cbcef32554d2dc80f2a340ea686d68ba9/store/src/main/java/org/apache/rocketmq/store/CommitLog.java#L1136
> >
> >>> Line 1136 in 1bedba8 <
> https://github.com/apache/rocketmq/commit/1bedba8cbcef32554d2dc80f2a340ea686d68ba9
> >
> >>> synchronized (this) {
> >>>
> >>> 我认为上述两处的synchronized是完全没有必要的。减少没必要的锁的使用会增加性能😄。
> >>>
> >>>
> >>> 2. 睡眠时间不合理,可能会造成正在处理的消息响应失败;
> >>>
> >>> rocketmq/store/src/main/java/org/apache/rocketmq/store/CommitLog.java <
> https://github.com/apache/rocketmq/blob/1bedba8cbcef32554d2dc80f2a340ea686d68ba9/store/src/main/java/org/apache/rocketmq/store/CommitLog.java#L1131
> >
> >>> Line 1131 in 1bedba8 <
> https://github.com/apache/rocketmq/commit/1bedba8cbcef32554d2dc80f2a340ea686d68ba9
> >
> >>> Thread.sleep(10);
> >>> GroupCommitService此处的睡眠时间是否合理?
> >>>
> >>>
> 因为是同步刷盘策略,设置时间过短,在shutdown时,可能会造成正在处理的消息(eg,正在执行appendMessages的消息)来不及刷盘,进而响应失败(FLUSH_DISK_TIMEOUT)。
> >>>
> >>> 是否可以采用其它方式?或者适当延长下时间?10毫秒是不是太短了。
> >>>
> >>>
> >>> 3. 可能会造成消息丢失;
> >>>
> >>> rocketmq/store/src/main/java/org/apache/rocketmq/store/CommitLog.java <
> https://github.com/apache/rocketmq/blob/1bedba8cbcef32554d2dc80f2a340ea686d68ba9/store/src/main/java/org/apache/rocketmq/store/CommitLog.java#L943-L947
> >
> >>> Lines 943 to 947 in 1bedba8 <
> https://github.com/apache/rocketmq/commit/1bedba8cbcef32554d2dc80f2a340ea686d68ba9
> >
> >>> boolean result = false;
> >>> for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
> >>>     result = CommitLog.this.mappedFileQueue.commit(0);
> >>>     CommitLog.log.info(this.getServiceName() + " service shutdown,
> retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
> >>> }
> >>> CommitRealTimeService此处没有第2点中提到的sleep机制。这里是采用尝试10次commit。
> >>>
> >>> 举个简单的场景:
> >>>
> >>> A 在执行appendMessages;
> >>> B 在执行appendMessages;
> >>> 在B执行完appendMessages,还未来得及通知CommitRealTimeService,与此同时shutdown;
> >>> 注意,A还未执行完appendMessages;
> >>> 按照上述代码逻辑,会尝试10次commit,在第1次时就能将B消息commit,并结束循环;
> >>> A 执行完appendMessages;
> >>> 问题就在于A消息不会被commit了,A消息丢失。但给客户端返回的响应就是OK💢。
> >>>
> >>> 相同的,FlushRealTimeService异步刷盘也有类似问题。
> >>>
> >>>
> >>> 4. sleep(0)的问题
> >>>
> >>> rocketmq/store/src/main/java/org/apache/rocketmq/store/MappedFile.java
> <
> https://github.com/apache/rocketmq/blob/1bedba8cbcef32554d2dc80f2a340ea686d68ba9/store/src/main/java/org/apache/rocketmq/store/MappedFile.java#L507
> >
> >>> Line 507 in 1bedba8 <
> https://github.com/apache/rocketmq/commit/1bedba8cbcef32554d2dc80f2a340ea686d68ba9
> >
> >>> Thread.sleep(0);
> >>> -XX:+ConvertSleepToYield 这个参数默认就是 true,如果 sleep 0 的话会转换成 yield
> >>>
> >>> 但是 ConvertSleepToYield 设为 false 时,会进行 sleep,且 sleep 的最小时间是 1 ms,也就是说
> ConvertSleepToYield 为 false 时,哪怕是 sleep 0 实际上跟 sleep 1
> 是一样的。这样的话,并不会让出线程的时间片。
> >>>
> >>> 我个人认为 RocketMQ 应改为 Thread.yield 处理。或许sleep(0)有其它的意图?
> >>>
> >>> 5.commitLog写满时,消息的处理问题
> >>>
> >>> rocketmq/store/src/main/java/org/apache/rocketmq/store/CommitLog.java <
> https://github.com/apache/rocketmq/blob/1bedba8cbcef32554d2dc80f2a340ea686d68ba9/store/src/main/java/org/apache/rocketmq/store/CommitLog.java#L1091-L1096
> >
> >>> Lines 1091 to 1096 in 1bedba8 <
> https://github.com/apache/rocketmq/commit/1bedba8cbcef32554d2dc80f2a340ea686d68ba9
> >
> >>> for (int i = 0; i < 2 && !flushOK; i++) {
> >>>     flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >=
> req.getNextOffset();
> >>>
> >>>     if (!flushOK) {
> >>>         CommitLog.this.mappedFileQueue.flush(0);
> >>>     }
> >>>
> 同步刷盘策略下,正常消息刷盘时,依赖第二次循环将flushOK置为true。但是如果当消息A写入时发现commitLog已经写满,则消息A会被写入下一个commitLog。当前待刷盘的commitLog会有两个。
> >>>
> >>> A消息通知刷盘服务执行刷盘,第一次循环刷盘(刷入第一个commitLog的末尾数据)时flushOK =
> CommitLog.this.mappedFileQueue.getFlushedWhere() >=
> req.getNextOffset();肯定为false,第二次刷盘(刷入消息A)时flushOK =
> CommitLog.this.mappedFileQueue.getFlushedWhere() >=
> req.getNextOffset();也肯定是false,需要第三次循环来将flushOK置为true。
> >>>
> >>> 所以上述场景下,flushOK为false。但实际消息A已经刷盘成功了。
> >>
>
>

Re: 关于RocketMQ消息存储的几点问题

Posted by 叶茂伟 <ym...@126.com>.
同意金融通的看法,另外补充几点个人看法:
1. 在RocketMQ中,类似问题中提到的sleep时间、尝试次数等设置,许多地方都写死了一个值。如果细挖的话,很多值都需要仔细斟酌或根据实际场景设置。但是否有必要把所有值都提供给用户配置,这又是一个值得思考的问题。
2. 最后一个commitlog写满时,消息处理问题。我认为确实存在,在刷盘之后,要下一次循环才能更新flushOK的值,这种做法本身就存在问题。

> 在 2018年12月19日,下午9:39,金融通 <ji...@mails.ucas.ac.cn> 写道:
> 
> 
> hello,我也来说一下针对其中两个问题我的想法。
> 
> 春龙的第一个加锁同意他的看法。他说的这两个synchronized之间并没有看出需要串行的理由。
> 
> 第三个消息丢失的问题,举的场景可能不成立,因为putMessageLock存在,应该不存在两个线程同时appendMessage的情况。另外对于FlushRealTimeService因为消息是直接写入pagecache,所以只要操作系统不崩溃,内存不断电,即使不强制刷盘,还是能保证消息不丢失。但是CommitRealTimeService由于是先写到程序的内存,程序的内存再写入FileChannel,在shutdown后我觉得有极小的理论可能(当appendMessage的线程更新writePostion迟于CommitRealTimeService刷盘时的读writePostion,导致CommitRealTimeService认为已经将消息全部刷盘)导致消息没有写入文件。
> 
>> -----原始邮件-----
>> 发件人: "Gosling Von" <fe...@gmail.com>
>> 发送时间: 2018-12-19 18:00:07 (星期三)
>> 收件人: users@rocketmq.apache.org
>> 抄送: "dev@rocketmq.apache.org" <de...@rocketmq.apache.org>
>> 主题: Re: 关于RocketMQ消息存储的几点问题
>> 
>> Hi,
>> 
>> 这几个问题挺有意思的,我不清楚为什么大家没有人做出回应。Apache Way鼓励大家多在邮件里列表里讨论与交流,我们RocketMQ社区的同学看来还是有些腼腆,好吧,我来抛砖引玉一下 :-)
>> 
>> 
>> 首先明确一点,如果对代码有疑问,建议大家去实测下,然后给出自己的测试数据这样讨论会较好。
>> 
>> 1. 关于锁的问题,可以看到volatile变量修饰的读和写请求,通过synchronized把它们组成原子操作,保证了同步操作原语。如果换成ReentrantLock会更好一些。
>> 
>> 2. sleep的时间这块可以斟酌,但基本设计思路是希望IO一直忙,不要被CPU slot换出,所以你会看到它的时间被设置的很小。而有可能触发问题的shutdown此时就非常讲究了,这块的代码我们都需要再测试下。另外关于sleep(0)和yield的争议一致以来都有,推荐看下OpenJDK[1],在不同平台上会有不通的表现,这个问题值得大家再深入讨论下。
>> 
>> 3. 消息丢失与CommitLog数据写满的问题,建议你再测试下,我们可以继续交流。
>> 
>> 4. RocketMQ 5.0会重构、剥离数据存储这块,也欢迎大家实测,然后把自己的一些测试结论发出来,我们可以一起交流下。
>> 
>> 
>>  if (millis == 0) {
>>    // When ConvertSleepToYield is on, this matches the classic VM implementation of
>>    // JVM_Sleep. Critical for similar threading behaviour (Win32)
>>    // It appears that in certain GUI contexts, it may be beneficial to do a short sleep
>>    // for SOLARIS
>>    if (ConvertSleepToYield) {
>>      os::yield();
>>    } else {
>>      ThreadState old_state = thread->osthread()->get_state();
>>      thread->osthread()->set_state(SLEEPING);
>>      os::sleep(thread, MinSleepInterval, false);
>>      thread->osthread()->set_state(old_state);
>>    }
>>  } else {
>>    ThreadState old_state = thread->osthread()->get_state();
>>    thread->osthread()->set_state(SLEEPING);
>>    if (os::sleep(thread, millis, true) == OS_INTRPT) {
>>      // An asynchronous exception (e.g., ThreadDeathException) could have been thrown on
>>      // us while we were sleeping. We do not overwrite those.
>>      if (!HAS_PENDING_EXCEPTION) {
>>        HS_DTRACE_PROBE1(hotspot, thread__sleep__end,1);
>>        // TODO-FIXME: THROW_MSG returns which means we will not call set_state()
>>        // to properly restore the thread state.  That's likely wrong.
>>        THROW_MSG(vmSymbols::java_lang_InterruptedException(), "sleep interrupted");
>>      }
>>    }
>>    thread->osthread()->set_state(old_state);
>>  }
>>  HS_DTRACE_PROBE1(hotspot, thread__sleep__end,0);
>> JVM_END
>> 
>> 
>> 
>> [1] http://hg.openjdk.java.net/jdk7/jdk7/hotspot/file/tip/src/share/vm/prims/jvm.cpp <http://hg.openjdk.java.net/jdk7/jdk7/hotspot/file/tip/src/share/vm/prims/jvm.cpp>
>> 
>> Best Regards,
>> Von Gosling
>> 
>> 
>>> 在 2018年12月4日,下午9:16,刘春龙 <63...@qq.com> 写道:
>>> 
>>> 各位RocketMQ社区朋友好,
>>> 
>>> 最近将RocketMQ的底层存储源码逐行看了一遍,发现了如下几个问题。希望各位能抽出时间逐个解答一下。如果是BUG的话,还请修复该问题。谢谢💕。
>>> 
>>> github的issue地址:https://github.com/apache/rocketmq/issues/574 <https://github.com/apache/rocketmq/issues/574>
>>> 
>>> ————————————————————————————————————————————————————————
>>> 
>>> 下面我把内容贴出来:
>>> 
>>> 1.关于锁synchronized的使用问题;
>>> 
>>> rocketmq/store/src/main/java/org/apache/rocketmq/store/CommitLog.java <https://github.com/apache/rocketmq/blob/1bedba8cbcef32554d2dc80f2a340ea686d68ba9/store/src/main/java/org/apache/rocketmq/store/CommitLog.java#L1069>
>>> Line 1069 in 1bedba8 <https://github.com/apache/rocketmq/commit/1bedba8cbcef32554d2dc80f2a340ea686d68ba9>
>>> public synchronized void putRequest(final GroupCommitRequest request) { 
>>> rocketmq/store/src/main/java/org/apache/rocketmq/store/CommitLog.java <https://github.com/apache/rocketmq/blob/1bedba8cbcef32554d2dc80f2a340ea686d68ba9/store/src/main/java/org/apache/rocketmq/store/CommitLog.java#L1136>
>>> Line 1136 in 1bedba8 <https://github.com/apache/rocketmq/commit/1bedba8cbcef32554d2dc80f2a340ea686d68ba9>
>>> synchronized (this) { 
>>> 
>>> 我认为上述两处的synchronized是完全没有必要的。减少没必要的锁的使用会增加性能😄。
>>> 
>>> 
>>> 2. 睡眠时间不合理,可能会造成正在处理的消息响应失败;
>>> 
>>> rocketmq/store/src/main/java/org/apache/rocketmq/store/CommitLog.java <https://github.com/apache/rocketmq/blob/1bedba8cbcef32554d2dc80f2a340ea686d68ba9/store/src/main/java/org/apache/rocketmq/store/CommitLog.java#L1131>
>>> Line 1131 in 1bedba8 <https://github.com/apache/rocketmq/commit/1bedba8cbcef32554d2dc80f2a340ea686d68ba9>
>>> Thread.sleep(10); 
>>> GroupCommitService此处的睡眠时间是否合理?
>>> 
>>> 因为是同步刷盘策略,设置时间过短,在shutdown时,可能会造成正在处理的消息(eg,正在执行appendMessages的消息)来不及刷盘,进而响应失败(FLUSH_DISK_TIMEOUT)。
>>> 
>>> 是否可以采用其它方式?或者适当延长下时间?10毫秒是不是太短了。
>>> 
>>> 
>>> 3. 可能会造成消息丢失;
>>> 
>>> rocketmq/store/src/main/java/org/apache/rocketmq/store/CommitLog.java <https://github.com/apache/rocketmq/blob/1bedba8cbcef32554d2dc80f2a340ea686d68ba9/store/src/main/java/org/apache/rocketmq/store/CommitLog.java#L943-L947>
>>> Lines 943 to 947 in 1bedba8 <https://github.com/apache/rocketmq/commit/1bedba8cbcef32554d2dc80f2a340ea686d68ba9>
>>> boolean result = false; 
>>> for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) { 
>>>     result = CommitLog.this.mappedFileQueue.commit(0); 
>>>     CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK")); 
>>> } 
>>> CommitRealTimeService此处没有第2点中提到的sleep机制。这里是采用尝试10次commit。
>>> 
>>> 举个简单的场景:
>>> 
>>> A 在执行appendMessages;
>>> B 在执行appendMessages;
>>> 在B执行完appendMessages,还未来得及通知CommitRealTimeService,与此同时shutdown;
>>> 注意,A还未执行完appendMessages;
>>> 按照上述代码逻辑,会尝试10次commit,在第1次时就能将B消息commit,并结束循环;
>>> A 执行完appendMessages;
>>> 问题就在于A消息不会被commit了,A消息丢失。但给客户端返回的响应就是OK💢。
>>> 
>>> 相同的,FlushRealTimeService异步刷盘也有类似问题。
>>> 
>>> 
>>> 4. sleep(0)的问题
>>> 
>>> rocketmq/store/src/main/java/org/apache/rocketmq/store/MappedFile.java <https://github.com/apache/rocketmq/blob/1bedba8cbcef32554d2dc80f2a340ea686d68ba9/store/src/main/java/org/apache/rocketmq/store/MappedFile.java#L507>
>>> Line 507 in 1bedba8 <https://github.com/apache/rocketmq/commit/1bedba8cbcef32554d2dc80f2a340ea686d68ba9>
>>> Thread.sleep(0); 
>>> -XX:+ConvertSleepToYield 这个参数默认就是 true,如果 sleep 0 的话会转换成 yield
>>> 
>>> 但是 ConvertSleepToYield 设为 false 时,会进行 sleep,且 sleep 的最小时间是 1 ms,也就是说 ConvertSleepToYield 为 false 时,哪怕是 sleep 0 实际上跟 sleep 1 是一样的。这样的话,并不会让出线程的时间片。
>>> 
>>> 我个人认为 RocketMQ 应改为 Thread.yield 处理。或许sleep(0)有其它的意图?
>>> 
>>> 5.commitLog写满时,消息的处理问题
>>> 
>>> rocketmq/store/src/main/java/org/apache/rocketmq/store/CommitLog.java <https://github.com/apache/rocketmq/blob/1bedba8cbcef32554d2dc80f2a340ea686d68ba9/store/src/main/java/org/apache/rocketmq/store/CommitLog.java#L1091-L1096>
>>> Lines 1091 to 1096 in 1bedba8 <https://github.com/apache/rocketmq/commit/1bedba8cbcef32554d2dc80f2a340ea686d68ba9>
>>> for (int i = 0; i < 2 && !flushOK; i++) { 
>>>     flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset(); 
>>> 
>>>     if (!flushOK) { 
>>>         CommitLog.this.mappedFileQueue.flush(0); 
>>>     } 
>>> 同步刷盘策略下,正常消息刷盘时,依赖第二次循环将flushOK置为true。但是如果当消息A写入时发现commitLog已经写满,则消息A会被写入下一个commitLog。当前待刷盘的commitLog会有两个。
>>> 
>>> A消息通知刷盘服务执行刷盘,第一次循环刷盘(刷入第一个commitLog的末尾数据)时flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();肯定为false,第二次刷盘(刷入消息A)时flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();也肯定是false,需要第三次循环来将flushOK置为true。
>>> 
>>> 所以上述场景下,flushOK为false。但实际消息A已经刷盘成功了。
>> 


Re: 关于RocketMQ消息存储的几点问题

Posted by 叶茂伟 <ym...@126.com>.
同意金融通的看法,另外补充几点个人看法:
1. 在RocketMQ中,类似问题中提到的sleep时间、尝试次数等设置,许多地方都写死了一个值。如果细挖的话,很多值都需要仔细斟酌或根据实际场景设置。但是否有必要把所有值都提供给用户配置,这又是一个值得思考的问题。
2. 最后一个commitlog写满时,消息处理问题。我认为确实存在,在刷盘之后,要下一次循环才能更新flushOK的值,这种做法本身就存在问题。

> 在 2018年12月19日,下午9:39,金融通 <ji...@mails.ucas.ac.cn> 写道:
> 
> 
> hello,我也来说一下针对其中两个问题我的想法。
> 
> 春龙的第一个加锁同意他的看法。他说的这两个synchronized之间并没有看出需要串行的理由。
> 
> 第三个消息丢失的问题,举的场景可能不成立,因为putMessageLock存在,应该不存在两个线程同时appendMessage的情况。另外对于FlushRealTimeService因为消息是直接写入pagecache,所以只要操作系统不崩溃,内存不断电,即使不强制刷盘,还是能保证消息不丢失。但是CommitRealTimeService由于是先写到程序的内存,程序的内存再写入FileChannel,在shutdown后我觉得有极小的理论可能(当appendMessage的线程更新writePostion迟于CommitRealTimeService刷盘时的读writePostion,导致CommitRealTimeService认为已经将消息全部刷盘)导致消息没有写入文件。
> 
>> -----原始邮件-----
>> 发件人: "Gosling Von" <fe...@gmail.com>
>> 发送时间: 2018-12-19 18:00:07 (星期三)
>> 收件人: users@rocketmq.apache.org
>> 抄送: "dev@rocketmq.apache.org" <de...@rocketmq.apache.org>
>> 主题: Re: 关于RocketMQ消息存储的几点问题
>> 
>> Hi,
>> 
>> 这几个问题挺有意思的,我不清楚为什么大家没有人做出回应。Apache Way鼓励大家多在邮件里列表里讨论与交流,我们RocketMQ社区的同学看来还是有些腼腆,好吧,我来抛砖引玉一下 :-)
>> 
>> 
>> 首先明确一点,如果对代码有疑问,建议大家去实测下,然后给出自己的测试数据这样讨论会较好。
>> 
>> 1. 关于锁的问题,可以看到volatile变量修饰的读和写请求,通过synchronized把它们组成原子操作,保证了同步操作原语。如果换成ReentrantLock会更好一些。
>> 
>> 2. sleep的时间这块可以斟酌,但基本设计思路是希望IO一直忙,不要被CPU slot换出,所以你会看到它的时间被设置的很小。而有可能触发问题的shutdown此时就非常讲究了,这块的代码我们都需要再测试下。另外关于sleep(0)和yield的争议一致以来都有,推荐看下OpenJDK[1],在不同平台上会有不通的表现,这个问题值得大家再深入讨论下。
>> 
>> 3. 消息丢失与CommitLog数据写满的问题,建议你再测试下,我们可以继续交流。
>> 
>> 4. RocketMQ 5.0会重构、剥离数据存储这块,也欢迎大家实测,然后把自己的一些测试结论发出来,我们可以一起交流下。
>> 
>> 
>>  if (millis == 0) {
>>    // When ConvertSleepToYield is on, this matches the classic VM implementation of
>>    // JVM_Sleep. Critical for similar threading behaviour (Win32)
>>    // It appears that in certain GUI contexts, it may be beneficial to do a short sleep
>>    // for SOLARIS
>>    if (ConvertSleepToYield) {
>>      os::yield();
>>    } else {
>>      ThreadState old_state = thread->osthread()->get_state();
>>      thread->osthread()->set_state(SLEEPING);
>>      os::sleep(thread, MinSleepInterval, false);
>>      thread->osthread()->set_state(old_state);
>>    }
>>  } else {
>>    ThreadState old_state = thread->osthread()->get_state();
>>    thread->osthread()->set_state(SLEEPING);
>>    if (os::sleep(thread, millis, true) == OS_INTRPT) {
>>      // An asynchronous exception (e.g., ThreadDeathException) could have been thrown on
>>      // us while we were sleeping. We do not overwrite those.
>>      if (!HAS_PENDING_EXCEPTION) {
>>        HS_DTRACE_PROBE1(hotspot, thread__sleep__end,1);
>>        // TODO-FIXME: THROW_MSG returns which means we will not call set_state()
>>        // to properly restore the thread state.  That's likely wrong.
>>        THROW_MSG(vmSymbols::java_lang_InterruptedException(), "sleep interrupted");
>>      }
>>    }
>>    thread->osthread()->set_state(old_state);
>>  }
>>  HS_DTRACE_PROBE1(hotspot, thread__sleep__end,0);
>> JVM_END
>> 
>> 
>> 
>> [1] http://hg.openjdk.java.net/jdk7/jdk7/hotspot/file/tip/src/share/vm/prims/jvm.cpp <http://hg.openjdk.java.net/jdk7/jdk7/hotspot/file/tip/src/share/vm/prims/jvm.cpp>
>> 
>> Best Regards,
>> Von Gosling
>> 
>> 
>>> 在 2018年12月4日,下午9:16,刘春龙 <63...@qq.com> 写道:
>>> 
>>> 各位RocketMQ社区朋友好,
>>> 
>>> 最近将RocketMQ的底层存储源码逐行看了一遍,发现了如下几个问题。希望各位能抽出时间逐个解答一下。如果是BUG的话,还请修复该问题。谢谢💕。
>>> 
>>> github的issue地址:https://github.com/apache/rocketmq/issues/574 <https://github.com/apache/rocketmq/issues/574>
>>> 
>>> ————————————————————————————————————————————————————————
>>> 
>>> 下面我把内容贴出来:
>>> 
>>> 1.关于锁synchronized的使用问题;
>>> 
>>> rocketmq/store/src/main/java/org/apache/rocketmq/store/CommitLog.java <https://github.com/apache/rocketmq/blob/1bedba8cbcef32554d2dc80f2a340ea686d68ba9/store/src/main/java/org/apache/rocketmq/store/CommitLog.java#L1069>
>>> Line 1069 in 1bedba8 <https://github.com/apache/rocketmq/commit/1bedba8cbcef32554d2dc80f2a340ea686d68ba9>
>>> public synchronized void putRequest(final GroupCommitRequest request) { 
>>> rocketmq/store/src/main/java/org/apache/rocketmq/store/CommitLog.java <https://github.com/apache/rocketmq/blob/1bedba8cbcef32554d2dc80f2a340ea686d68ba9/store/src/main/java/org/apache/rocketmq/store/CommitLog.java#L1136>
>>> Line 1136 in 1bedba8 <https://github.com/apache/rocketmq/commit/1bedba8cbcef32554d2dc80f2a340ea686d68ba9>
>>> synchronized (this) { 
>>> 
>>> 我认为上述两处的synchronized是完全没有必要的。减少没必要的锁的使用会增加性能😄。
>>> 
>>> 
>>> 2. 睡眠时间不合理,可能会造成正在处理的消息响应失败;
>>> 
>>> rocketmq/store/src/main/java/org/apache/rocketmq/store/CommitLog.java <https://github.com/apache/rocketmq/blob/1bedba8cbcef32554d2dc80f2a340ea686d68ba9/store/src/main/java/org/apache/rocketmq/store/CommitLog.java#L1131>
>>> Line 1131 in 1bedba8 <https://github.com/apache/rocketmq/commit/1bedba8cbcef32554d2dc80f2a340ea686d68ba9>
>>> Thread.sleep(10); 
>>> GroupCommitService此处的睡眠时间是否合理?
>>> 
>>> 因为是同步刷盘策略,设置时间过短,在shutdown时,可能会造成正在处理的消息(eg,正在执行appendMessages的消息)来不及刷盘,进而响应失败(FLUSH_DISK_TIMEOUT)。
>>> 
>>> 是否可以采用其它方式?或者适当延长下时间?10毫秒是不是太短了。
>>> 
>>> 
>>> 3. 可能会造成消息丢失;
>>> 
>>> rocketmq/store/src/main/java/org/apache/rocketmq/store/CommitLog.java <https://github.com/apache/rocketmq/blob/1bedba8cbcef32554d2dc80f2a340ea686d68ba9/store/src/main/java/org/apache/rocketmq/store/CommitLog.java#L943-L947>
>>> Lines 943 to 947 in 1bedba8 <https://github.com/apache/rocketmq/commit/1bedba8cbcef32554d2dc80f2a340ea686d68ba9>
>>> boolean result = false; 
>>> for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) { 
>>>     result = CommitLog.this.mappedFileQueue.commit(0); 
>>>     CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK")); 
>>> } 
>>> CommitRealTimeService此处没有第2点中提到的sleep机制。这里是采用尝试10次commit。
>>> 
>>> 举个简单的场景:
>>> 
>>> A 在执行appendMessages;
>>> B 在执行appendMessages;
>>> 在B执行完appendMessages,还未来得及通知CommitRealTimeService,与此同时shutdown;
>>> 注意,A还未执行完appendMessages;
>>> 按照上述代码逻辑,会尝试10次commit,在第1次时就能将B消息commit,并结束循环;
>>> A 执行完appendMessages;
>>> 问题就在于A消息不会被commit了,A消息丢失。但给客户端返回的响应就是OK💢。
>>> 
>>> 相同的,FlushRealTimeService异步刷盘也有类似问题。
>>> 
>>> 
>>> 4. sleep(0)的问题
>>> 
>>> rocketmq/store/src/main/java/org/apache/rocketmq/store/MappedFile.java <https://github.com/apache/rocketmq/blob/1bedba8cbcef32554d2dc80f2a340ea686d68ba9/store/src/main/java/org/apache/rocketmq/store/MappedFile.java#L507>
>>> Line 507 in 1bedba8 <https://github.com/apache/rocketmq/commit/1bedba8cbcef32554d2dc80f2a340ea686d68ba9>
>>> Thread.sleep(0); 
>>> -XX:+ConvertSleepToYield 这个参数默认就是 true,如果 sleep 0 的话会转换成 yield
>>> 
>>> 但是 ConvertSleepToYield 设为 false 时,会进行 sleep,且 sleep 的最小时间是 1 ms,也就是说 ConvertSleepToYield 为 false 时,哪怕是 sleep 0 实际上跟 sleep 1 是一样的。这样的话,并不会让出线程的时间片。
>>> 
>>> 我个人认为 RocketMQ 应改为 Thread.yield 处理。或许sleep(0)有其它的意图?
>>> 
>>> 5.commitLog写满时,消息的处理问题
>>> 
>>> rocketmq/store/src/main/java/org/apache/rocketmq/store/CommitLog.java <https://github.com/apache/rocketmq/blob/1bedba8cbcef32554d2dc80f2a340ea686d68ba9/store/src/main/java/org/apache/rocketmq/store/CommitLog.java#L1091-L1096>
>>> Lines 1091 to 1096 in 1bedba8 <https://github.com/apache/rocketmq/commit/1bedba8cbcef32554d2dc80f2a340ea686d68ba9>
>>> for (int i = 0; i < 2 && !flushOK; i++) { 
>>>     flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset(); 
>>> 
>>>     if (!flushOK) { 
>>>         CommitLog.this.mappedFileQueue.flush(0); 
>>>     } 
>>> 同步刷盘策略下,正常消息刷盘时,依赖第二次循环将flushOK置为true。但是如果当消息A写入时发现commitLog已经写满,则消息A会被写入下一个commitLog。当前待刷盘的commitLog会有两个。
>>> 
>>> A消息通知刷盘服务执行刷盘,第一次循环刷盘(刷入第一个commitLog的末尾数据)时flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();肯定为false,第二次刷盘(刷入消息A)时flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();也肯定是false,需要第三次循环来将flushOK置为true。
>>> 
>>> 所以上述场景下,flushOK为false。但实际消息A已经刷盘成功了。
>> 


Re: Re: 关于RocketMQ消息存储的几点问题

Posted by 金融通 <ji...@mails.ucas.ac.cn>.
hello,我也来说一下针对其中两个问题我的想法。

春龙的第一个加锁同意他的看法。他说的这两个synchronized之间并没有看出需要串行的理由。

第三个消息丢失的问题,举的场景可能不成立,因为putMessageLock存在,应该不存在两个线程同时appendMessage的情况。另外对于FlushRealTimeService因为消息是直接写入pagecache,所以只要操作系统不崩溃,内存不断电,即使不强制刷盘,还是能保证消息不丢失。但是CommitRealTimeService由于是先写到程序的内存,程序的内存再写入FileChannel,在shutdown后我觉得有极小的理论可能(当appendMessage的线程更新writePostion迟于CommitRealTimeService刷盘时的读writePostion,导致CommitRealTimeService认为已经将消息全部刷盘)导致消息没有写入文件。

> -----原始邮件-----
> 发件人: "Gosling Von" <fe...@gmail.com>
> 发送时间: 2018-12-19 18:00:07 (星期三)
> 收件人: users@rocketmq.apache.org
> 抄送: "dev@rocketmq.apache.org" <de...@rocketmq.apache.org>
> 主题: Re: 关于RocketMQ消息存储的几点问题
> 
> Hi,
> 
> 这几个问题挺有意思的,我不清楚为什么大家没有人做出回应。Apache Way鼓励大家多在邮件里列表里讨论与交流,我们RocketMQ社区的同学看来还是有些腼腆,好吧,我来抛砖引玉一下 :-)
> 
> 
> 首先明确一点,如果对代码有疑问,建议大家去实测下,然后给出自己的测试数据这样讨论会较好。
> 
> 1. 关于锁的问题,可以看到volatile变量修饰的读和写请求,通过synchronized把它们组成原子操作,保证了同步操作原语。如果换成ReentrantLock会更好一些。
> 
> 2. sleep的时间这块可以斟酌,但基本设计思路是希望IO一直忙,不要被CPU slot换出,所以你会看到它的时间被设置的很小。而有可能触发问题的shutdown此时就非常讲究了,这块的代码我们都需要再测试下。另外关于sleep(0)和yield的争议一致以来都有,推荐看下OpenJDK[1],在不同平台上会有不通的表现,这个问题值得大家再深入讨论下。
> 
> 3. 消息丢失与CommitLog数据写满的问题,建议你再测试下,我们可以继续交流。
> 
> 4. RocketMQ 5.0会重构、剥离数据存储这块,也欢迎大家实测,然后把自己的一些测试结论发出来,我们可以一起交流下。
> 
> 
>   if (millis == 0) {
>     // When ConvertSleepToYield is on, this matches the classic VM implementation of
>     // JVM_Sleep. Critical for similar threading behaviour (Win32)
>     // It appears that in certain GUI contexts, it may be beneficial to do a short sleep
>     // for SOLARIS
>     if (ConvertSleepToYield) {
>       os::yield();
>     } else {
>       ThreadState old_state = thread->osthread()->get_state();
>       thread->osthread()->set_state(SLEEPING);
>       os::sleep(thread, MinSleepInterval, false);
>       thread->osthread()->set_state(old_state);
>     }
>   } else {
>     ThreadState old_state = thread->osthread()->get_state();
>     thread->osthread()->set_state(SLEEPING);
>     if (os::sleep(thread, millis, true) == OS_INTRPT) {
>       // An asynchronous exception (e.g., ThreadDeathException) could have been thrown on
>       // us while we were sleeping. We do not overwrite those.
>       if (!HAS_PENDING_EXCEPTION) {
>         HS_DTRACE_PROBE1(hotspot, thread__sleep__end,1);
>         // TODO-FIXME: THROW_MSG returns which means we will not call set_state()
>         // to properly restore the thread state.  That's likely wrong.
>         THROW_MSG(vmSymbols::java_lang_InterruptedException(), "sleep interrupted");
>       }
>     }
>     thread->osthread()->set_state(old_state);
>   }
>   HS_DTRACE_PROBE1(hotspot, thread__sleep__end,0);
> JVM_END
> 
> 
> 
> [1] http://hg.openjdk.java.net/jdk7/jdk7/hotspot/file/tip/src/share/vm/prims/jvm.cpp <http://hg.openjdk.java.net/jdk7/jdk7/hotspot/file/tip/src/share/vm/prims/jvm.cpp>
> 
> Best Regards,
> Von Gosling
> 
> 
> > 在 2018年12月4日,下午9:16,刘春龙 <63...@qq.com> 写道:
> > 
> > 各位RocketMQ社区朋友好,
> > 
> > 最近将RocketMQ的底层存储源码逐行看了一遍,发现了如下几个问题。希望各位能抽出时间逐个解答一下。如果是BUG的话,还请修复该问题。谢谢💕。
> > 
> > github的issue地址:https://github.com/apache/rocketmq/issues/574 <https://github.com/apache/rocketmq/issues/574>
> > 
> > ————————————————————————————————————————————————————————
> > 
> > 下面我把内容贴出来:
> > 
> > 1.关于锁synchronized的使用问题;
> > 
> > rocketmq/store/src/main/java/org/apache/rocketmq/store/CommitLog.java <https://github.com/apache/rocketmq/blob/1bedba8cbcef32554d2dc80f2a340ea686d68ba9/store/src/main/java/org/apache/rocketmq/store/CommitLog.java#L1069>
> > Line 1069 in 1bedba8 <https://github.com/apache/rocketmq/commit/1bedba8cbcef32554d2dc80f2a340ea686d68ba9>
> >  public synchronized void putRequest(final GroupCommitRequest request) { 
> > rocketmq/store/src/main/java/org/apache/rocketmq/store/CommitLog.java <https://github.com/apache/rocketmq/blob/1bedba8cbcef32554d2dc80f2a340ea686d68ba9/store/src/main/java/org/apache/rocketmq/store/CommitLog.java#L1136>
> > Line 1136 in 1bedba8 <https://github.com/apache/rocketmq/commit/1bedba8cbcef32554d2dc80f2a340ea686d68ba9>
> >  synchronized (this) { 
> > 
> > 我认为上述两处的synchronized是完全没有必要的。减少没必要的锁的使用会增加性能😄。
> > 
> > 
> > 2. 睡眠时间不合理,可能会造成正在处理的消息响应失败;
> > 
> > rocketmq/store/src/main/java/org/apache/rocketmq/store/CommitLog.java <https://github.com/apache/rocketmq/blob/1bedba8cbcef32554d2dc80f2a340ea686d68ba9/store/src/main/java/org/apache/rocketmq/store/CommitLog.java#L1131>
> > Line 1131 in 1bedba8 <https://github.com/apache/rocketmq/commit/1bedba8cbcef32554d2dc80f2a340ea686d68ba9>
> >  Thread.sleep(10); 
> > GroupCommitService此处的睡眠时间是否合理?
> > 
> > 因为是同步刷盘策略,设置时间过短,在shutdown时,可能会造成正在处理的消息(eg,正在执行appendMessages的消息)来不及刷盘,进而响应失败(FLUSH_DISK_TIMEOUT)。
> > 
> > 是否可以采用其它方式?或者适当延长下时间?10毫秒是不是太短了。
> > 
> > 
> > 3. 可能会造成消息丢失;
> > 
> > rocketmq/store/src/main/java/org/apache/rocketmq/store/CommitLog.java <https://github.com/apache/rocketmq/blob/1bedba8cbcef32554d2dc80f2a340ea686d68ba9/store/src/main/java/org/apache/rocketmq/store/CommitLog.java#L943-L947>
> > Lines 943 to 947 in 1bedba8 <https://github.com/apache/rocketmq/commit/1bedba8cbcef32554d2dc80f2a340ea686d68ba9>
> >  boolean result = false; 
> >  for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) { 
> >      result = CommitLog.this.mappedFileQueue.commit(0); 
> >      CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK")); 
> >  } 
> > CommitRealTimeService此处没有第2点中提到的sleep机制。这里是采用尝试10次commit。
> > 
> > 举个简单的场景:
> > 
> > A 在执行appendMessages;
> > B 在执行appendMessages;
> > 在B执行完appendMessages,还未来得及通知CommitRealTimeService,与此同时shutdown;
> > 注意,A还未执行完appendMessages;
> > 按照上述代码逻辑,会尝试10次commit,在第1次时就能将B消息commit,并结束循环;
> > A 执行完appendMessages;
> > 问题就在于A消息不会被commit了,A消息丢失。但给客户端返回的响应就是OK💢。
> > 
> > 相同的,FlushRealTimeService异步刷盘也有类似问题。
> > 
> > 
> > 4. sleep(0)的问题
> > 
> > rocketmq/store/src/main/java/org/apache/rocketmq/store/MappedFile.java <https://github.com/apache/rocketmq/blob/1bedba8cbcef32554d2dc80f2a340ea686d68ba9/store/src/main/java/org/apache/rocketmq/store/MappedFile.java#L507>
> > Line 507 in 1bedba8 <https://github.com/apache/rocketmq/commit/1bedba8cbcef32554d2dc80f2a340ea686d68ba9>
> >  Thread.sleep(0); 
> > -XX:+ConvertSleepToYield 这个参数默认就是 true,如果 sleep 0 的话会转换成 yield
> > 
> > 但是 ConvertSleepToYield 设为 false 时,会进行 sleep,且 sleep 的最小时间是 1 ms,也就是说 ConvertSleepToYield 为 false 时,哪怕是 sleep 0 实际上跟 sleep 1 是一样的。这样的话,并不会让出线程的时间片。
> > 
> > 我个人认为 RocketMQ 应改为 Thread.yield 处理。或许sleep(0)有其它的意图?
> > 
> > 5.commitLog写满时,消息的处理问题
> > 
> > rocketmq/store/src/main/java/org/apache/rocketmq/store/CommitLog.java <https://github.com/apache/rocketmq/blob/1bedba8cbcef32554d2dc80f2a340ea686d68ba9/store/src/main/java/org/apache/rocketmq/store/CommitLog.java#L1091-L1096>
> > Lines 1091 to 1096 in 1bedba8 <https://github.com/apache/rocketmq/commit/1bedba8cbcef32554d2dc80f2a340ea686d68ba9>
> >  for (int i = 0; i < 2 && !flushOK; i++) { 
> >      flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset(); 
> >   
> >      if (!flushOK) { 
> >          CommitLog.this.mappedFileQueue.flush(0); 
> >      } 
> > 同步刷盘策略下,正常消息刷盘时,依赖第二次循环将flushOK置为true。但是如果当消息A写入时发现commitLog已经写满,则消息A会被写入下一个commitLog。当前待刷盘的commitLog会有两个。
> > 
> > A消息通知刷盘服务执行刷盘,第一次循环刷盘(刷入第一个commitLog的末尾数据)时flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();肯定为false,第二次刷盘(刷入消息A)时flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();也肯定是false,需要第三次循环来将flushOK置为true。
> > 
> > 所以上述场景下,flushOK为false。但实际消息A已经刷盘成功了。
> 

Re: Re: 关于RocketMQ消息存储的几点问题

Posted by 金融通 <ji...@mails.ucas.ac.cn>.
hello,我也来说一下针对其中两个问题我的想法。

春龙的第一个加锁同意他的看法。他说的这两个synchronized之间并没有看出需要串行的理由。

第三个消息丢失的问题,举的场景可能不成立,因为putMessageLock存在,应该不存在两个线程同时appendMessage的情况。另外对于FlushRealTimeService因为消息是直接写入pagecache,所以只要操作系统不崩溃,内存不断电,即使不强制刷盘,还是能保证消息不丢失。但是CommitRealTimeService由于是先写到程序的内存,程序的内存再写入FileChannel,在shutdown后我觉得有极小的理论可能(当appendMessage的线程更新writePostion迟于CommitRealTimeService刷盘时的读writePostion,导致CommitRealTimeService认为已经将消息全部刷盘)导致消息没有写入文件。刚才不小心发送到开发者邮件列表了
> -----原始邮件-----
> 发件人: "Gosling Von" <fe...@gmail.com>
> 发送时间: 2018-12-19 18:00:07 (星期三)
> 收件人: users@rocketmq.apache.org
> 抄送: "dev@rocketmq.apache.org" <de...@rocketmq.apache.org>
> 主题: Re: 关于RocketMQ消息存储的几点问题
> 
> Hi,
> 
> 这几个问题挺有意思的,我不清楚为什么大家没有人做出回应。Apache Way鼓励大家多在邮件里列表里讨论与交流,我们RocketMQ社区的同学看来还是有些腼腆,好吧,我来抛砖引玉一下 :-)
> 
> 
> 首先明确一点,如果对代码有疑问,建议大家去实测下,然后给出自己的测试数据这样讨论会较好。
> 
> 1. 关于锁的问题,可以看到volatile变量修饰的读和写请求,通过synchronized把它们组成原子操作,保证了同步操作原语。如果换成ReentrantLock会更好一些。
> 
> 2. sleep的时间这块可以斟酌,但基本设计思路是希望IO一直忙,不要被CPU slot换出,所以你会看到它的时间被设置的很小。而有可能触发问题的shutdown此时就非常讲究了,这块的代码我们都需要再测试下。另外关于sleep(0)和yield的争议一致以来都有,推荐看下OpenJDK[1],在不同平台上会有不通的表现,这个问题值得大家再深入讨论下。
> 
> 3. 消息丢失与CommitLog数据写满的问题,建议你再测试下,我们可以继续交流。
> 
> 4. RocketMQ 5.0会重构、剥离数据存储这块,也欢迎大家实测,然后把自己的一些测试结论发出来,我们可以一起交流下。
> 
> 
>   if (millis == 0) {
>     // When ConvertSleepToYield is on, this matches the classic VM implementation of
>     // JVM_Sleep. Critical for similar threading behaviour (Win32)
>     // It appears that in certain GUI contexts, it may be beneficial to do a short sleep
>     // for SOLARIS
>     if (ConvertSleepToYield) {
>       os::yield();
>     } else {
>       ThreadState old_state = thread->osthread()->get_state();
>       thread->osthread()->set_state(SLEEPING);
>       os::sleep(thread, MinSleepInterval, false);
>       thread->osthread()->set_state(old_state);
>     }
>   } else {
>     ThreadState old_state = thread->osthread()->get_state();
>     thread->osthread()->set_state(SLEEPING);
>     if (os::sleep(thread, millis, true) == OS_INTRPT) {
>       // An asynchronous exception (e.g., ThreadDeathException) could have been thrown on
>       // us while we were sleeping. We do not overwrite those.
>       if (!HAS_PENDING_EXCEPTION) {
>         HS_DTRACE_PROBE1(hotspot, thread__sleep__end,1);
>         // TODO-FIXME: THROW_MSG returns which means we will not call set_state()
>         // to properly restore the thread state.  That's likely wrong.
>         THROW_MSG(vmSymbols::java_lang_InterruptedException(), "sleep interrupted");
>       }
>     }
>     thread->osthread()->set_state(old_state);
>   }
>   HS_DTRACE_PROBE1(hotspot, thread__sleep__end,0);
> JVM_END
> 
> 
> 
> [1] http://hg.openjdk.java.net/jdk7/jdk7/hotspot/file/tip/src/share/vm/prims/jvm.cpp <http://hg.openjdk.java.net/jdk7/jdk7/hotspot/file/tip/src/share/vm/prims/jvm.cpp>
> 
> Best Regards,
> Von Gosling
> 
> 
> > 在 2018年12月4日,下午9:16,刘春龙 <63...@qq.com> 写道:
> > 
> > 各位RocketMQ社区朋友好,
> > 
> > 最近将RocketMQ的底层存储源码逐行看了一遍,发现了如下几个问题。希望各位能抽出时间逐个解答一下。如果是BUG的话,还请修复该问题。谢谢💕。
> > 
> > github的issue地址:https://github.com/apache/rocketmq/issues/574 <https://github.com/apache/rocketmq/issues/574>
> > 
> > ————————————————————————————————————————————————————————
> > 
> > 下面我把内容贴出来:
> > 
> > 1.关于锁synchronized的使用问题;
> > 
> > rocketmq/store/src/main/java/org/apache/rocketmq/store/CommitLog.java <https://github.com/apache/rocketmq/blob/1bedba8cbcef32554d2dc80f2a340ea686d68ba9/store/src/main/java/org/apache/rocketmq/store/CommitLog.java#L1069>
> > Line 1069 in 1bedba8 <https://github.com/apache/rocketmq/commit/1bedba8cbcef32554d2dc80f2a340ea686d68ba9>
> >  public synchronized void putRequest(final GroupCommitRequest request) { 
> > rocketmq/store/src/main/java/org/apache/rocketmq/store/CommitLog.java <https://github.com/apache/rocketmq/blob/1bedba8cbcef32554d2dc80f2a340ea686d68ba9/store/src/main/java/org/apache/rocketmq/store/CommitLog.java#L1136>
> > Line 1136 in 1bedba8 <https://github.com/apache/rocketmq/commit/1bedba8cbcef32554d2dc80f2a340ea686d68ba9>
> >  synchronized (this) { 
> > 
> > 我认为上述两处的synchronized是完全没有必要的。减少没必要的锁的使用会增加性能😄。
> > 
> > 
> > 2. 睡眠时间不合理,可能会造成正在处理的消息响应失败;
> > 
> > rocketmq/store/src/main/java/org/apache/rocketmq/store/CommitLog.java <https://github.com/apache/rocketmq/blob/1bedba8cbcef32554d2dc80f2a340ea686d68ba9/store/src/main/java/org/apache/rocketmq/store/CommitLog.java#L1131>
> > Line 1131 in 1bedba8 <https://github.com/apache/rocketmq/commit/1bedba8cbcef32554d2dc80f2a340ea686d68ba9>
> >  Thread.sleep(10); 
> > GroupCommitService此处的睡眠时间是否合理?
> > 
> > 因为是同步刷盘策略,设置时间过短,在shutdown时,可能会造成正在处理的消息(eg,正在执行appendMessages的消息)来不及刷盘,进而响应失败(FLUSH_DISK_TIMEOUT)。
> > 
> > 是否可以采用其它方式?或者适当延长下时间?10毫秒是不是太短了。
> > 
> > 
> > 3. 可能会造成消息丢失;
> > 
> > rocketmq/store/src/main/java/org/apache/rocketmq/store/CommitLog.java <https://github.com/apache/rocketmq/blob/1bedba8cbcef32554d2dc80f2a340ea686d68ba9/store/src/main/java/org/apache/rocketmq/store/CommitLog.java#L943-L947>
> > Lines 943 to 947 in 1bedba8 <https://github.com/apache/rocketmq/commit/1bedba8cbcef32554d2dc80f2a340ea686d68ba9>
> >  boolean result = false; 
> >  for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) { 
> >      result = CommitLog.this.mappedFileQueue.commit(0); 
> >      CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK")); 
> >  } 
> > CommitRealTimeService此处没有第2点中提到的sleep机制。这里是采用尝试10次commit。
> > 
> > 举个简单的场景:
> > 
> > A 在执行appendMessages;
> > B 在执行appendMessages;
> > 在B执行完appendMessages,还未来得及通知CommitRealTimeService,与此同时shutdown;
> > 注意,A还未执行完appendMessages;
> > 按照上述代码逻辑,会尝试10次commit,在第1次时就能将B消息commit,并结束循环;
> > A 执行完appendMessages;
> > 问题就在于A消息不会被commit了,A消息丢失。但给客户端返回的响应就是OK💢。
> > 
> > 相同的,FlushRealTimeService异步刷盘也有类似问题。
> > 
> > 
> > 4. sleep(0)的问题
> > 
> > rocketmq/store/src/main/java/org/apache/rocketmq/store/MappedFile.java <https://github.com/apache/rocketmq/blob/1bedba8cbcef32554d2dc80f2a340ea686d68ba9/store/src/main/java/org/apache/rocketmq/store/MappedFile.java#L507>
> > Line 507 in 1bedba8 <https://github.com/apache/rocketmq/commit/1bedba8cbcef32554d2dc80f2a340ea686d68ba9>
> >  Thread.sleep(0); 
> > -XX:+ConvertSleepToYield 这个参数默认就是 true,如果 sleep 0 的话会转换成 yield
> > 
> > 但是 ConvertSleepToYield 设为 false 时,会进行 sleep,且 sleep 的最小时间是 1 ms,也就是说 ConvertSleepToYield 为 false 时,哪怕是 sleep 0 实际上跟 sleep 1 是一样的。这样的话,并不会让出线程的时间片。
> > 
> > 我个人认为 RocketMQ 应改为 Thread.yield 处理。或许sleep(0)有其它的意图?
> > 
> > 5.commitLog写满时,消息的处理问题
> > 
> > rocketmq/store/src/main/java/org/apache/rocketmq/store/CommitLog.java <https://github.com/apache/rocketmq/blob/1bedba8cbcef32554d2dc80f2a340ea686d68ba9/store/src/main/java/org/apache/rocketmq/store/CommitLog.java#L1091-L1096>
> > Lines 1091 to 1096 in 1bedba8 <https://github.com/apache/rocketmq/commit/1bedba8cbcef32554d2dc80f2a340ea686d68ba9>
> >  for (int i = 0; i < 2 && !flushOK; i++) { 
> >      flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset(); 
> >   
> >      if (!flushOK) { 
> >          CommitLog.this.mappedFileQueue.flush(0); 
> >      } 
> > 同步刷盘策略下,正常消息刷盘时,依赖第二次循环将flushOK置为true。但是如果当消息A写入时发现commitLog已经写满,则消息A会被写入下一个commitLog。当前待刷盘的commitLog会有两个。
> > 
> > A消息通知刷盘服务执行刷盘,第一次循环刷盘(刷入第一个commitLog的末尾数据)时flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();肯定为false,第二次刷盘(刷入消息A)时flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();也肯定是false,需要第三次循环来将flushOK置为true。
> > 
> > 所以上述场景下,flushOK为false。但实际消息A已经刷盘成功了。
> 


-----原始邮件-----
发件人:"Gosling Von" <fe...@gmail.com>
发送时间:2018-12-19 18:00:07 (星期三)
收件人: users@rocketmq.apache.org
抄送: "dev@rocketmq.apache.org" <de...@rocketmq.apache.org>
主题: Re: 关于RocketMQ消息存储的几点问题

Hi,


这几个问题挺有意思的,我不清楚为什么大家没有人做出回应。Apache Way鼓励大家多在邮件里列表里讨论与交流,我们RocketMQ社区的同学看来还是有些腼腆,好吧,我来抛砖引玉一下 :-)




首先明确一点,如果对代码有疑问,建议大家去实测下,然后给出自己的测试数据这样讨论会较好。


1. 关于锁的问题,可以看到volatile变量修饰的读和写请求,通过synchronized把它们组成原子操作,保证了同步操作原语。如果换成ReentrantLock会更好一些。


2. sleep的时间这块可以斟酌,但基本设计思路是希望IO一直忙,不要被CPU slot换出,所以你会看到它的时间被设置的很小。而有可能触发问题的shutdown此时就非常讲究了,这块的代码我们都需要再测试下。另外关于sleep(0)和yield的争议一致以来都有,推荐看下OpenJDK[1],在不同平台上会有不通的表现,这个问题值得大家再深入讨论下。


3. 消息丢失与CommitLog数据写满的问题,建议你再测试下,我们可以继续交流。


4. RocketMQ 5.0会重构、剥离数据存储这块,也欢迎大家实测,然后把自己的一些测试结论发出来,我们可以一起交流下。




  if (millis == 0) {
    // When ConvertSleepToYield is on, this matches the classic VM implementation of
    // JVM_Sleep. Critical for similar threading behaviour (Win32)
    // It appears that in certain GUI contexts, it may be beneficial to do a short sleep
    // for SOLARIS
    if (ConvertSleepToYield) {
      os::yield();
    } else {
      ThreadState old_state = thread->osthread()->get_state();
      thread->osthread()->set_state(SLEEPING);
      os::sleep(thread, MinSleepInterval, false);
      thread->osthread()->set_state(old_state);
    }
  } else {
    ThreadState old_state = thread->osthread()->get_state();
    thread->osthread()->set_state(SLEEPING);
    if (os::sleep(thread, millis, true) == OS_INTRPT) {
      // An asynchronous exception (e.g., ThreadDeathException) could have been thrown on
      // us while we were sleeping. We do not overwrite those.
      if (!HAS_PENDING_EXCEPTION) {
        HS_DTRACE_PROBE1(hotspot, thread__sleep__end,1);
        // TODO-FIXME: THROW_MSG returns which means we will not call set_state()
        // to properly restore the thread state.  That's likely wrong.
        THROW_MSG(vmSymbols::java_lang_InterruptedException(), "sleep interrupted");
      }
    }
    thread->osthread()->set_state(old_state);
  }
  HS_DTRACE_PROBE1(hotspot, thread__sleep__end,0);
JVM_END






[1] http://hg.openjdk.java.net/jdk7/jdk7/hotspot/file/tip/src/share/vm/prims/jvm.cpp


Best Regards,
Von Gosling




在 2018年12月4日,下午9:16,刘春龙 <63...@qq.com> 写道:


各位RocketMQ社区朋友好,


最近将RocketMQ的底层存储源码逐行看了一遍,发现了如下几个问题。希望各位能抽出时间逐个解答一下。如果是BUG的话,还请修复该问题。谢谢💕。


github的issue地址:https://github.com/apache/rocketmq/issues/574


————————————————————————————————————————————————————————


下面我把内容贴出来:



1.关于锁synchronized的使用问题;

rocketmq/store/src/main/java/org/apache/rocketmq/store/CommitLog.java

Line 1069 in 1bedba8

| | publicsynchronizedvoidputRequest(finalGroupCommitRequestrequest) { |

rocketmq/store/src/main/java/org/apache/rocketmq/store/CommitLog.java

Line 1136 in 1bedba8

| | synchronized (this) { |



我认为上述两处的synchronized是完全没有必要的。减少没必要的锁的使用会增加性能😄。




2. 睡眠时间不合理,可能会造成正在处理的消息响应失败;

rocketmq/store/src/main/java/org/apache/rocketmq/store/CommitLog.java

Line 1131 in 1bedba8

| | Thread.sleep(10); |

GroupCommitService此处的睡眠时间是否合理?

因为是同步刷盘策略,设置时间过短,在shutdown时,可能会造成正在处理的消息(eg,正在执行appendMessages的消息)来不及刷盘,进而响应失败(FLUSH_DISK_TIMEOUT)。

是否可以采用其它方式?或者适当延长下时间?10毫秒是不是太短了。




3. 可能会造成消息丢失;

rocketmq/store/src/main/java/org/apache/rocketmq/store/CommitLog.java

Lines 943 to 947 in 1bedba8

| | boolean result =false; |
| | for (int i =0; i <RETRY_TIMES_OVER&&!result; i++) { |
| | result =CommitLog.this.mappedFileQueue.commit(0); |
| | CommitLog.log.info(this.getServiceName() +" service shutdown, retry "+ (i +1) +" times "+ (result ?"OK":"Not OK")); |
| | } |

CommitRealTimeService此处没有第2点中提到的sleep机制。这里是采用尝试10次commit。

举个简单的场景:

A 在执行appendMessages;
B 在执行appendMessages;
在B执行完appendMessages,还未来得及通知CommitRealTimeService,与此同时shutdown;
注意,A还未执行完appendMessages;
按照上述代码逻辑,会尝试10次commit,在第1次时就能将B消息commit,并结束循环;
A 执行完appendMessages;

问题就在于A消息不会被commit了,A消息丢失。但给客户端返回的响应就是OK💢。

相同的,FlushRealTimeService异步刷盘也有类似问题。




4. sleep(0)的问题

rocketmq/store/src/main/java/org/apache/rocketmq/store/MappedFile.java

Line 507 in 1bedba8

| | Thread.sleep(0); |

-XX:+ConvertSleepToYield 这个参数默认就是 true,如果 sleep 0 的话会转换成 yield

但是 ConvertSleepToYield 设为 false 时,会进行 sleep,且 sleep 的最小时间是 1 ms,也就是说 ConvertSleepToYield 为 false 时,哪怕是 sleep 0 实际上跟 sleep 1 是一样的。这样的话,并不会让出线程的时间片。

我个人认为 RocketMQ 应改为 Thread.yield 处理。或许sleep(0)有其它的意图?

5.commitLog写满时,消息的处理问题

rocketmq/store/src/main/java/org/apache/rocketmq/store/CommitLog.java

Lines 1091 to 1096 in 1bedba8

| | for (int i =0; i <2&&!flushOK; i++) { |
| | flushOK =CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset(); |
| | |
| | if (!flushOK) { |
| | CommitLog.this.mappedFileQueue.flush(0); |
| | } |

同步刷盘策略下,正常消息刷盘时,依赖第二次循环将flushOK置为true。但是如果当消息A写入时发现commitLog已经写满,则消息A会被写入下一个commitLog。当前待刷盘的commitLog会有两个。

A消息通知刷盘服务执行刷盘,第一次循环刷盘(刷入第一个commitLog的末尾数据)时flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();肯定为false,第二次刷盘(刷入消息A)时flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();也肯定是false,需要第三次循环来将flushOK置为true。

所以上述场景下,flushOK为false。但实际消息A已经刷盘成功了。


Re: 关于RocketMQ消息存储的几点问题

Posted by Gosling Von <fe...@gmail.com>.
Hi,

这几个问题挺有意思的,我不清楚为什么大家没有人做出回应。Apache Way鼓励大家多在邮件里列表里讨论与交流,我们RocketMQ社区的同学看来还是有些腼腆,好吧,我来抛砖引玉一下 :-)


首先明确一点,如果对代码有疑问,建议大家去实测下,然后给出自己的测试数据这样讨论会较好。

1. 关于锁的问题,可以看到volatile变量修饰的读和写请求,通过synchronized把它们组成原子操作,保证了同步操作原语。如果换成ReentrantLock会更好一些。

2. sleep的时间这块可以斟酌,但基本设计思路是希望IO一直忙,不要被CPU slot换出,所以你会看到它的时间被设置的很小。而有可能触发问题的shutdown此时就非常讲究了,这块的代码我们都需要再测试下。另外关于sleep(0)和yield的争议一致以来都有,推荐看下OpenJDK[1],在不同平台上会有不通的表现,这个问题值得大家再深入讨论下。

3. 消息丢失与CommitLog数据写满的问题,建议你再测试下,我们可以继续交流。

4. RocketMQ 5.0会重构、剥离数据存储这块,也欢迎大家实测,然后把自己的一些测试结论发出来,我们可以一起交流下。


  if (millis == 0) {
    // When ConvertSleepToYield is on, this matches the classic VM implementation of
    // JVM_Sleep. Critical for similar threading behaviour (Win32)
    // It appears that in certain GUI contexts, it may be beneficial to do a short sleep
    // for SOLARIS
    if (ConvertSleepToYield) {
      os::yield();
    } else {
      ThreadState old_state = thread->osthread()->get_state();
      thread->osthread()->set_state(SLEEPING);
      os::sleep(thread, MinSleepInterval, false);
      thread->osthread()->set_state(old_state);
    }
  } else {
    ThreadState old_state = thread->osthread()->get_state();
    thread->osthread()->set_state(SLEEPING);
    if (os::sleep(thread, millis, true) == OS_INTRPT) {
      // An asynchronous exception (e.g., ThreadDeathException) could have been thrown on
      // us while we were sleeping. We do not overwrite those.
      if (!HAS_PENDING_EXCEPTION) {
        HS_DTRACE_PROBE1(hotspot, thread__sleep__end,1);
        // TODO-FIXME: THROW_MSG returns which means we will not call set_state()
        // to properly restore the thread state.  That's likely wrong.
        THROW_MSG(vmSymbols::java_lang_InterruptedException(), "sleep interrupted");
      }
    }
    thread->osthread()->set_state(old_state);
  }
  HS_DTRACE_PROBE1(hotspot, thread__sleep__end,0);
JVM_END



[1] http://hg.openjdk.java.net/jdk7/jdk7/hotspot/file/tip/src/share/vm/prims/jvm.cpp <http://hg.openjdk.java.net/jdk7/jdk7/hotspot/file/tip/src/share/vm/prims/jvm.cpp>

Best Regards,
Von Gosling


> 在 2018年12月4日,下午9:16,刘春龙 <63...@qq.com> 写道:
> 
> 各位RocketMQ社区朋友好,
> 
> 最近将RocketMQ的底层存储源码逐行看了一遍,发现了如下几个问题。希望各位能抽出时间逐个解答一下。如果是BUG的话,还请修复该问题。谢谢💕。
> 
> github的issue地址:https://github.com/apache/rocketmq/issues/574 <https://github.com/apache/rocketmq/issues/574>
> 
> ————————————————————————————————————————————————————————
> 
> 下面我把内容贴出来:
> 
> 1.关于锁synchronized的使用问题;
> 
> rocketmq/store/src/main/java/org/apache/rocketmq/store/CommitLog.java <https://github.com/apache/rocketmq/blob/1bedba8cbcef32554d2dc80f2a340ea686d68ba9/store/src/main/java/org/apache/rocketmq/store/CommitLog.java#L1069>
> Line 1069 in 1bedba8 <https://github.com/apache/rocketmq/commit/1bedba8cbcef32554d2dc80f2a340ea686d68ba9>
>  public synchronized void putRequest(final GroupCommitRequest request) { 
> rocketmq/store/src/main/java/org/apache/rocketmq/store/CommitLog.java <https://github.com/apache/rocketmq/blob/1bedba8cbcef32554d2dc80f2a340ea686d68ba9/store/src/main/java/org/apache/rocketmq/store/CommitLog.java#L1136>
> Line 1136 in 1bedba8 <https://github.com/apache/rocketmq/commit/1bedba8cbcef32554d2dc80f2a340ea686d68ba9>
>  synchronized (this) { 
> 
> 我认为上述两处的synchronized是完全没有必要的。减少没必要的锁的使用会增加性能😄。
> 
> 
> 2. 睡眠时间不合理,可能会造成正在处理的消息响应失败;
> 
> rocketmq/store/src/main/java/org/apache/rocketmq/store/CommitLog.java <https://github.com/apache/rocketmq/blob/1bedba8cbcef32554d2dc80f2a340ea686d68ba9/store/src/main/java/org/apache/rocketmq/store/CommitLog.java#L1131>
> Line 1131 in 1bedba8 <https://github.com/apache/rocketmq/commit/1bedba8cbcef32554d2dc80f2a340ea686d68ba9>
>  Thread.sleep(10); 
> GroupCommitService此处的睡眠时间是否合理?
> 
> 因为是同步刷盘策略,设置时间过短,在shutdown时,可能会造成正在处理的消息(eg,正在执行appendMessages的消息)来不及刷盘,进而响应失败(FLUSH_DISK_TIMEOUT)。
> 
> 是否可以采用其它方式?或者适当延长下时间?10毫秒是不是太短了。
> 
> 
> 3. 可能会造成消息丢失;
> 
> rocketmq/store/src/main/java/org/apache/rocketmq/store/CommitLog.java <https://github.com/apache/rocketmq/blob/1bedba8cbcef32554d2dc80f2a340ea686d68ba9/store/src/main/java/org/apache/rocketmq/store/CommitLog.java#L943-L947>
> Lines 943 to 947 in 1bedba8 <https://github.com/apache/rocketmq/commit/1bedba8cbcef32554d2dc80f2a340ea686d68ba9>
>  boolean result = false; 
>  for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) { 
>      result = CommitLog.this.mappedFileQueue.commit(0); 
>      CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK")); 
>  } 
> CommitRealTimeService此处没有第2点中提到的sleep机制。这里是采用尝试10次commit。
> 
> 举个简单的场景:
> 
> A 在执行appendMessages;
> B 在执行appendMessages;
> 在B执行完appendMessages,还未来得及通知CommitRealTimeService,与此同时shutdown;
> 注意,A还未执行完appendMessages;
> 按照上述代码逻辑,会尝试10次commit,在第1次时就能将B消息commit,并结束循环;
> A 执行完appendMessages;
> 问题就在于A消息不会被commit了,A消息丢失。但给客户端返回的响应就是OK💢。
> 
> 相同的,FlushRealTimeService异步刷盘也有类似问题。
> 
> 
> 4. sleep(0)的问题
> 
> rocketmq/store/src/main/java/org/apache/rocketmq/store/MappedFile.java <https://github.com/apache/rocketmq/blob/1bedba8cbcef32554d2dc80f2a340ea686d68ba9/store/src/main/java/org/apache/rocketmq/store/MappedFile.java#L507>
> Line 507 in 1bedba8 <https://github.com/apache/rocketmq/commit/1bedba8cbcef32554d2dc80f2a340ea686d68ba9>
>  Thread.sleep(0); 
> -XX:+ConvertSleepToYield 这个参数默认就是 true,如果 sleep 0 的话会转换成 yield
> 
> 但是 ConvertSleepToYield 设为 false 时,会进行 sleep,且 sleep 的最小时间是 1 ms,也就是说 ConvertSleepToYield 为 false 时,哪怕是 sleep 0 实际上跟 sleep 1 是一样的。这样的话,并不会让出线程的时间片。
> 
> 我个人认为 RocketMQ 应改为 Thread.yield 处理。或许sleep(0)有其它的意图?
> 
> 5.commitLog写满时,消息的处理问题
> 
> rocketmq/store/src/main/java/org/apache/rocketmq/store/CommitLog.java <https://github.com/apache/rocketmq/blob/1bedba8cbcef32554d2dc80f2a340ea686d68ba9/store/src/main/java/org/apache/rocketmq/store/CommitLog.java#L1091-L1096>
> Lines 1091 to 1096 in 1bedba8 <https://github.com/apache/rocketmq/commit/1bedba8cbcef32554d2dc80f2a340ea686d68ba9>
>  for (int i = 0; i < 2 && !flushOK; i++) { 
>      flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset(); 
>   
>      if (!flushOK) { 
>          CommitLog.this.mappedFileQueue.flush(0); 
>      } 
> 同步刷盘策略下,正常消息刷盘时,依赖第二次循环将flushOK置为true。但是如果当消息A写入时发现commitLog已经写满,则消息A会被写入下一个commitLog。当前待刷盘的commitLog会有两个。
> 
> A消息通知刷盘服务执行刷盘,第一次循环刷盘(刷入第一个commitLog的末尾数据)时flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();肯定为false,第二次刷盘(刷入消息A)时flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();也肯定是false,需要第三次循环来将flushOK置为true。
> 
> 所以上述场景下,flushOK为false。但实际消息A已经刷盘成功了。


Re: 关于RocketMQ消息存储的几点问题

Posted by Gosling Von <fe...@gmail.com>.
Hi,

这几个问题挺有意思的,我不清楚为什么大家没有人做出回应。Apache Way鼓励大家多在邮件里列表里讨论与交流,我们RocketMQ社区的同学看来还是有些腼腆,好吧,我来抛砖引玉一下 :-)


首先明确一点,如果对代码有疑问,建议大家去实测下,然后给出自己的测试数据这样讨论会较好。

1. 关于锁的问题,可以看到volatile变量修饰的读和写请求,通过synchronized把它们组成原子操作,保证了同步操作原语。如果换成ReentrantLock会更好一些。

2. sleep的时间这块可以斟酌,但基本设计思路是希望IO一直忙,不要被CPU slot换出,所以你会看到它的时间被设置的很小。而有可能触发问题的shutdown此时就非常讲究了,这块的代码我们都需要再测试下。另外关于sleep(0)和yield的争议一致以来都有,推荐看下OpenJDK[1],在不同平台上会有不通的表现,这个问题值得大家再深入讨论下。

3. 消息丢失与CommitLog数据写满的问题,建议你再测试下,我们可以继续交流。

4. RocketMQ 5.0会重构、剥离数据存储这块,也欢迎大家实测,然后把自己的一些测试结论发出来,我们可以一起交流下。


  if (millis == 0) {
    // When ConvertSleepToYield is on, this matches the classic VM implementation of
    // JVM_Sleep. Critical for similar threading behaviour (Win32)
    // It appears that in certain GUI contexts, it may be beneficial to do a short sleep
    // for SOLARIS
    if (ConvertSleepToYield) {
      os::yield();
    } else {
      ThreadState old_state = thread->osthread()->get_state();
      thread->osthread()->set_state(SLEEPING);
      os::sleep(thread, MinSleepInterval, false);
      thread->osthread()->set_state(old_state);
    }
  } else {
    ThreadState old_state = thread->osthread()->get_state();
    thread->osthread()->set_state(SLEEPING);
    if (os::sleep(thread, millis, true) == OS_INTRPT) {
      // An asynchronous exception (e.g., ThreadDeathException) could have been thrown on
      // us while we were sleeping. We do not overwrite those.
      if (!HAS_PENDING_EXCEPTION) {
        HS_DTRACE_PROBE1(hotspot, thread__sleep__end,1);
        // TODO-FIXME: THROW_MSG returns which means we will not call set_state()
        // to properly restore the thread state.  That's likely wrong.
        THROW_MSG(vmSymbols::java_lang_InterruptedException(), "sleep interrupted");
      }
    }
    thread->osthread()->set_state(old_state);
  }
  HS_DTRACE_PROBE1(hotspot, thread__sleep__end,0);
JVM_END



[1] http://hg.openjdk.java.net/jdk7/jdk7/hotspot/file/tip/src/share/vm/prims/jvm.cpp <http://hg.openjdk.java.net/jdk7/jdk7/hotspot/file/tip/src/share/vm/prims/jvm.cpp>

Best Regards,
Von Gosling


> 在 2018年12月4日,下午9:16,刘春龙 <63...@qq.com> 写道:
> 
> 各位RocketMQ社区朋友好,
> 
> 最近将RocketMQ的底层存储源码逐行看了一遍,发现了如下几个问题。希望各位能抽出时间逐个解答一下。如果是BUG的话,还请修复该问题。谢谢💕。
> 
> github的issue地址:https://github.com/apache/rocketmq/issues/574 <https://github.com/apache/rocketmq/issues/574>
> 
> ————————————————————————————————————————————————————————
> 
> 下面我把内容贴出来:
> 
> 1.关于锁synchronized的使用问题;
> 
> rocketmq/store/src/main/java/org/apache/rocketmq/store/CommitLog.java <https://github.com/apache/rocketmq/blob/1bedba8cbcef32554d2dc80f2a340ea686d68ba9/store/src/main/java/org/apache/rocketmq/store/CommitLog.java#L1069>
> Line 1069 in 1bedba8 <https://github.com/apache/rocketmq/commit/1bedba8cbcef32554d2dc80f2a340ea686d68ba9>
>  public synchronized void putRequest(final GroupCommitRequest request) { 
> rocketmq/store/src/main/java/org/apache/rocketmq/store/CommitLog.java <https://github.com/apache/rocketmq/blob/1bedba8cbcef32554d2dc80f2a340ea686d68ba9/store/src/main/java/org/apache/rocketmq/store/CommitLog.java#L1136>
> Line 1136 in 1bedba8 <https://github.com/apache/rocketmq/commit/1bedba8cbcef32554d2dc80f2a340ea686d68ba9>
>  synchronized (this) { 
> 
> 我认为上述两处的synchronized是完全没有必要的。减少没必要的锁的使用会增加性能😄。
> 
> 
> 2. 睡眠时间不合理,可能会造成正在处理的消息响应失败;
> 
> rocketmq/store/src/main/java/org/apache/rocketmq/store/CommitLog.java <https://github.com/apache/rocketmq/blob/1bedba8cbcef32554d2dc80f2a340ea686d68ba9/store/src/main/java/org/apache/rocketmq/store/CommitLog.java#L1131>
> Line 1131 in 1bedba8 <https://github.com/apache/rocketmq/commit/1bedba8cbcef32554d2dc80f2a340ea686d68ba9>
>  Thread.sleep(10); 
> GroupCommitService此处的睡眠时间是否合理?
> 
> 因为是同步刷盘策略,设置时间过短,在shutdown时,可能会造成正在处理的消息(eg,正在执行appendMessages的消息)来不及刷盘,进而响应失败(FLUSH_DISK_TIMEOUT)。
> 
> 是否可以采用其它方式?或者适当延长下时间?10毫秒是不是太短了。
> 
> 
> 3. 可能会造成消息丢失;
> 
> rocketmq/store/src/main/java/org/apache/rocketmq/store/CommitLog.java <https://github.com/apache/rocketmq/blob/1bedba8cbcef32554d2dc80f2a340ea686d68ba9/store/src/main/java/org/apache/rocketmq/store/CommitLog.java#L943-L947>
> Lines 943 to 947 in 1bedba8 <https://github.com/apache/rocketmq/commit/1bedba8cbcef32554d2dc80f2a340ea686d68ba9>
>  boolean result = false; 
>  for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) { 
>      result = CommitLog.this.mappedFileQueue.commit(0); 
>      CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK")); 
>  } 
> CommitRealTimeService此处没有第2点中提到的sleep机制。这里是采用尝试10次commit。
> 
> 举个简单的场景:
> 
> A 在执行appendMessages;
> B 在执行appendMessages;
> 在B执行完appendMessages,还未来得及通知CommitRealTimeService,与此同时shutdown;
> 注意,A还未执行完appendMessages;
> 按照上述代码逻辑,会尝试10次commit,在第1次时就能将B消息commit,并结束循环;
> A 执行完appendMessages;
> 问题就在于A消息不会被commit了,A消息丢失。但给客户端返回的响应就是OK💢。
> 
> 相同的,FlushRealTimeService异步刷盘也有类似问题。
> 
> 
> 4. sleep(0)的问题
> 
> rocketmq/store/src/main/java/org/apache/rocketmq/store/MappedFile.java <https://github.com/apache/rocketmq/blob/1bedba8cbcef32554d2dc80f2a340ea686d68ba9/store/src/main/java/org/apache/rocketmq/store/MappedFile.java#L507>
> Line 507 in 1bedba8 <https://github.com/apache/rocketmq/commit/1bedba8cbcef32554d2dc80f2a340ea686d68ba9>
>  Thread.sleep(0); 
> -XX:+ConvertSleepToYield 这个参数默认就是 true,如果 sleep 0 的话会转换成 yield
> 
> 但是 ConvertSleepToYield 设为 false 时,会进行 sleep,且 sleep 的最小时间是 1 ms,也就是说 ConvertSleepToYield 为 false 时,哪怕是 sleep 0 实际上跟 sleep 1 是一样的。这样的话,并不会让出线程的时间片。
> 
> 我个人认为 RocketMQ 应改为 Thread.yield 处理。或许sleep(0)有其它的意图?
> 
> 5.commitLog写满时,消息的处理问题
> 
> rocketmq/store/src/main/java/org/apache/rocketmq/store/CommitLog.java <https://github.com/apache/rocketmq/blob/1bedba8cbcef32554d2dc80f2a340ea686d68ba9/store/src/main/java/org/apache/rocketmq/store/CommitLog.java#L1091-L1096>
> Lines 1091 to 1096 in 1bedba8 <https://github.com/apache/rocketmq/commit/1bedba8cbcef32554d2dc80f2a340ea686d68ba9>
>  for (int i = 0; i < 2 && !flushOK; i++) { 
>      flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset(); 
>   
>      if (!flushOK) { 
>          CommitLog.this.mappedFileQueue.flush(0); 
>      } 
> 同步刷盘策略下,正常消息刷盘时,依赖第二次循环将flushOK置为true。但是如果当消息A写入时发现commitLog已经写满,则消息A会被写入下一个commitLog。当前待刷盘的commitLog会有两个。
> 
> A消息通知刷盘服务执行刷盘,第一次循环刷盘(刷入第一个commitLog的末尾数据)时flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();肯定为false,第二次刷盘(刷入消息A)时flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();也肯定是false,需要第三次循环来将flushOK置为true。
> 
> 所以上述场景下,flushOK为false。但实际消息A已经刷盘成功了。