You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/03/28 03:09:30 UTC

[GitHub] [pulsar] leizhiyuan opened a new issue #14902: key_shared subscription will get suck sometimes when add consumers

leizhiyuan opened a new issue #14902:
URL: https://github.com/apache/pulsar/issues/14902


   **Describe the bug**
   
   A Key_Shared subscription sometimes gets stuck when consumers are added.
   
   When consumers are added, consumers sometimes receive messages very slowly or get stuck.
   
   **To Reproduce**
   Steps to reproduce the behavior:
   
   ``` producer
   package org.example;
   
   import org.apache.pulsar.client.api.*;
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;
   
   import java.util.concurrent.TimeUnit;
   
   import static org.example.Helper.TP_TOPIC;
   
   /**
    * Reproduction producer: publishes a fixed number of string messages to
    * {@code TP_TOPIC}, each with its own key (the loop index), pausing briefly
    * between sends.
    */
   public class SimpleProducer {
   
       private static final Logger logger = LoggerFactory.getLogger(SimpleProducer.class);
   
       /** Number of messages published by one run. */
       private static final int MESSAGE_COUNT = 10000;
   
       /** Pause between two consecutive sends, in milliseconds. */
       private static final long SEND_INTERVAL_MILLIS = 5;
   
       public static void main(String[] args) {
   
           logger.info("producer start");
           try {
               execute();
           } catch (Exception e) {
               logger.error("producer failed", e);
           }
       }
   
       /**
        * Builds a client, creates a producer on {@code TP_TOPIC} and sends
        * {@link #MESSAGE_COUNT} messages whose key and value are both the loop index.
        * A failed send is logged and the loop moves on to the next message.
        *
        * @throws Exception if the client is unavailable, or the producer cannot be
        *                   created or closed
        */
       public static void execute() throws Exception {
           PulsarClient client = Helper.buildClient();
           if (client == null) {
               // Fail here with a clear message instead of a later NullPointerException.
               throw new IllegalStateException("Pulsar client could not be created");
           }
   
           Runtime.getRuntime().addShutdownHook(new Thread(() -> {
               try {
                   client.close();
               } catch (PulsarClientException e) {
                   // Printed directly: the logging backend may already be shutting down here.
                   e.printStackTrace();
               }
           }));
   
           // try-with-resources closes the producer once the loop ends or fails.
           try (Producer<String> producer = client.newProducer(Schema.STRING)
                   .topic(TP_TOPIC)
                   .create()) {
               for (int i = 0; i < MESSAGE_COUNT; i++) {
                   String payload = String.valueOf(i);
                   try {
                       MessageId id = producer.newMessage().key(payload).value(payload).send();
                       TimeUnit.MILLISECONDS.sleep(SEND_INTERVAL_MILLIS);
                       System.out.println(id);
                   } catch (InterruptedException e) {
                       // Restore the interrupt flag and stop producing instead of ignoring it.
                       Thread.currentThread().interrupt();
                       break;
                   } catch (Exception e) {
                       logger.error("failed to send message {}", payload, e);
                   }
               }
           }
       }
   }
   
   ```
   
   ``` consumer
   package org.example;
   
   import static org.example.Helper.TP_TOPIC;
   
   import java.util.Map;
   import java.util.Random;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.TimeUnit;
   
   import org.apache.pulsar.client.api.Consumer;
   import org.apache.pulsar.client.api.Message;
   import org.apache.pulsar.client.api.MessageListener;
   import org.apache.pulsar.client.api.PulsarClient;
   import org.apache.pulsar.client.api.PulsarClientException;
   import org.apache.pulsar.client.api.Schema;
   import org.apache.pulsar.client.api.SubscriptionInitialPosition;
   import org.apache.pulsar.client.api.SubscriptionType;
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;
   
   /**
    * Reproduction consumer: attaches 150 Key_Shared consumers to the subscription
    * {@code eb-bystander} on {@code TP_TOPIC} in three batches of 50, five seconds
    * apart. Each consumer acknowledges most messages and negatively acknowledges
    * the ones selected by {@link #nack()}.
    */
   public class SimpleConsumer {
   
       private static final Logger logger = LoggerFactory.getLogger(SimpleConsumer.class);
   
       /** Shared generator for {@link #nack()}; avoids allocating a new Random per message. */
       private static final Random RANDOM = new Random();
   
       /** Number of consumers added in each batch. */
       private static final int BATCH_SIZE = 50;
   
       /** Created consumers, keyed by their creation index. */
       protected static Map<String, Consumer> consumers = new ConcurrentHashMap<>();
   
       static PulsarClient client = Helper.buildClient();
   
       public static void main(String[] args) {
   
           logger.info("consumer start");
           try {
               execute();
           } catch (Exception e) {
               logger.error("consumer failed", e);
           }
       }
   
       /**
        * Creates the three consumer batches with a five second pause after the first
        * and second batch, then sleeps so the consumers keep running.
        *
        * @throws Exception if a consumer cannot be subscribed or the thread is interrupted
        */
       public static void execute() throws Exception {
           addConsumers(0, BATCH_SIZE);
   
           TimeUnit.SECONDS.sleep(5);
   
           System.out.println("update consumer num");
           addConsumers(BATCH_SIZE, 2 * BATCH_SIZE);
   
           TimeUnit.SECONDS.sleep(5);
   
           System.out.println("update consumer num");
           addConsumers(2 * BATCH_SIZE, 3 * BATCH_SIZE);
   
           // Keep the process (and therefore the consumers) alive.
           TimeUnit.SECONDS.sleep(1000000);
       }
   
       /** Creates the consumers with indices {@code from} (inclusive) to {@code to} (exclusive). */
       private static void addConsumers(int from, int to) throws PulsarClientException {
           for (int i = from; i < to; i++) {
               createConsumer(i);
           }
       }
   
       /**
        * Decides whether the current message is negatively acknowledged.
        *
        * @return {@code true} for roughly 5 out of 100 calls
        */
       public static boolean nack() {
           return RANDOM.nextInt(100) < 5;
       }
   
       /**
        * Subscribes one Key_Shared consumer, starting from the earliest position, and
        * stores it in {@link #consumers} under the string form of {@code i}.
        *
        * @param i index used as the key of the new consumer
        * @throws PulsarClientException if the subscription fails
        */
       private static void createConsumer(int i) throws PulsarClientException {
           Consumer<byte[]> consumer = client.newConsumer(Schema.BYTES)
                   .topic(TP_TOPIC)
                   .subscriptionName("eb-bystander")
                   .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                   .subscriptionType(SubscriptionType.Key_Shared)
                   .messageListener(new MessageListener<byte[]>() {
                       @Override
                       public void received(Consumer<byte[]> consumer, Message<byte[]> message) {
                           System.out.println(message.getMessageId());
                           if (nack()) {
                               consumer.negativeAcknowledge(message);
                               return;
                           }
                           try {
                               consumer.acknowledge(message);
                           } catch (PulsarClientException e) {
                               logger.error("failed to acknowledge {}", message.getMessageId(), e);
                           }
                       }
                   })
                   .subscribe();
   
           consumers.put(String.valueOf(i), consumer);
       }
   }
   
   ```
   
   
   ``` helper
   package org.example;
   
   import org.apache.pulsar.client.api.ClientBuilder;
   import org.apache.pulsar.client.api.PulsarClient;
   import org.apache.pulsar.client.api.PulsarClientException;
   
   /** Shared settings for the reproduction: the topic name and the client factory. */
   public class Helper {
   
       /** Topic used by both the producer and the consumers. */
       public static final String TP_TOPIC = "persistent://public/default/zhiyuanlei";
   
       /**
        * Builds a client for the service at {@code http://127.0.0.1:8080} with
        * 10 I/O threads, 10 connections per broker and 10 listener threads.
        *
        * @return the newly built client, never {@code null}
        * @throws IllegalStateException if the client cannot be built; the original
        *                               {@link PulsarClientException} is kept as the cause
        */
       public static PulsarClient buildClient() {
           ClientBuilder clientBuilder = PulsarClient.builder()
                   .serviceUrl("http://127.0.0.1:8080")
                   .ioThreads(10)
                   .connectionsPerBroker(10)
                   .listenerThreads(10);
           try {
               return clientBuilder.build();
           } catch (PulsarClientException e) {
               // Fail fast: returning null would only move the failure to a later NullPointerException.
               throw new IllegalStateException("Failed to build Pulsar client", e);
           }
       }
   }
   
   ```
   <img width="515" alt="image" src="https://user-images.githubusercontent.com/2684384/160319696-d6ef49c6-7aee-4c1f-a144-10bb645b9492.png">
   
   <img width="589" alt="image" src="https://user-images.githubusercontent.com/2684384/160319398-2cdcbd52-7520-43bd-abda-482f947013a0.png">
   
   We can confirm that the producer keeps producing without stopping, but the consumers do not seem to receive any messages.
   
   Maybe the following snippet is relevant:
   ```
    if (nack()) {
                               consumer.negativeAcknowledge(message);
                           }
   ``` 
   
   This part may be important.
   
   In addition, in our production environment we see some partitions get stuck.
   
   **Expected behavior**
   A clear and concise description of what you expected to happen.
   
   Consumers should keep receiving messages normally.
   
   **Screenshots**
   If applicable, add screenshots to help explain your problem.
   
   **Desktop (please complete the following information):**
    - OS: [e.g. iOS]
   
   **Additional context**
   Add any other context about the problem here.
   The issue can be reproduced by running a broker locally from the master branch.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org