You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/02/01 14:41:46 UTC

[GitHub] [pulsar] k-pisey opened a new issue #14086: Cannot receive all messages from a partition via internal consumer handling that partition

k-pisey opened a new issue #14086:
URL: https://github.com/apache/pulsar/issues/14086


   **Describe the bug**
   I use the internal consumer passed to the consumer event listener callback `becameActive(Consumer<?> consumer, int i)` to consume messages from partition i.
   The number of messages I receive differs significantly from the number of messages produced.
   
   **To Reproduce**
   Steps to reproduce the behavior:
   I leave my test program below.
   
   Test steps:
   1. Start pulsar container
   2. Create partitioned topic with number of partitions equal to 1
   3. Subscribe to partitioned topic with failover subscription type and consumer event listener
   4. Wait until the Pulsar broker notifies the consumer event listener that an internal consumer has become active on partition 0
   5. Start receiving messages via the given consumer on another thread, which keeps running forever
   6. Create a partitioned producer then produces messages to topic
   
   **Expected behavior**
   After the last input message has been received, the number of received messages should be identical to the number of produced messages
   
   **Desktop (please complete the following information):**
    - OS: Mac OS
    - Java testcontainers
    - Intellij
   
   **Additional context**
   - pulsar image version: 2.9.1
   - pulsar-client version: 2.9.1
   - pulsar-client-admin version: 2.9.1
   - testcontainers version: 1.16.2
   
   ```java
   import lombok.extern.slf4j.Slf4j;
   import org.apache.pulsar.client.admin.PulsarAdmin;
   import org.apache.pulsar.client.admin.PulsarAdminException;
   import org.apache.pulsar.client.api.*;
   import org.junit.jupiter.api.Test;
   import org.testcontainers.containers.PulsarContainer;
   import org.testcontainers.utility.DockerImageName;
   
   import java.util.HashMap;
   import java.util.Map;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.atomic.AtomicBoolean;
   
   @Slf4j
   public class InternalConsumerTest {
       final static String TOPIC = "my-topic";
   
       static void setup(String restUrl) throws PulsarClientException, PulsarAdminException {
           log.info("Start setup");
           try (PulsarAdmin admin = PulsarAdmin.builder()
                   .serviceHttpUrl(restUrl)
                   .build();
           ) {
               admin.topics().createPartitionedTopic(TOPIC, 1);
           }
           log.info("Setup finished");
       }
   
       static void subscribeToTopic(PulsarClient client, EventListener eventListener, String topic, String subscription) {
           try {
               client.newConsumer(Schema.STRING)
                       .topic(topic)
                       .subscriptionName(subscription)
                       .consumerEventListener(eventListener)
                       .subscriptionType(SubscriptionType.Failover)
                       .subscribe();
           } catch (PulsarClientException e) {
               log.error("Failed to instantiate consumer on topic: {}", TOPIC, e);
           }
       }
   
       @Test
       void test() throws InterruptedException, PulsarAdminException, PulsarClientException {
           DockerImageName PULSAR_IMAGE = DockerImageName.parse("apachepulsar/pulsar:2.9.1");
           try (PulsarContainer pulsar = new PulsarContainer(PULSAR_IMAGE)
           ) {
               pulsar.start();
               Thread.sleep(TimeUnit.SECONDS.toMillis(2));
               setup(pulsar.getHttpServiceUrl());
               try (PulsarClient client = PulsarClient.builder()
                       .serviceUrl(pulsar.getPulsarBrokerUrl())
                       .build()) {
                   Manager manager = new Manager();
                   EventListener eventListener = new EventListener(manager);
                   subscribeToTopic(client, eventListener, TOPIC, "sub-1");
                   while (!manager.consumingStarted.get()) {
                       Thread.sleep(TimeUnit.SECONDS.toMillis(1));
                   }
                   // sleep another 1 sec after internal consumer started consuming
                   Thread.sleep(TimeUnit.SECONDS.toMillis(1));
                   // create partitioned producer then produce messages to topic
                   Runnable producerTask = new Thread(producerTask(client));
                   producerTask.run();
                   // forever sleep
                   while (true) {
                       Thread.sleep(TimeUnit.MINUTES.toMillis(1));
                   }
               }
           }
       }
   
       static Runnable producerTask(PulsarClient client) {
           return () -> {
               try (Producer<String> producer = client.newProducer(Schema.STRING)
                       .topic(TOPIC)
                       .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
                       .create()) {
                   log.info("Start producing");
                   String[] keys = {"1111", "2222", "3333"};
                   int k = 0;
                   for (int i = 0; i < 90; i++) {
                       if (k == 3) {
                           k = 0;
                       }
                       producer.newMessage()
                               .key(keys[k])
                               .value(String.valueOf(i))
                               .send();
                       k++;
                   }
                   
                 producer.newMessage()
                         .key(keys[0])
                         .value(String.valueOf(-1))
                         .send();
                   log.info("Producing finished");
               } catch (PulsarClientException e) {
                   log.info("Error", e);
               }
           };
       }
   
       static Runnable consumerTask(Consumer<?> consumer, AtomicBoolean consumingStarted) {
           return () -> {
               String s = "";
               int n = 0;
               log.info("Start receiving message from topic {}", consumer.getTopic());
               consumingStarted.set(true);
               while (!s.equals("-1")) { // if message not equal to last input message
                   try {
                       Message<?> msg = consumer.receive(1, TimeUnit.MINUTES);
                       if (msg != null) {
                           s = new String(msg.getData());
                           n++;
                           log.info("Received message: {}", s);
                           consumer.acknowledge(msg);
                       }
                   } catch (PulsarClientException e) {
                       log.error("Error", e);
                   }
               }
               log.info("Number of received messages: {}", n);
           };
       }
   
       static class EventListener implements ConsumerEventListener {
           private final Manager manager;
   
           EventListener(Manager manager) {
               this.manager = manager;
           }
   
           @Override
           public void becameActive(Consumer<?> consumer, int i) {
               log.info("Consumer {name: {}, hashcode: {}} became active on partition {}", consumer.getConsumerName(), consumer.hashCode(), i);
               manager.run(consumer, i);
           }
   
           @Override
           public void becameInactive(Consumer<?> consumer, int i) {
               log.info("Consumer {name: {}, hashcode: {}} became inactive on partition {}", consumer.getConsumerName(), consumer.hashCode(), i);
           }
       }
   
       static class Manager {
           Map<Integer, Thread> runningMap = new HashMap<>();
           AtomicBoolean consumingStarted = new AtomicBoolean(false);
   
           void run(Consumer<?> consumer, int i) {
               if (runningMap.containsKey(i)) {
                   return;
               }
               Thread thread = new Thread(consumerTask(consumer, consumingStarted));
               thread.start();
               runningMap.put(i, thread);
           }
       }
   }
   ```
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] github-actions[bot] commented on issue #14086: Cannot receive all messages from a partition via internal consumer handling that partition

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on issue #14086:
URL: https://github.com/apache/pulsar/issues/14086#issuecomment-1058750204


   The issue had no activity for 30 days, mark with Stale label.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org