Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2019/11/08 03:53:52 UTC

[GitHub] [rocketmq-spring] roneywei commented on issue #166: Delay queue

roneywei commented on issue #166: Delay queue
URL: https://github.com/apache/rocketmq-spring/issues/166#issuecomment-551373601
 
 
   I am using RocketMQTemplate.java to send delayed messages.
   The demo is as follows:
   @Test
   public void syncDelayQueue() {
       String messageContent = "syncDelayQueue";
       Map<String, Object> h = new ConcurrentHashMap<>();
       h.put("message", messageContent);
       // syncDelayQueue is presumably a field holding the destination name (not shown here)
       h.put("defaultQueue", syncDelayQueue);
       // The payload is a JSON object built from the map
       Message<?> message = MessageBuilder
               .withPayload(JSON.toJSON(h))
               .setHeader("contentType", "application/json")
               .build();

       producer.syncDelayQueue(message, MessageDelayLevel.ONE_S);
   }
   
   // Producer wrapper: forwards the message to the utility method below
   public void syncDelayQueue(Message<?> message, MessageDelayLevel messageDelayLevel) {
       log.info("send syncDelayQueue message :{};rocketMqDelayLevel:{}", message, messageDelayLevel);
       this.rocketMqUtils.syncDelayQueue(syncDelayQueue, message, messageDelayLevel);
   }

   // Utility method: synchronous send using the producer's configured timeout and the given delay level
   public SendResult syncDelayQueue(String queueName, Message<?> message, MessageDelayLevel messageDelayLevel) {
       log.info("syncDelayQueue queueName:{},message:{},delayLevel:{}", queueName, message, messageDelayLevel);
       SendResult result = rocketMQTemplate.syncSend(queueName, message,
               rocketMQTemplate.getProducer().getSendMsgTimeout(), messageDelayLevel.getValue());
       log.info("syncDelayQueue result:{}", result);
       return result;
   }
   
   What I send is an org.springframework.messaging.Message.
   
   My question: how can I directly receive the content that was sent as an org.springframework.messaging.Message?
   
   @Component
   @RocketMQMessageListener(topic = "${mq.demo.sync-delay-queue:syncDelayQueueDemo}", consumerGroup = "demo")
   class SyncDelayQueueDemo implements RocketMQListener<Message> {
       @Override
       public void onMessage(Message message) {
           log.info("RocketMqConsumer1 syncDelayQueueDemo message :{} ", message);
       }
   }
   
   The error message is as follows:
   
   java.lang.RuntimeException: cannot convert message to interface org.springframework.messaging.Message
   	at org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer.doConvertMessage(DefaultRocketMQListenerContainer.java:382)
   	at org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer.access$100(DefaultRocketMQListenerContainer.java:57)
   	at org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer$DefaultMessageListenerConcurrently.consumeMessage(DefaultRocketMQListenerContainer.java:330)
   	at org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService$ConsumeRequest.run(ConsumeMessageConcurrentlyService.java:411)
   	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
   	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
   	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   	at java.lang.Thread.run(Thread.java:748)
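
One possible reading of the trace above: the listener container receives the raw message body and tries to convert it to the type declared in RocketMQListener<...>, and an interface such as org.springframework.messaging.Message gives it nothing concrete to instantiate. The sketch below is a stdlib-only model of that idea, not the rocketmq-spring implementation. The names ConversionSketch, Envelope and convert are hypothetical, and the interface check is an assumption standing in for whatever the real converter does.

```java
import java.nio.charset.StandardCharsets;

public class ConversionSketch {

    /** Hypothetical stand-in for a message interface such as org.springframework.messaging.Message. */
    interface Envelope {
        Object getPayload();
    }

    /**
     * Simplified model (an assumption, not library code) of turning a raw
     * message body into the payload type the listener declared.
     */
    public static Object convert(byte[] body, Class<?> targetType) {
        if (targetType == byte[].class) {
            return body; // hand the bytes over untouched
        }
        if (targetType == String.class) {
            // Body decoded as text, e.g. the JSON written by the sender
            return new String(body, StandardCharsets.UTF_8);
        }
        if (targetType.isInterface()) {
            // An interface cannot be instantiated from the body alone
            throw new RuntimeException("cannot convert message to " + targetType);
        }
        // Mapping JSON onto a concrete class is left out of this sketch
        throw new UnsupportedOperationException("POJO mapping is out of scope here");
    }

    public static void main(String[] args) {
        byte[] body = "{\"message\":\"syncDelayQueue\"}".getBytes(StandardCharsets.UTF_8);

        // A String target succeeds and yields the JSON text
        System.out.println(convert(body, String.class));

        // An interface target fails in the same style as the trace above
        try {
            convert(body, Envelope.class);
        } catch (RuntimeException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

If this reading is right, declaring the listener with a concrete payload type, for example RocketMQListener<String> or RocketMQListener<MessageExt>, might avoid the conversion failure. That is untested here.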

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services