Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2020/10/18 04:29:29 UTC

[GitHub] [rocketmq] WarriorFromLongAgo opened a new issue #2359: Confusion about using Filter Message

WarriorFromLongAgo opened a new issue #2359:
URL: https://github.com/apache/rocketmq/issues/2359


   1. Please describe the issue you observed:
   - What did you do (The steps to reproduce)?
   producer
   `public class FilterProducerTag {
       public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
           DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
           producer.setNamesrvAddr("192.168.42.131:9876");
           producer.start();
   
           String[] stringArr = {"TagA", "TagB", "TagC"};
           for (int i = 0; i < 6; i++) {
               Message message = new Message();
               message.setTopic("TopicTestFilter");
               message.setTags(stringArr[i % stringArr.length]);
               message.setKeys("KEY" + i);
               message.setBody( ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
   
               // Set some properties.
               // The message with KEYS=KEY3 was lost and I don't know why. Does the queue not exist?
               SendResult sendResult = producer.send(message);
               System.out.println("=========================================");
               System.out.println(sendResult);
               System.out.println(message);
               System.out.println("=========================================");
           }
           producer.shutdown();
       }
   }`
   consumer
   `public class FilterConsumerTag {
       public static void main(String[] args) throws MQClientException {
           DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
           consumer.setNamesrvAddr("192.168.42.131:9876");
           consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
           // only subscribe to messages that have property a, with a >= 0 and a <= 3
   
           // consumer.subscribe("TopicTestFilter", MessageSelector.bySql("a between 0 and 2"));
   
           consumer.subscribe("TopicTestFilter", "TagA || TagB");
   
           consumer.registerMessageListener(new MessageListenerConcurrently() {
               @Override
               public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messageExtList, ConsumeConcurrentlyContext context) {
                   for (MessageExt messageExt : messageExtList) {
                       System.out.println("========================================================");
                       System.out.println(messageExt);
                       System.out.println(new String(messageExt.getBody()));
                       System.out.println("========================================================");
                   }
                   return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
               }
           });
           consumer.start();
           System.out.println("consumer");
       }
   }`
   
   - What did you expect to see?
   I expected to see four messages in the consumer: two tagged TagA and two tagged TagB.
   - What did you see instead?
   I saw only three: two tagged TagB and one tagged TagA (see the logs below).
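
   As a cross-check of the expected count, here is a minimal, self-contained sketch that replays the producer loop without the RocketMQ client. The class `TagExpectation` and its helpers are hypothetical names used only for illustration, and splitting the expression on `||` is an assumption that approximates how a tag subscription such as "TagA || TagB" is interpreted.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class TagExpectation {
    // Hypothetical helper: treats the subscription expression as a "||"-separated tag list.
    static boolean matches(String expression, String tag) {
        Set<String> wanted = new HashSet<>();
        for (String part : expression.split("\\|\\|")) {
            wanted.add(part.trim());
        }
        return wanted.contains(tag);
    }

    // Replays the producer loop (6 messages, tags assigned round-robin)
    // and returns the keys whose tag matches the expression.
    static List<String> expectedKeys(String expression) {
        String[] tags = {"TagA", "TagB", "TagC"};
        List<String> keys = new ArrayList<>();
        for (int i = 0; i < 6; i++) {
            if (matches(expression, tags[i % tags.length])) {
                keys.add("KEY" + i);
            }
        }
        return keys;
    }

    public static void main(String[] args) {
        // prints [KEY0, KEY1, KEY3, KEY4]
        System.out.println(expectedKeys("TagA || TagB"));
    }
}
```

   Under these assumptions the subscription should match KEY0, KEY1, KEY3 and KEY4, which is why four messages are expected.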
   
   2. Please tell us about your environment:
   RocketMQ 4.7.1 running on Linux
   Dependency (jar): rocketmq-spring-boot-starter 2.1.1
   
   3. Other information (e.g. detailed explanation, logs, related issues, suggestions how to fix, etc):
   The System.out.println output is as follows:
   
   producer
   `RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.InternalThreadLocalMap).
   RocketMQLog:WARN Please initialize the logger system properly.
   =========================================
   SendResult [sendStatus=SEND_OK, msgId=64774C6B228018B4AAC25A21DA7C0000, offsetMsgId=C0A82A8300002A9F00000000000CEC0C, messageQueue=MessageQueue [topic=TopicTestFilter, brokerName=localhost.localdomain, queueId=0], queueOffset=27]
   Message{topic='TopicTestFilter', flag=0, properties={KEYS=KEY0, UNIQ_KEY=64774C6B228018B4AAC25A21DA7C0000, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 48], transactionId='null'}
   =========================================
   =========================================
   SendResult [sendStatus=SEND_OK, msgId=64774C6B228018B4AAC25A21DA870001, offsetMsgId=C0A82A8300002A9F00000000000CECDB, messageQueue=MessageQueue [topic=TopicTestFilter, brokerName=localhost.localdomain, queueId=1], queueOffset=26]
   Message{topic='TopicTestFilter', flag=0, properties={KEYS=KEY1, UNIQ_KEY=64774C6B228018B4AAC25A21DA870001, TAGS=TagB}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 49], transactionId='null'}
   =========================================
   =========================================
   SendResult [sendStatus=SEND_OK, msgId=64774C6B228018B4AAC25A21DA920002, offsetMsgId=C0A82A8300002A9F00000000000CEDAA, messageQueue=MessageQueue [topic=TopicTestFilter, brokerName=localhost.localdomain, queueId=2], queueOffset=21]
   Message{topic='TopicTestFilter', flag=0, properties={KEYS=KEY2, UNIQ_KEY=64774C6B228018B4AAC25A21DA920002, TAGS=TagC}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 50], transactionId='null'}
   =========================================
   =========================================
   SendResult [sendStatus=SEND_OK, msgId=64774C6B228018B4AAC25A21DA940003, offsetMsgId=C0A82A8300002A9F00000000000CEE79, messageQueue=MessageQueue [topic=TopicTestFilter, brokerName=localhost.localdomain, queueId=3], queueOffset=20]
   Message{topic='TopicTestFilter', flag=0, properties={KEYS=KEY3, UNIQ_KEY=64774C6B228018B4AAC25A21DA940003, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 51], transactionId='null'}
   =========================================
   =========================================
   SendResult [sendStatus=SEND_OK, msgId=64774C6B228018B4AAC25A21DA960004, offsetMsgId=C0A82A8300002A9F00000000000CEF48, messageQueue=MessageQueue [topic=TopicTestFilter, brokerName=localhost.localdomain, queueId=0], queueOffset=28]
   Message{topic='TopicTestFilter', flag=0, properties={KEYS=KEY4, UNIQ_KEY=64774C6B228018B4AAC25A21DA960004, TAGS=TagB}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 52], transactionId='null'}
   =========================================
   =========================================
   SendResult [sendStatus=SEND_OK, msgId=64774C6B228018B4AAC25A21DA970005, offsetMsgId=C0A82A8300002A9F00000000000CF017, messageQueue=MessageQueue [topic=TopicTestFilter, brokerName=localhost.localdomain, queueId=1], queueOffset=27]
   Message{topic='TopicTestFilter', flag=0, properties={KEYS=KEY5, UNIQ_KEY=64774C6B228018B4AAC25A21DA970005, TAGS=TagC}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 53], transactionId='null'}
   =========================================
   12:02:48.161 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.42.131:10911] result: true
   12:02:48.164 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.42.131:9876] result: true
   `
   
   consumer
   `========================================================
   MessageExt [brokerName=localhost.localdomain, queueId=1, storeSize=207, queueOffset=26, sysFlag=0, bornTimestamp=1602993768071, bornHost=/192.168.42.1:13693, storeTimestamp=1602993768103, storeHost=/192.168.42.131:10911, msgId=C0A82A8300002A9F00000000000CECDB, commitLogOffset=847067, bodyCRC=1401636825, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTestFilter', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=28, KEYS=KEY1, CONSUME_START_TIME=1602993783196, UNIQ_KEY=64774C6B228018B4AAC25A21DA870001, CLUSTER=DefaultCluster, TAGS=TagB}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 49], transactionId='null'}]
   Hello RocketMQ 1
   ========================================================
   ========================================================
   MessageExt [brokerName=localhost.localdomain, queueId=0, storeSize=207, queueOffset=27, sysFlag=0, bornTimestamp=1602993768060, bornHost=/192.168.42.1:13693, storeTimestamp=1602993768091, storeHost=/192.168.42.131:10911, msgId=C0A82A8300002A9F00000000000CEC0C, commitLogOffset=846860, bodyCRC=613185359, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTestFilter', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=29, KEYS=KEY0, CONSUME_START_TIME=1602993783197, UNIQ_KEY=64774C6B228018B4AAC25A21DA7C0000, CLUSTER=DefaultCluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 48], transactionId='null'}]
   Hello RocketMQ 0
   ========================================================
   ========================================================
   MessageExt [brokerName=localhost.localdomain, queueId=0, storeSize=207, queueOffset=28, sysFlag=0, bornTimestamp=1602993768086, bornHost=/192.168.42.1:13693, storeTimestamp=1602993768113, storeHost=/192.168.42.131:10911, msgId=C0A82A8300002A9F00000000000CEF48, commitLogOffset=847688, bodyCRC=601994070, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTestFilter', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=29, KEYS=KEY4, CONSUME_START_TIME=1602993783198, UNIQ_KEY=64774C6B228018B4AAC25A21DA960004, CLUSTER=DefaultCluster, TAGS=TagB}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 52], transactionId='null'}]
   Hello RocketMQ 4
   ========================================================`
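
   To line up the producer and consumer logs, the `offsetMsgId` printed in each SendResult can be decoded and compared with the `storeHost` and `commitLogOffset` fields printed by the consumer. The sketch below is a hedged illustration, not RocketMQ's own decoder: it assumes the 32-character IPv4 layout visible in these logs (8 hex characters for the address, 8 for the port, 16 for the commit log offset), and the class name `OffsetMsgIdDecoder` is hypothetical.

```java
public class OffsetMsgIdDecoder {
    // Assumed layout: 8 hex chars IPv4 address, 8 hex chars port,
    // 16 hex chars commit log offset.
    static String storeHost(String offsetMsgId) {
        StringBuilder ip = new StringBuilder();
        for (int i = 0; i < 8; i += 2) {
            if (i > 0) {
                ip.append('.');
            }
            ip.append(Integer.parseInt(offsetMsgId.substring(i, i + 2), 16));
        }
        int port = Integer.parseInt(offsetMsgId.substring(8, 16), 16);
        return ip + ":" + port;
    }

    static long commitLogOffset(String offsetMsgId) {
        return Long.parseLong(offsetMsgId.substring(16, 32), 16);
    }

    public static void main(String[] args) {
        // offsetMsgId values copied from the producer log for the TagA/TagB messages.
        String[][] sent = {
            {"KEY0", "C0A82A8300002A9F00000000000CEC0C"},
            {"KEY1", "C0A82A8300002A9F00000000000CECDB"},
            {"KEY3", "C0A82A8300002A9F00000000000CEE79"},
            {"KEY4", "C0A82A8300002A9F00000000000CEF48"},
        };
        for (String[] entry : sent) {
            System.out.println(entry[0] + " -> " + storeHost(entry[1])
                    + " commitLogOffset=" + commitLogOffset(entry[1]));
        }
    }
}
```

   Decoded this way, KEY0, KEY1 and KEY4 give offsets 846860, 847067 and 847688, which match the `commitLogOffset` values in the consumer output. KEY3 decodes to 847481 on the same broker (192.168.42.131:10911), and no consumer entry carries that offset.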
   
   
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq] RongtongJin commented on issue #2359: Confusion about using Filter Message

Posted by GitBox <gi...@apache.org>.
RongtongJin commented on issue #2359:
URL: https://github.com/apache/rocketmq/issues/2359#issuecomment-711204087


   I tried it on my computer and received all four messages, as expected.





[GitHub] [rocketmq] vongosling closed issue #2359: Confusion of using filter message

Posted by GitBox <gi...@apache.org>.
vongosling closed issue #2359:
URL: https://github.com/apache/rocketmq/issues/2359


   

