Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/09/16 08:03:29 UTC

[GitHub] [pulsar] yakir-Yang opened a new issue #12059: [Pulsar Websocket] Consumer message can't be acknowleaged successfully after toByteArray & fromByteArray

yakir-Yang opened a new issue #12059:
URL: https://github.com/apache/pulsar/issues/12059


   Case 1. After receiving a message from the Pulsar consumer, calling consumer.acknowledge() immediately acknowledges the message successfully. Here's the code:
   ```
   consumer.receiveAsync().thenAccept(msg -> {
       consumer.acknowledge(msg.getMessageId());
       ....
   });
   ```
   
   Case 2. If I round-trip the message ID through toByteArray and fromByteArrayWithTopic, the message can't be acknowledged successfully.
   ```
   consumer.receiveAsync().thenAccept(msg -> {
       MessageId msgId = MessageId.fromByteArrayWithTopic(msg.getMessageId().toByteArray(), topic.toString());
       consumer.acknowledgeAsync(msgId);
       ....
   });
   ```
   
   Case 3. If I round-trip the message ID through toByteArray and fromByteArray, the message still can't be acknowledged successfully.
   ```
   consumer.receiveAsync().thenAccept(msg -> {
       MessageId msgId = MessageId.fromByteArray(msg.getMessageId().toByteArray());
       consumer.acknowledgeAsync(msgId);
       ....
   });
   ```
   
   Case 4. Encoding the message ID bytes as Base64 and decoding them again also fails.
   ```
   consumer.receiveAsync().thenAccept(msg -> {
       String messageId = Base64.getEncoder().encodeToString(msg.getMessageId().toByteArray());
       MessageId msgId = MessageId.fromByteArrayWithTopic(Base64.getDecoder().decode(messageId), topic.toString());
       consumer.acknowledgeAsync(msgId);
       ....
   });
   ```
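
For Case 4, it may help to rule out the Base64 step on its own. The following is a minimal sketch using only the JDK; the byte array is a hypothetical stand-in for `msg.getMessageId().toByteArray()` and is not a real serialized MessageId. It only shows that encoding and decoding returns the same bytes, so if the acknowledgement fails, the cause is more likely in how the MessageId is rebuilt or used than in the Base64 conversion.

```java
import java.util.Arrays;
import java.util.Base64;

class Base64RoundTrip {
    // Encode bytes to a Base64 string and decode them back, as Case 4 does.
    static byte[] roundTrip(byte[] original) {
        String encoded = Base64.getEncoder().encodeToString(original);
        return Base64.getDecoder().decode(encoded);
    }

    public static void main(String[] args) {
        // Hypothetical stand-in bytes (assumption): not an actual Pulsar MessageId.
        byte[] idBytes = {0x08, 0x2a, 0x10, 0x07, (byte) 0xff, 0x00};
        byte[] restored = roundTrip(idBytes);
        System.out.println("bytes preserved: " + Arrays.equals(idBytes, restored));
    }
}
```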


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] github-actions[bot] commented on issue #12059: [Pulsar Websocket] Consumer message can't be acknowleaged successfully after toByteArray & fromByteArray

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on issue #12059:
URL: https://github.com/apache/pulsar/issues/12059#issuecomment-1056055643


   The issue has had no activity for 30 days; marking it with the Stale label.





[GitHub] [pulsar] yakir-Yang commented on issue #12059: [Pulsar Websocket] Consumer message can't be acknowleaged successfully after toByteArray & fromByteArray

Posted by GitBox <gi...@apache.org>.
yakir-Yang commented on issue #12059:
URL: https://github.com/apache/pulsar/issues/12059#issuecomment-920709586


   Here's a simple program that reproduces the problem quickly.
   
   You can check the unacked message count with the pulsar-admin tool:
   
   ```
   root@pulsar-node-102135:/pulsar# ./bin/pulsar-admin topics stats persistent://edge-sit-message-center/default/subscriber_1415926654114594816
   {
     "count" : 0,
     ....
     "subscriptions" : {
      "kk-subscription" : {
         ......
         "unackedMessages" : 4842,
         "consumers" : [ {
           "unackedMessages" : 4842,
         } ],
         "isDurable" : false,
       }
     },
   ```
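
To watch this counter over time without reading the full output, one option is to pull the `unackedMessages` values out of the stats text. This is a rough sketch, assuming the `pulsar-admin topics stats` output has already been captured as a string; the class and method names are made up for illustration, and a real JSON parser would be more robust than the regular expression used here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class UnackedCounter {
    // Matches entries such as: "unackedMessages" : 4842
    private static final Pattern UNACKED =
            Pattern.compile("\"unackedMessages\"\\s*:\\s*(\\d+)");

    // Returns every unackedMessages value found in the text, in order of appearance.
    static List<Long> extract(String statsText) {
        List<Long> values = new ArrayList<>();
        Matcher m = UNACKED.matcher(statsText);
        while (m.find()) {
            values.add(Long.parseLong(m.group(1)));
        }
        return values;
    }

    public static void main(String[] args) {
        // Shortened sample modeled on the output above (subscription level, then consumer level).
        String sample = "\"unackedMessages\" : 4842,\n\"consumers\" : [ {\n\"unackedMessages\" : 4842,\n} ]";
        System.out.println(extract(sample));
    }
}
```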
   
   
   
   ```
   package com.mycompany.app;
   
   import org.apache.commons.cli.*;
   import org.apache.pulsar.client.api.*;
   
   import java.util.concurrent.TimeUnit;
   
   /**
    * Hello world!
    *
    */
   public class App
   {
       static CommandLine commandLine = null;
       // Written by PulsarThread and read by MetricThread, so it is volatile for visibility.
       static volatile long pkts = 0;
   
       public static void main( String[] args ) {
           CommandLineParser commandLineParser = new DefaultParser();
           Options options = new Options();
           options.addOption("addr", true, "Pulsar server address");
           options.addOption("topic", true, "Topic name");
   
           try {
               commandLine = commandLineParser.parse(options, args);
           } catch (ParseException e) {
               System.out.println("---- exception");
               System.out.println(e);
               return;
           }
   
           new PulsarThread().start();
           new MetricThread().start();
   
           while (true) {
               try {
                   Thread.sleep(1000);
               } catch (InterruptedException e) {}
           }
       }
   
       static private class PulsarThread extends Thread {
           public PulsarThread() {
           }
   
           public void run() {
               try {
                   PulsarClient client = PulsarClient.builder().serviceUrl(commandLine.getOptionValue("addr")).build();
   
                   Consumer<byte[]> consumer = client.newConsumer()
                           .topic(commandLine.getOptionValue("topic"))
                           .consumerName("javatest")
                           .subscriptionName("kk-subscription")
                           .ackTimeout(10, TimeUnit.SECONDS)
                           .subscriptionType(SubscriptionType.Key_Shared)
                           .keySharedPolicy(KeySharedPolicy.autoSplitHashRange())
                           .subscriptionMode(SubscriptionMode.NonDurable)
                           .subscribe();
   
                   while (true) {
                       Message<byte[]> message = consumer.receive();
                       if (message == null) {
                           continue;
                       }
   
                       // consumer.acknowledge(message);
   
                       MessageId msgId = MessageId.fromByteArrayWithTopic(message.getMessageId().toByteArray(), "persistent://edge-sit-message-center/default/subscriber_1415926654114594816");
                       consumer.acknowledgeAsync(msgId);
   
                       pkts += 1;
                   }
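               } catch (Exception e) {
                   // Print the failure instead of swallowing it, so client or consumer errors are visible.
                   e.printStackTrace();
               }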
           }
       }
   
       static private class MetricThread extends Thread {
           public void run() {
               long last = 0;
               while (true) {
                   System.out.printf("consumer speed: %d pkt/s\n", pkts - last);
                   last = pkts;
                   try {
                       Thread.sleep(1000);
                   } catch (InterruptedException e) {}
               }
           }
       }
   }
   ```
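
One thing worth noting about the program above: `acknowledgeAsync` returns a `CompletableFuture<Void>`, and the result is never inspected, so a rejected acknowledgement would pass silently. The sketch below uses only `java.util.concurrent`; `fakeAckAsync` is a hypothetical stand-in for `consumer.acknowledgeAsync(msgId)` that always fails, not the Pulsar client itself. It shows how attaching a handler surfaces a failure that would otherwise be dropped.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

class AckResultCheck {
    // Hypothetical stand-in (assumption) for consumer.acknowledgeAsync(msgId); always fails here.
    static CompletableFuture<Void> fakeAckAsync(String msgId) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        future.completeExceptionally(new IllegalStateException("ack rejected for " + msgId));
        return future;
    }

    // Attach a handler so a failed acknowledgement is recorded instead of ignored.
    static String ackAndReport(String msgId) {
        AtomicReference<String> result = new AtomicReference<>("ok");
        fakeAckAsync(msgId)
                .exceptionally(ex -> {
                    result.set("failed: " + ex.getMessage());
                    return null;
                })
                .join(); // wait for the handler so the result is set before returning
        return result.get();
    }

    public static void main(String[] args) {
        // Without the handler, nothing would be printed or thrown for this failure.
        System.out.println(ackAndReport("id-1"));
    }
}
```

In the real program the same idea would be `consumer.acknowledgeAsync(msgId).exceptionally(...)`, logging the exception inside the handler.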





[GitHub] [pulsar] yakir-Yang commented on issue #12059: [Pulsar Websocket] Consumer message can't be acknowleaged successfully after toByteArray & fromByteArray

Posted by GitBox <gi...@apache.org>.
yakir-Yang commented on issue #12059:
URL: https://github.com/apache/pulsar/issues/12059#issuecomment-920688330


   @codelipenghui 





[GitHub] [pulsar] nodece commented on issue #12059: [Pulsar Websocket] Consumer message can't be acknowleaged successfully after toByteArray & fromByteArray

Posted by GitBox <gi...@apache.org>.
nodece commented on issue #12059:
URL: https://github.com/apache/pulsar/issues/12059#issuecomment-922236224


   @yakir-Yang I ran the code you provided, and it works fine.
   
   My environment:
   
   - Java: 11
   - Pulsar: 2.8.0
   - Pulsar-client: 2.8.1
   
   ```
   docker run -it \
     -p 6650:6650 \
     -p 8080:8080 --name pulsar-issues-test \
     apachepulsar/pulsar:2.8.0 \
     bin/pulsar standalone
   ```
   
   The code looks like this:
   
   ```
   import java.nio.charset.StandardCharsets;
   import java.util.Base64;
   import java.util.concurrent.TimeUnit;
   import org.apache.pulsar.client.admin.PulsarAdmin;
   import org.apache.pulsar.client.api.Consumer;
   import org.apache.pulsar.client.api.KeySharedPolicy;
   import org.apache.pulsar.client.api.Message;
   import org.apache.pulsar.client.api.MessageId;
   import org.apache.pulsar.client.api.Producer;
   import org.apache.pulsar.client.api.PulsarClient;
   import org.apache.pulsar.client.api.PulsarClientException;
   import org.apache.pulsar.client.api.SubscriptionMode;
   import org.apache.pulsar.client.api.SubscriptionType;
   
   public class Main {
       public static void main(String[] args) throws Exception {
           String url = "http://localhost:8080";
           String topic = "persistent://public/default/issues-12059";
           PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(url).build();
           PulsarClient client = PulsarClient.builder().serviceUrl(url).build();
           Producer<byte[]> producer = client.newProducer().topic(topic).create();
           Thread producerThread = new Thread(() -> {
               while (true) {
                   try {
                       producer.send("hello-pulsar".getBytes(StandardCharsets.UTF_8));
                       Thread.sleep(5 * 1000);
                   } catch (PulsarClientException | InterruptedException e) {
                       e.printStackTrace();
                   }
               }
           });
   
           producerThread.start();
   
           Consumer<byte[]> consumer = client.newConsumer()
                   .topic(topic)
                   .consumerName("javatest")
                   .subscriptionName("kk-subscription")
                   .ackTimeout(10, TimeUnit.SECONDS)
                   .subscriptionType(SubscriptionType.Key_Shared)
                   .keySharedPolicy(KeySharedPolicy.autoSplitHashRange())
                   .subscriptionMode(SubscriptionMode.NonDurable)
                   .subscribe();
   
           int n = 0;
           while (true) {
               Message<byte[]> message = consumer.receive();
               if (message == null) {
                   continue;
               }
   
               MessageId msgId;
   
               switch (n % 4) {
                   case 0:
                       msgId = message.getMessageId();
                       System.out.println("create MessageId from message.getMessageId()");
                       break;
                   case 1:
                       System.out.println(
                               "create MessageId from MessageId.fromByteArray(message.getMessageId().toByteArray())");
                       msgId = MessageId.fromByteArray(message.getMessageId().toByteArray());
                       break;
                   case 2:
                       System.out.println(
                               "create MessageId from MessageId.fromByteArrayWithTopic(message.getMessageId().toByteArray(), topic)");
                       msgId = MessageId.fromByteArrayWithTopic(message.getMessageId().toByteArray(), topic);
                       break;
                   case 3:
                       System.out.println(
                               "create MessageId from MessageId.fromByteArrayWithTopic(Base64.getDecoder().decode(messageId), topic)");
                       String messageId = Base64.getEncoder().encodeToString(message.getMessageId().toByteArray());
                       msgId = MessageId.fromByteArrayWithTopic(Base64.getDecoder().decode(messageId), topic);
                       break;
                   default:
                       throw new IllegalStateException("Unexpected value: " + n);
               }
   
               consumer.acknowledgeAsync(msgId);
               Thread.sleep(2 * 1000);
               admin.topics().getStats(topic).getSubscriptions().forEach((key, value) -> {
                   long unackedMessages = value.getUnackedMessages();
                   System.out.println("subscription: " + key + ", unackedMessages: " + unackedMessages);
                   if (unackedMessages != 0) {
                       System.out.println("expect value.getUnackedMessages() is 0, but got " + unackedMessages);
                       System.exit(1);
                   }
               });
               n++;
           }
       }
   }
   ```
   If you are using a different version of Pulsar, please let me know.

