You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/05/13 09:46:34 UTC

[GitHub] [pulsar] zhucanyi opened a new issue #10571: When I want to get the parameter returned by the consumer after sending the message, there is no way to get it.

zhucanyi opened a new issue #10571:
URL: https://github.com/apache/pulsar/issues/10571


   When I want to get the parameter returned by the consumer after sending the message, there is no way to get it.
   
   **Describe the solution you'd like**
   I want to know whether the consumer can transmit such custom parameters in consumption, and query the corresponding parameters through messageId.
   
   **Describe alternatives you've considered**
   A clear and concise description of any alternative solutions or features you've considered.
   
   I thought about alternatives like this:
   Send another message through the consumer end, send the logic to consume the message, I use the interceptor to intercept the key to consume the corresponding data, I do not know whether it is reasonable, just contact, there are many mistakes, please forgive
   
   `Producer producer = client.newProducer()
           .topic("cn.flyrise.workflow/todo/reply_message")
           .create();
   MessageId messageId = producer.newMessage()
           .key(id)
           .value("test reply")
           .send();
    //拦截器       
   ConsumerInterceptor<String> interceptor = new ConsumerInterceptor<String>() {
       @Override
       public void close() {
   
       }
   
       @Override
       public Message<String> beforeConsume(Consumer<String> consumer, Message<String> message) {
           if(!id.equals(message.getKey())){
               return null;
           }
           return message;
       }
   
       @Override
       public void onAcknowledge(Consumer<String> consumer, MessageId messageId, Throwable cause) {
   
       }
   
       @Override
       public void onAcknowledgeCumulative(Consumer<String> consumer, MessageId messageId, Throwable cause) {
   
       }
   
       @Override
       public void onNegativeAcksSend(Consumer<String> consumer, Set<MessageId> messageIds) {
   
       }
   
       @Override
       public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> messageIds) {
   
       }
   };
   Consumer<String> consumer = client.newConsumer()
           .topic("persistent://cn.flyrise.workflow/todo/reply_message")
           .intercept(new ConsumerInterceptor[]{interceptor})
           .subscriptionType(SubscriptionType.Shared)
           .subscriptionName("foo")
           .subscribe();
   
   while (true){
       Message<String> message  = consumer.receive();
       if(message!=null){
           System.out.println("Consumer message:"+new String(message.getData()));
           consumer.closeAsync();
           return Reply.success(new String(message.getData()));
       }
   }`
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on issue #10571: When I want to get the parameter returned by the consumer after sending the message, there is no way to get it.

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on issue #10571:
URL: https://github.com/apache/pulsar/issues/10571#issuecomment-1058890425


   The issue had no activity for 30 days, mark with Stale label.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org