You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2018/09/06 08:29:34 UTC

[GitHub] SpriderMan opened a new issue #455: why the broker does not support consumer to filter message by SQL92?

SpriderMan opened a new issue #455: why  the broker does not support consumer to filter message by SQL92? 
URL: https://github.com/apache/rocketmq/issues/455
 
 
   
   package org.apache.rocketmq.example.filter;
   
   import org.apache.rocketmq.client.exception.MQClientException;
   import org.apache.rocketmq.client.producer.DefaultMQProducer;
   import org.apache.rocketmq.client.producer.SendResult;
   import org.apache.rocketmq.common.message.Message;
   import org.apache.rocketmq.remoting.common.RemotingHelper;
   
   /**
    * Example producer that sends ten messages to {@code TopicTest}.
    *
    * <p>Each message is tagged {@code TagA}, {@code TagB} or {@code TagC} in
    * rotation and carries a user property {@code a} holding the message index,
    * so that a consumer can select messages by tag and by property value.
    */
   public class SqlProducer {

       /** Tags assigned in rotation according to the message index. */
       private static final String[] TAGS = {"TagA", "TagB", "TagC"};

       /**
        * Starts the producer, sends ten tagged messages and shuts the producer down.
        *
        * @param args unused
        */
       public static void main(String[] args) {
           DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
           producer.setNamesrvAddr("127.0.0.1:9876");

           try {
               producer.start();
           } catch (MQClientException e) {
               e.printStackTrace();
               return;
           }

           // Once start() has succeeded the producer must always be shut down,
           // even if the send loop is left early.
           try {
               for (int i = 0; i < 10; i++) {
                   try {
                       Message msg = new Message("TopicTest",
                           TAGS[i % TAGS.length],
                           ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
                       );
                       // Property "a" carries the message index as a string.
                       msg.putUserProperty("a", String.valueOf(i));

                       SendResult sendResult = producer.send(msg);
                       System.out.printf("%s%n", sendResult);
                   } catch (Exception e) {
                       e.printStackTrace();
                       // Back off for a second before trying the next message.
                       try {
                           Thread.sleep(1000);
                       } catch (InterruptedException interrupted) {
                           // Preserve the interrupt for callers and stop sending
                           // instead of silently carrying on.
                           Thread.currentThread().interrupt();
                           break;
                       }
                   }
               }
           } finally {
               producer.shutdown();
           }
       }
   }
   
   
   
   package org.apache.rocketmq.example.filter;
   
   import java.util.List;
   
   import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
   import org.apache.rocketmq.client.consumer.MessageSelector;
   import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
   import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
   import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
   import org.apache.rocketmq.client.exception.MQClientException;
   import org.apache.rocketmq.common.message.MessageExt;
   
   /**
    * Example push consumer that subscribes to {@code TopicTest} using a SQL92
    * message selector over the message tag and the user property {@code a}.
    *
    * <p>NOTE(review): the selector is only accepted if the broker permits SQL92
    * filtering; otherwise {@code start()} fails with an {@code MQClientException}
    * ("The broker does not support consumer to filter message by SQL92").
    * Confirm the broker configuration before running this example.
    */
   public class SqlConsumer {

       /**
        * Subscribes with the SQL selector, registers a listener that prints every
        * received batch, and starts the consumer.
        *
        * @param args unused
        */
       public static void main(String[] args) {
           DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
           consumer.setNamesrvAddr("127.0.0.1:9876");

           try {
               // BETWEEN requires both bounds joined by AND ("a between 0 and 3");
               // the expression was previously written as "a between 0  3".
               consumer.subscribe("TopicTest",
                   MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" +
                       "and (a is not null and a between 0 and 3)"));
           } catch (MQClientException e) {
               e.printStackTrace();
               return;
           }

           consumer.registerMessageListener(new MessageListenerConcurrently() {

               /**
                * Prints the consuming thread name and the received batch, then
                * reports the batch as successfully consumed.
                */
               @Override
               public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                   ConsumeConcurrentlyContext context) {
                   System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                   return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
               }
           });

           try {
               consumer.start();
           } catch (MQClientException e) {
               e.printStackTrace();
               return;
           }
           System.out.printf("Consumer Started.%n");
       }
   }
   
   
   When I start the consumer, the following exception occurs:
   
   15:48:17.493 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as the default logging framework
   org.apache.rocketmq.client.exception.MQClientException: CODE: 1  DESC: The broker does not support consumer to filter message by SQL92
   For more information, please visit the url, http://rocketmq.apache.org/docs/faq/
   	at org.apache.rocketmq.client.impl.MQClientAPIImpl.checkClientInBroker(MQClientAPIImpl.java:2089)
   	at org.apache.rocketmq.client.impl.factory.MQClientInstance.checkClientInBroker(MQClientInstance.java:432)
   	at org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl.start(DefaultMQPushConsumerImpl.java:633)
   	at org.apache.rocketmq.client.consumer.DefaultMQPushConsumer.start(DefaultMQPushConsumer.java:520)
   	at org.apache.rocketmq.example.filter.SqlConsumer.main(SqlConsumer.java:56)
   a
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services