You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/04/27 04:31:04 UTC

[GitHub] [pulsar] yebai1105 opened a new issue, #15341: negativeAcknowledge() parameter does not take effect

yebai1105 opened a new issue, #15341:
URL: https://github.com/apache/pulsar/issues/15341

   #### Expected behavior
   message redelivery
   
   #### Actual behavior
   Message cannot be redelivered
   
   #### Steps to reproduce
   producer code:
   `   public static void main(String[] args) throws PulsarClientException, InterruptedException {
   
           String token = "xxx";
           PulsarClient client = PulsarClient.builder()
                   //设置pulsar集群连接地址
                   .serviceUrl("xxx")
                   //设置认证token
                   .authentication(AuthenticationFactory.token(token))
                   .memoryLimit(64, SizeUnit.MEGA_BYTES)
                   .build();
           Producer<String> producer = client.newProducer(Schema.STRING)
                   .topic("xxx")
                   .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
                   .enableBatching(true)
                   .batchingMaxBytes(1048576)
                   .batchingMaxMessages(10000)
                   .batchingMaxPublishDelay(100, TimeUnit.MILLISECONDS)
                   .blockIfQueueFull(true)
                   .create();
   
           long i = 0;
           while (true) {
               i++;
               producer.sendAsync("Message " + i).thenAccept(msgId -> {
                   System.out.println("Message " + msgId + " successfully sent");
               });
               if(i>10)break;
           }
       }`
   
   
   consumer code:
   `    public static void main(String[] args) throws PulsarClientException {
   
           String token = "xxx";
           PulsarClient client = PulsarClient.builder()
                   .serviceUrl("xxx")
                   .authentication(AuthenticationFactory.token(token))
                   .memoryLimit(64, SizeUnit.MEGA_BYTES)
                   .build();
   
   
           Consumer<String> consumer = client.newConsumer(Schema.STRING)
                   .topic("xxx")
                   .subscriptionName("xxx")
                   .subscriptionType(SubscriptionType.Exclusive)
                   .receiverQueueSize(1000)
                   .batchReceivePolicy(BatchReceivePolicy.builder()
                           .maxNumMessages(1000)
                           .maxNumBytes(1024 * 1024 * 10)
                           .timeout(10, TimeUnit.MILLISECONDS)
                           .build())
                   .subscribe();
   
           while (true) {
               Messages<String> msgs = consumer.batchReceive();
               msgs.forEach(obj -> {
                   Message msg = null;
                   try {
                       msg = (Message) obj;
                       System.out.println("Message received: " + msg.getValue());
                       consumer.acknowledge(msg);
                   } catch (Exception e) {
                       // Message processing failed, resend later
   
                   }finally {
                       //Test: After each confirmation, cancel the confirmation again to make the message redelivery
                       consumer.negativeAcknowledge(msg);
                   }
               });
           }
       }`
   
   use`consumer.negativeAcknowledge(msg)`or`consumer.negativeAcknowledge(new MessageIdImpl(x,x,x))`After executing the consumer program, no message redelivery was found
   #### System configuration
   **Pulsar version**: 2.9.2/2.8.2
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] yebai1105 commented on issue #15341: negativeAcknowledge() parameter does not take effect

Posted by GitBox <gi...@apache.org>.
yebai1105 commented on issue #15341:
URL: https://github.com/apache/pulsar/issues/15341#issuecomment-1125792012

   Unacknowledged messages will be redelivered by default. If A acked message can not be redeliver again, what is the role of "negativeAcknowledge"?In what scenarios is the "negativeAcknowledge" parameter used?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] tisonkun closed issue #15341: negativeAcknowledge() parameter does not take effect

Posted by GitBox <gi...@apache.org>.
tisonkun closed issue #15341: negativeAcknowledge() parameter does not take effect
URL: https://github.com/apache/pulsar/issues/15341


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] drriguz commented on issue #15341: negativeAcknowledge() parameter does not take effect

Posted by GitBox <gi...@apache.org>.
drriguz commented on issue #15341:
URL: https://github.com/apache/pulsar/issues/15341#issuecomment-1139329495

   I'm also confused about this, I found that messages are not redelivered in my application by using the following code:
   ```java
   try {
               MetaDataChangedEvent<?> message = messageDecoder.decodeEvent(msg.getData());
               if (message instanceof AppDescriptionChanged) {
                   messageProcessService.processEvent((AppDescriptionChanged) message);
               } else {
                   messageProcessService.processEvent((SnapshotEvent<?>) message);
               }
               consumer.acknowledge(msg);
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               logger.error("Failed to process message {}: interrupted", msg.getMessageId(), e);
               consumer.negativeAcknowledge(msg);
           } catch (Exception e) {
               // todo: error handling & impl retry strategy
               logger.error("Failed to process message {}", msg.getMessageId(), e);
               consumer.negativeAcknowledge(msg);
           }
   ```
   
   The document does not say clearly about that, and there's also someone asking in Stackoverflow: https://stackoverflow.com/questions/69590720/pulsar-if-a-message-gets-nackd-negativeacknowledge-when-will-it-be-redeliv
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] codelipenghui commented on issue #15341: negativeAcknowledge() parameter does not take effect

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on issue #15341:
URL: https://github.com/apache/pulsar/issues/15341#issuecomment-1110540247

   @yebai1105 You can acked the message at 
   
   ```
   msg = (Message) obj;
   System.out.println("Message received: " + msg.getValue());
   consumer.acknowledge(msg);
   ```
   
   A acked message can not be redeliver again


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] github-actions[bot] commented on issue #15341: negativeAcknowledge() parameter does not take effect

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on issue #15341:
URL: https://github.com/apache/pulsar/issues/15341#issuecomment-1166753840

   The issue had no activity for 30 days, mark with Stale label.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org