Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/11/25 16:30:03 UTC

[GitHub] [pulsar] fretiq opened a new issue #8703: Delayed messages + Key_Shared causes blocked consumers

fretiq opened a new issue #8703:
URL: https://github.com/apache/pulsar/issues/8703


   **Describe the bug**
   
   When combining Key_Shared subscriptions and delayed messages, consumers fail to receive messages.
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   1. Create a topic + Key_Shared subscription
   1. Send 40 delayed messages
   1. Send 40 messages
   1. Create a consumer on the subscription with `receiverQueueSize=10`
   1. Create another consumer on the same subscription with `receiverQueueSize=10`
   1. Receive messages on each consumer until exhaustion
   1. See that consumer 1 receives ~13 messages and consumer 2 receives none
   
   Here's a test case:
   
   ```java
       @Test
       public void testContinueDispatchMessagesWhenMessageDelayed() throws Exception {
           int totalMessages = 40;
           int sum = 0;
           final String topic = "persistent://public/default/key_shared-" + UUID.randomUUID();
           final String subName = "my-sub";
   
           @Cleanup
           Consumer<Integer> consumer1 = pulsarClient.newConsumer(Schema.INT32)
                   .topic(topic)
                   .subscriptionName(subName)
                   .receiverQueueSize(10)
                   .subscriptionType(SubscriptionType.Key_Shared)
                   .subscribe();
   
           @Cleanup
           Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
                   .topic(topic)
                   .create();
   
           for (int i = 0; i < totalMessages; i++) {
               producer.newMessage()
                       .key(String.valueOf(random.nextInt(NUMBER_OF_KEYS)))
                       .value(100 + i)
                       .deliverAfter(10, TimeUnit.SECONDS)
                       .send();
           }
   
           for (int i = 0; i < totalMessages; i++) {
               producer.newMessage()
                       .key(String.valueOf(random.nextInt(NUMBER_OF_KEYS)))
                       .value(i)
                       .send();
           }
   
           @Cleanup
           Consumer<Integer> consumer2 = pulsarClient.newConsumer(Schema.INT32)
                   .topic(topic)
                   .subscriptionName(subName)
                   .receiverQueueSize(10)
                   .subscriptionType(SubscriptionType.Key_Shared)
                   .subscribe();
   
           for (int i = 0; i < 100; i++) {
               Message<Integer> msg = consumer1.receive(1, TimeUnit.SECONDS);
               if (msg != null) {
                   log.info("c1 message: {}", msg.getValue());
                   consumer1.acknowledge(msg);
               } else {
                   break;
               }
               sum++;
           }
   
           log.info("Got {} messages...", sum);
   
           for (int i = 0; i < 100; i++) {
               Message<Integer> msg = consumer2.receive(1, TimeUnit.SECONDS);
               if (msg != null) {
                   log.info("c2 message: {}", msg.getValue());
                   consumer2.acknowledge(msg);
               } else {
                   break;
               }
               sum++;
           }
   
           log.info("Got {} other messages...", sum);
   
           Assert.assertEquals(sum, totalMessages);
       }
   ```
   
   <details>
   <summary>Logs</summary>
   org.apache.pulsar.client.api.KeySharedSubscriptionTest - c1 message: 0
   org.apache.pulsar.client.api.KeySharedSubscriptionTest - c1 message: 1
   org.apache.pulsar.client.api.KeySharedSubscriptionTest - c1 message: 2
   org.apache.pulsar.client.api.KeySharedSubscriptionTest - c1 message: 3
   org.apache.pulsar.client.api.KeySharedSubscriptionTest - c1 message: 4
   org.apache.pulsar.client.api.KeySharedSubscriptionTest - c1 message: 5
   org.apache.pulsar.client.api.KeySharedSubscriptionTest - c1 message: 6
   org.apache.pulsar.client.api.KeySharedSubscriptionTest - c1 message: 7
   org.apache.pulsar.client.api.KeySharedSubscriptionTest - c1 message: 8
   org.apache.pulsar.client.api.KeySharedSubscriptionTest - c1 message: 9
   org.apache.pulsar.client.api.KeySharedSubscriptionTest - c1 message: 11
   org.apache.pulsar.client.api.KeySharedSubscriptionTest - c1 message: 12
   org.apache.pulsar.client.api.KeySharedSubscriptionTest - c1 message: 16
   org.apache.pulsar.client.api.KeySharedSubscriptionTest - Got 13 messages...
   org.apache.pulsar.client.api.KeySharedSubscriptionTest - Got 13 other messages...
   </details>
   
   **Expected behavior**
   All 40 messages should have been received.
   
   **Desktop (please complete the following information):**
   - OS: macOS
   - Pulsar: v2.6.2
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on issue #8703: Delayed messages + Key_Shared causes blocked consumers

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on issue #8703:
URL: https://github.com/apache/pulsar/issues/8703#issuecomment-846418849


   @fretiq Key_Shared subscriptions can't work with delayed messages: a Key_Shared subscription needs to guarantee the order of message distribution, and delayed messages break the Key_Shared semantics.
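
The ordering conflict described in this comment can be illustrated with a toy model. This is only a sketch, not Pulsar's dispatcher: `DelayedOrderSketch`, `Msg`, `deliveryOrder`, and `preservesPerKeyOrder` are hypothetical names, and the model assumes all messages are published at the same instant, so sorting by delay stands in for "time at which the message becomes deliverable".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DelayedOrderSketch {

    /** Hypothetical message model (not a Pulsar type): key, publish sequence, delivery delay. */
    static final class Msg {
        final String key;
        final int seq;
        final long delayMs;

        Msg(String key, int seq, long delayMs) {
            this.key = key;
            this.seq = seq;
            this.delayMs = delayMs;
        }
    }

    /**
     * Orders messages by when they become deliverable. Assumes every message
     * was published at the same instant, so the delay alone decides the order.
     * List.sort is stable, so messages with equal delay keep publish order.
     */
    static List<Msg> deliveryOrder(List<Msg> published) {
        List<Msg> out = new ArrayList<>(published);
        out.sort(Comparator.comparingLong((Msg m) -> m.delayMs));
        return out;
    }

    /** Returns true if, for every key, sequence numbers never go backwards. */
    static boolean preservesPerKeyOrder(List<Msg> delivered) {
        Map<String, Integer> lastSeq = new HashMap<>();
        for (Msg m : delivered) {
            Integer prev = lastSeq.put(m.key, m.seq);
            if (prev != null && prev > m.seq) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Same key: the first message is delayed by 10 s, the second is not.
        List<Msg> published = Arrays.asList(
                new Msg("k1", 0, 10_000L),
                new Msg("k1", 1, 0L));

        System.out.println("publish order kept per key:  "
                + preservesPerKeyOrder(published));
        System.out.println("delivery order kept per key: "
                + preservesPerKeyOrder(deliveryOrder(published)));
    }
}
```

In this model the undelayed message with `seq` 1 becomes deliverable before the delayed message with `seq` 0 on the same key, so honoring the delay reorders messages within a key, which is the guarantee the comment says Key_Shared has to keep.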

