You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2018/09/25 07:07:14 UTC

[GitHub] wangweifan opened a new issue #475: Consumers can not receive transaction messages.

wangweifan opened a new issue #475: Consumers can not receive transaction messages.
URL: https://github.com/apache/rocketmq/issues/475
 
 
   ---------------------------------producer ---------------------------------------------
   @Test
       public void sendTransactionMQ() throws MQClientException, UnsupportedEncodingException, InterruptedException {
           TransactionListener transactionListener = new TransactionListenerImpl();
           TransactionMQProducer producer = new TransactionMQProducer(TRANSACTION_MQ_PRODUCER_GROUP_NAME);
           ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
               @Override
               public Thread newThread(Runnable r) {
                   Thread thread = new Thread(r);
                   thread.setName("client-transaction-msg-check-thread");
                   return thread;
               }
           });
           producer.setNamesrvAddr("127.0.0.1:9876");
           producer.setExecutorService(executorService);
           producer.setTransactionListener(transactionListener);
           producer.start();
           Message msg = new Message("testMq", "TagA", "KEYi", ("Hello RocketMQ ").getBytes(RemotingHelper.DEFAULT_CHARSET));
           SendResult sendResult = producer.sendMessageInTransaction(msg, null);
           System.out.printf("%s%n", sendResult);
           producer.shutdown();
       }
   ---------------------------------producer ---------------------------------------------
   
   ------------------------------------------consumer--------------------------------------------
   public static void main(String[] args) throws InterruptedException, MQClientException {
   
   
           //设置消费者组名
           DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("transaction_mq_consumer");
           //设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
           consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
           //指定nameServer的地址
           consumer.setNamesrvAddr("127.0.0.1:9876");
           //指定订阅的topic及tag表达式
           consumer.subscribe("testMq", "*");
   
           consumer.registerMessageListener(new MessageListenerConcurrently() {
               @Override
               public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                               ConsumeConcurrentlyContext context) {
                   for (MessageExt msg : msgs) {
                       System.out.println(String.format("Custome2 message [%s],tagName[%s]",
                               new String(msg.getBody()),
                               msg.getTags()));
                   }
                   return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                   //return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
               }
           });
           //启动消费者实例
           consumer.start();
           System.out.println("Consumer Started.");
       }
   ------------------------------------------consumer--------------------------------------------
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services