Posted to commits@rocketmq.apache.org by "guyinyou (via GitHub)" <gi...@apache.org> on 2023/03/17 07:58:26 UTC

[GitHub] [rocketmq] guyinyou opened a new pull request, #6387: Some improvements for compactionTopic

guyinyou opened a new pull request, #6387:
URL: https://github.com/apache/rocketmq/pull/6387

   **Make sure to set the target branch to `develop`**
   
   ## What is the purpose of the change
   
   <!--
   If this PR fixes a GitHub issue, please add `fixes #<XXX>` or `closes #<XXX>`. Please refer to the documentation for more information:
   https://docs.github.com/en/issues/tracking-your-work-with-issues/linking-a-pull-request-to-an-issue
   -->
   
   fix #6386 
   
   ## Brief changelog
   
   XX
   
   ## Verifying this change
   
   XXXX
   
   Follow this checklist to help us incorporate your contribution quickly and easily. Note: `it would be helpful if you could finish the following 5 checklist items (the last one is optional) before requesting that the community review your PR`.
   
   - [x] Make sure there is a [Github issue](https://github.com/apache/rocketmq/issues) filed for the change (usually before you start working on it). Trivial changes like typos do not require a Github issue. Your pull request should address just this issue, without pulling in other changes - one PR resolves one issue. 
   - [x] Format the pull request title like `[ISSUE #123] Fix UnknownException when host config not exist`. Each commit in the pull request should have a meaningful subject line and body.
   - [x] Write a pull request description that is detailed enough to understand what the pull request does, how, and why.
   - [x] Write the necessary unit tests (over 80% coverage) to verify your logic; prefer mocking when cross-module dependencies exist. If a new feature or significant change is committed, please remember to add integration tests in the [test module](https://github.com/apache/rocketmq/tree/master/test).
   - [x] Run `mvn -B clean apache-rat:check findbugs:findbugs checkstyle:checkstyle` to make sure the basic checks pass. Run `mvn clean install -DskipITs` to make sure the unit tests pass. Run `mvn clean test-compile failsafe:integration-test` to make sure the integration tests pass.
   - [ ] If this contribution is large, please file an [Apache Individual Contributor License Agreement](http://www.apache.org/licenses/#clas).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] guyinyou commented on a diff in pull request #6387: [ISSUE #6386] Some improvements for compactionTopic

Posted by "guyinyou (via GitHub)" <gi...@apache.org>.
guyinyou commented on code in PR #6387:
URL: https://github.com/apache/rocketmq/pull/6387#discussion_r1142957439


##########
docs/cn/Example_Compaction_Topic_cn.md:
##########


Review Comment:
   added





[GitHub] [rocketmq] guyinyou commented on pull request #6387: [ISSUE #6386] Some improvements for compactionTopic

Posted by "guyinyou (via GitHub)" <gi...@apache.org>.
guyinyou commented on PR #6387:
URL: https://github.com/apache/rocketmq/pull/6387#issuecomment-1473334816

   > Error: /home/runner/work/rocketmq/rocketmq/store/src/main/java/org/apache/rocketmq/store/kv/CompactionService.java:129:42: '{' is not preceded with whitespace. [WhitespaceAround]
   > Error: /home/runner/work/rocketmq/rocketmq/store/src/main/java/org/apache/rocketmq/store/kv/MessageFetcher.java:149:13: 'if' is not followed by whitespace. [WhitespaceAround]
   > Error: /home/runner/work/rocketmq/rocketmq/store/src/main/java/org/apache/rocketmq/store/kv/MessageFetcher.java:149:67: '{' is not preceded with whitespace. [WhitespaceAround]
   > Audit done.
   
   fix done




[GitHub] [rocketmq] ltamber commented on a diff in pull request #6387: [ISSUE #6386] Some improvements for compactionTopic

Posted by "ltamber (via GitHub)" <gi...@apache.org>.
ltamber commented on code in PR #6387:
URL: https://github.com/apache/rocketmq/pull/6387#discussion_r1142922116


##########
docs/cn/Example_Compaction_Topic_cn.md:
##########
@@ -1,59 +1,73 @@
 # Compaction Topic
 
 ## Usage
+
+### Enable ordered-message support on the namesrv
+
+```shell
+$ bin/mqadmin updateNamesrvConfig -k orderMessageEnable -v true
+```
+
 ### Create a compaction topic
+
 ```shell
-$ bin/mqadmin updateTopic -w 8 -r 8 -a +delete.policy=COMPACTION -n localhost:9876 -t ctopic -c DefaultCluster
+$ bin/mqadmin updateTopic -w 8 -r 8 -a +cleanup.policy=COMPACTION -n localhost:9876 -t ctopic -o true -c DefaultCluster
 create topic to 127.0.0.1:10911 success.
 TopicConfig [topicName=ctopic, readQueueNums=8, writeQueueNums=8, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false, attributes={+delete.policy=COMPACTION}]
 ```
+
 ### Produce data
+
 Same as for ordinary messages
+
 ```java
-DefaultMQProducer producer = new DefaultMQProducer("CompactionTestGroup");
-producer.setNamesrvAddr("localhost:9876");
-producer.start();
-
-String topic = "ctopic";
-String tag = "tag1";
-String key = "key1";
-Message msg = new Message(topic, tag, key, "bodys"getBytes(StandardCharsets.UTF_8));
-SendResult sendResult = producer.send(msg, (mqs, message, shardingKey) -> {
-    int select = Math.abs(shardingKey.hashCode());
-    if (select < 0) {
-        select = 0;
-    }
-    return mqs.get(select % mqs.size());
-}, key);
-
-System.out.printf("%s%n", sendResult);
+DefaultMQProducer producer=new DefaultMQProducer("CompactionTestGroup");
+        producer.setNamesrvAddr("localhost:9876");
+        producer.start();
+
+        String topic="ctopic";
+        String tag="tag1";
+        String key="key1";
+        Message msg=new Message(topic,tag,key,"bodys"getBytes(StandardCharsets.UTF_8));
+        SendResult sendResult=producer.send(msg,(mqs,message,shardingKey)->{
+        int select=Math.abs(shardingKey.hashCode());
+        if(select< 0){
+        select=0;
+        }
+        return mqs.get(select%mqs.size());
+        },key);
+
+        System.out.printf("%s%n",sendResult);

Review Comment:
   The code formatting looks messed up.
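
For readers following the producer hunk above: its selector lambda maps the sharding key to a queue index so that messages with the same key land on the same queue. Below is a minimal, self-contained sketch of just that selection step. It is not the code from the PR: `QueueSelectSketch`, `indexForHash`, and `selectQueueIndex` are hypothetical names, plain strings stand in for RocketMQ's `MessageQueue` objects, and `Math.floorMod` is swapped in for the `Math.abs` plus `select < 0` guard in the hunk (the guard is needed there because `Math.abs(Integer.MIN_VALUE)` is still negative).

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-alone helper; it only models the index arithmetic and
// uses no RocketMQ classes.
class QueueSelectSketch {

    // Maps a raw hash to an index in [0, queueCount). Math.floorMod returns a
    // non-negative result for a positive divisor, including for
    // Integer.MIN_VALUE, so no extra sign check is required.
    static int indexForHash(int hash, int queueCount) {
        return Math.floorMod(hash, queueCount);
    }

    // Maps a sharding key to a queue index; equal keys give equal indexes.
    static int selectQueueIndex(String shardingKey, int queueCount) {
        return indexForHash(shardingKey.hashCode(), queueCount);
    }

    public static void main(String[] args) {
        List<String> queues = Arrays.asList("q0", "q1", "q2", "q3", "q4", "q5", "q6", "q7");
        String key = "key1";
        int first = selectQueueIndex(key, queues.size());
        int second = selectQueueIndex(key, queues.size());
        // The same key is routed to the same queue on every call.
        System.out.printf("%s -> %s (stable: %b)%n", key, queues.get(first), first == second);
    }
}
```

Note that `floorMod` and `abs(...) % n` can pick different queues for keys with negative hash codes, so the two approaches should not be mixed across producers of the same topic.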



##########
docs/cn/Example_Compaction_Topic_cn.md:
##########
@@ -1,59 +1,73 @@
 # Compaction Topic
 
 ## Usage
+
+### Enable ordered-message support on the namesrv
+
+```shell
+$ bin/mqadmin updateNamesrvConfig -k orderMessageEnable -v true
+```
+
 ### Create a compaction topic
+
 ```shell
-$ bin/mqadmin updateTopic -w 8 -r 8 -a +delete.policy=COMPACTION -n localhost:9876 -t ctopic -c DefaultCluster
+$ bin/mqadmin updateTopic -w 8 -r 8 -a +cleanup.policy=COMPACTION -n localhost:9876 -t ctopic -o true -c DefaultCluster
 create topic to 127.0.0.1:10911 success.
 TopicConfig [topicName=ctopic, readQueueNums=8, writeQueueNums=8, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false, attributes={+delete.policy=COMPACTION}]
 ```
+
 ### Produce data
+
 Same as for ordinary messages
+
 ```java
-DefaultMQProducer producer = new DefaultMQProducer("CompactionTestGroup");
-producer.setNamesrvAddr("localhost:9876");
-producer.start();
-
-String topic = "ctopic";
-String tag = "tag1";
-String key = "key1";
-Message msg = new Message(topic, tag, key, "bodys"getBytes(StandardCharsets.UTF_8));
-SendResult sendResult = producer.send(msg, (mqs, message, shardingKey) -> {
-    int select = Math.abs(shardingKey.hashCode());
-    if (select < 0) {
-        select = 0;
-    }
-    return mqs.get(select % mqs.size());
-}, key);
-
-System.out.printf("%s%n", sendResult);
+DefaultMQProducer producer=new DefaultMQProducer("CompactionTestGroup");
+        producer.setNamesrvAddr("localhost:9876");
+        producer.start();
+
+        String topic="ctopic";
+        String tag="tag1";
+        String key="key1";
+        Message msg=new Message(topic,tag,key,"bodys"getBytes(StandardCharsets.UTF_8));
+        SendResult sendResult=producer.send(msg,(mqs,message,shardingKey)->{
+        int select=Math.abs(shardingKey.hashCode());
+        if(select< 0){
+        select=0;
+        }
+        return mqs.get(select%mqs.size());
+        },key);
+
+        System.out.printf("%s%n",sendResult);
 ``` 
+
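 ### Consume data
+
 Consumption offsets stay the same as before compaction. When consuming from a specified offset that no longer exists, the nearest following message is returned.
 In compaction scenarios, most consumers start from offset 0 and read the complete data set.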
+
 ```java
-DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("compactionTestGroup");
-consumer.setNamesrvAddr("localhost:9876");
-consumer.setPullThreadNums(4);
-consumer.start();
-
-Collection<MessageQueue> messageQueueList = consumer.fetchMessageQueues("ctopic");
-consumer.assign(messageQueueList);
-messageQueueList.forEach(mq -> {
-    try {
+DefaultLitePullConsumer consumer=new DefaultLitePullConsumer("compactionTestGroup");
+        consumer.setNamesrvAddr("localhost:9876");
+        consumer.setPullThreadNums(4);
+        consumer.start();
+
+        Collection<MessageQueue> messageQueueList=consumer.fetchMessageQueues("ctopic");
+        consumer.assign(messageQueueList);
+        messageQueueList.forEach(mq->{
+        try{
         consumer.seekToBegin(mq);
-    } catch (MQClientException e) {
+        }catch(MQClientException e){
         e.printStackTrace();
-    }
-});
-
-Map<String, byte[]> kvStore = Maps.newHashMap();
-while (true) {
-    List<MessageExt> msgList = consumer.poll(1000);
-    if (msgList != null) {
-        msgList.forEach(msg -> kvStore.put(msg.getKeys(), msg.getBody()));
-    }
-}
+        }
+        });
+
+        Map<String, byte[]>kvStore=Maps.newHashMap();
+        while(true){
+        List<MessageExt> msgList=consumer.poll(1000);
+        if(msgList!=null){
+        msgList.forEach(msg->kvStore.put(msg.getKeys(),msg.getBody()));
+        }
+        }

Review Comment:
   The code formatting looks messed up.
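
The consume section in the hunk above states that offsets are unchanged by compaction and that a request for an offset that no longer exists returns the nearest following message. A minimal sketch of that lookup rule follows, assuming a hypothetical in-memory model (a `TreeMap` from original offset to message); it does not reflect RocketMQ's actual storage format or API.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of one compacted queue: surviving messages are keyed by
// their original offsets, which compaction leaves untouched.
class CompactedOffsetLookupSketch {

    // Returns the entry stored at `offset`, or the nearest surviving entry
    // after it, or null when nothing remains at or beyond that offset.
    static Map.Entry<Long, String> fetchFrom(TreeMap<Long, String> log, long offset) {
        return log.ceilingEntry(offset);
    }

    public static void main(String[] args) {
        TreeMap<Long, String> log = new TreeMap<>();
        // Offsets 1 and 2 were removed by compaction; 0, 3 and 4 survive.
        log.put(0L, "key1=v1");
        log.put(3L, "key2=v4");
        log.put(4L, "key3=v5");

        // Offset 1 is gone, so the lookup moves forward to offset 3.
        Map.Entry<Long, String> hit = fetchFrom(log, 1L);
        System.out.println("requested 1, got offset " + hit.getKey() + ": " + hit.getValue());
    }
}
```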





[GitHub] [rocketmq] RongtongJin merged pull request #6387: [ISSUE #6386] Some improvements for compactionTopic

Posted by "RongtongJin (via GitHub)" <gi...@apache.org>.
RongtongJin merged PR #6387:
URL: https://github.com/apache/rocketmq/pull/6387




[GitHub] [rocketmq] RongtongJin commented on pull request #6387: [ISSUE #6386] Some improvements for compactionTopic

Posted by "RongtongJin (via GitHub)" <gi...@apache.org>.
RongtongJin commented on PR #6387:
URL: https://github.com/apache/rocketmq/pull/6387#issuecomment-1473329579

   Error:  /home/runner/work/rocketmq/rocketmq/store/src/main/java/org/apache/rocketmq/store/kv/CompactionService.java:129:42: '{' is not preceded with whitespace. [WhitespaceAround]
   Error:  /home/runner/work/rocketmq/rocketmq/store/src/main/java/org/apache/rocketmq/store/kv/MessageFetcher.java:149:13: 'if' is not followed by whitespace. [WhitespaceAround]
   Error:  /home/runner/work/rocketmq/rocketmq/store/src/main/java/org/apache/rocketmq/store/kv/MessageFetcher.java:149:67: '{' is not preceded with whitespace. [WhitespaceAround]
   Audit done.
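
For context, the three errors above come from Checkstyle's `WhitespaceAround` check. The flagged lines of `CompactionService.java` and `MessageFetcher.java` are not quoted in this thread, so the snippet below is only an illustrative sketch with a hypothetical class and method, showing the kind of spacing the check rejects and the form it accepts.

```java
// Hypothetical example; it does not reproduce the lines flagged in the PR.
class WhitespaceAroundSketch {

    // A line written as `if(count > 0){` would be reported twice:
    // 'if' is not followed by whitespace, and '{' is not preceded by
    // whitespace. The version below has a space in both places.
    static String describe(int count) {
        if (count > 0) {
            return "non-empty";
        }
        return "empty";
    }

    public static void main(String[] args) {
        System.out.println(describe(3));
        System.out.println(describe(0));
    }
}
```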




[GitHub] [rocketmq] ltamber commented on a diff in pull request #6387: [ISSUE #6386] Some improvements for compactionTopic

Posted by "ltamber (via GitHub)" <gi...@apache.org>.
ltamber commented on code in PR #6387:
URL: https://github.com/apache/rocketmq/pull/6387#discussion_r1142918943


##########
docs/cn/Example_Compaction_Topic_cn.md:
##########
@@ -1,59 +1,73 @@
 # Compaction Topic
 
 ## Usage
+
+### Enable ordered-message support on the namesrv
+

Review Comment:
   Maybe we need to explain why ordered messages are necessary here.
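
The thread does not record the answer to this question. One plausible rationale, stated here as an assumption rather than as the project's documented reasoning, is that compaction keeps only the latest message per key, so the surviving value depends on the order in which messages with the same key were written. The sketch below illustrates this with a hypothetical in-memory model; `CompactionOrderSketch` and `compact` are illustrative names, not RocketMQ APIs.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model: a queue is a list of {key, value} pairs in write order.
class CompactionOrderSketch {

    // Keeps only the last value written for each key.
    static Map<String, String> compact(List<String[]> queue) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (String[] message : queue) {
            latest.put(message[0], message[1]); // a later write replaces an earlier one
        }
        return latest;
    }

    public static void main(String[] args) {
        List<String[]> inOrder = new ArrayList<>();
        inOrder.add(new String[] {"key1", "v1"});
        inOrder.add(new String[] {"key1", "v2"});

        List<String[]> reordered = new ArrayList<>();
        reordered.add(new String[] {"key1", "v2"});
        reordered.add(new String[] {"key1", "v1"});

        // Same messages, different arrival order, different surviving value.
        System.out.println("in order:  " + compact(inOrder));
        System.out.println("reordered: " + compact(reordered));
    }
}
```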





[GitHub] [rocketmq] ltamber commented on a diff in pull request #6387: [ISSUE #6386] Some improvements for compactionTopic

Posted by "ltamber (via GitHub)" <gi...@apache.org>.
ltamber commented on code in PR #6387:
URL: https://github.com/apache/rocketmq/pull/6387#discussion_r1142920955


##########
docs/cn/Example_Compaction_Topic_cn.md:
##########


Review Comment:
   The [English edition](https://github.com/apache/rocketmq/blob/develop/docs/en/Example_Compaction_Topic.md) of the example file should be updated in sync.





[GitHub] [rocketmq] guyinyou commented on a diff in pull request #6387: [ISSUE #6386] Some improvements for compactionTopic

Posted by "guyinyou (via GitHub)" <gi...@apache.org>.
guyinyou commented on code in PR #6387:
URL: https://github.com/apache/rocketmq/pull/6387#discussion_r1142957169


##########
docs/cn/Example_Compaction_Topic_cn.md:
##########
@@ -1,59 +1,73 @@
 # Compaction Topic
 
 ## Usage
+
+### Enable ordered-message support on the namesrv
+
+```shell
+$ bin/mqadmin updateNamesrvConfig -k orderMessageEnable -v true
+```
+
 ### Create a compaction topic
+
 ```shell
-$ bin/mqadmin updateTopic -w 8 -r 8 -a +delete.policy=COMPACTION -n localhost:9876 -t ctopic -c DefaultCluster
+$ bin/mqadmin updateTopic -w 8 -r 8 -a +cleanup.policy=COMPACTION -n localhost:9876 -t ctopic -o true -c DefaultCluster
 create topic to 127.0.0.1:10911 success.
 TopicConfig [topicName=ctopic, readQueueNums=8, writeQueueNums=8, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false, attributes={+delete.policy=COMPACTION}]
 ```
+
 ### Produce data
+
 Same as for ordinary messages
+
 ```java
-DefaultMQProducer producer = new DefaultMQProducer("CompactionTestGroup");
-producer.setNamesrvAddr("localhost:9876");
-producer.start();
-
-String topic = "ctopic";
-String tag = "tag1";
-String key = "key1";
-Message msg = new Message(topic, tag, key, "bodys"getBytes(StandardCharsets.UTF_8));
-SendResult sendResult = producer.send(msg, (mqs, message, shardingKey) -> {
-    int select = Math.abs(shardingKey.hashCode());
-    if (select < 0) {
-        select = 0;
-    }
-    return mqs.get(select % mqs.size());
-}, key);
-
-System.out.printf("%s%n", sendResult);
+DefaultMQProducer producer=new DefaultMQProducer("CompactionTestGroup");
+        producer.setNamesrvAddr("localhost:9876");
+        producer.start();
+
+        String topic="ctopic";
+        String tag="tag1";
+        String key="key1";
+        Message msg=new Message(topic,tag,key,"bodys"getBytes(StandardCharsets.UTF_8));
+        SendResult sendResult=producer.send(msg,(mqs,message,shardingKey)->{
+        int select=Math.abs(shardingKey.hashCode());
+        if(select< 0){
+        select=0;
+        }
+        return mqs.get(select%mqs.size());
+        },key);
+
+        System.out.printf("%s%n",sendResult);
 ``` 
+
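 ### Consume data
+
 Consumption offsets stay the same as before compaction. When consuming from a specified offset that no longer exists, the nearest following message is returned.
 In compaction scenarios, most consumers start from offset 0 and read the complete data set.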
+
 ```java
-DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("compactionTestGroup");
-consumer.setNamesrvAddr("localhost:9876");
-consumer.setPullThreadNums(4);
-consumer.start();
-
-Collection<MessageQueue> messageQueueList = consumer.fetchMessageQueues("ctopic");
-consumer.assign(messageQueueList);
-messageQueueList.forEach(mq -> {
-    try {
+DefaultLitePullConsumer consumer=new DefaultLitePullConsumer("compactionTestGroup");
+        consumer.setNamesrvAddr("localhost:9876");
+        consumer.setPullThreadNums(4);
+        consumer.start();
+
+        Collection<MessageQueue> messageQueueList=consumer.fetchMessageQueues("ctopic");
+        consumer.assign(messageQueueList);
+        messageQueueList.forEach(mq->{
+        try{
         consumer.seekToBegin(mq);
-    } catch (MQClientException e) {
+        }catch(MQClientException e){
         e.printStackTrace();
-    }
-});
-
-Map<String, byte[]> kvStore = Maps.newHashMap();
-while (true) {
-    List<MessageExt> msgList = consumer.poll(1000);
-    if (msgList != null) {
-        msgList.forEach(msg -> kvStore.put(msg.getKeys(), msg.getBody()));
-    }
-}
+        }
+        });
+
+        Map<String, byte[]>kvStore=Maps.newHashMap();
+        while(true){
+        List<MessageExt> msgList=consumer.poll(1000);
+        if(msgList!=null){
+        msgList.forEach(msg->kvStore.put(msg.getKeys(),msg.getBody()));
+        }
+        }

Review Comment:
   fixed


