You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/11/25 16:30:03 UTC
[GitHub] [pulsar] fretiq opened a new issue #8703: Delayed messages + Key_Shared causes blocked consumers
fretiq opened a new issue #8703:
URL: https://github.com/apache/pulsar/issues/8703
**Describe the bug**
When combining Key_Shared subscriptions and delayed messages, consumers fail to receive messages.
**To Reproduce**
Steps to reproduce the behavior:
1. Create a topic + Key_Shared subscription
1. Send 40 delayed messages
1. Send 40 messages
1. Create subscription with `receiverQueueSize=10`
1. Create another subscription with `receiverQueueSize=10`
1. Receive messages on each subscription until exhaustion
1. See that subscription 1 receives ~13 messages and subscription 2 receives none
Here's a test case:
```java
@Test
public void testContinueDispatchMessagesWhenMessageDelayed() throws Exception {
int totalMessages = 40;
int sum = 0;
final String topic = "persistent://public/default/key_shared-" + UUID.randomUUID();
final String subName = "my-sub";
@Cleanup
Consumer<Integer> consumer1 = pulsarClient.newConsumer(Schema.INT32)
.topic(topic)
.subscriptionName(subName)
.receiverQueueSize(10)
.subscriptionType(SubscriptionType.Key_Shared)
.subscribe();
@Cleanup
Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
.topic(topic)
.create();
for (int i = 0; i < totalMessages; i++) {
producer.newMessage()
.key(String.valueOf(random.nextInt(NUMBER_OF_KEYS)))
.value(100 + i)
.deliverAfter(10, TimeUnit.SECONDS)
.send();
}
for (int i = 0; i < totalMessages; i++) {
producer.newMessage()
.key(String.valueOf(random.nextInt(NUMBER_OF_KEYS)))
.value(i)
.send();
}
@Cleanup
Consumer<Integer> consumer2 = pulsarClient.newConsumer(Schema.INT32)
.topic(topic)
.subscriptionName(subName)
.receiverQueueSize(10)
.subscriptionType(SubscriptionType.Key_Shared)
.subscribe();
for (int i = 0; i < 100; i++) {
Message<Integer> msg = consumer1.receive(1, TimeUnit.SECONDS);
if (msg != null) {
log.info("c1 message: {}", msg.getValue());
consumer1.acknowledge(msg);
} else {
break;
}
sum++;
}
log.info("Got {} messages...", sum);
for (int i = 0; i < 100; i++) {
Message<Integer> msg = consumer2.receive(1, TimeUnit.SECONDS);
if (msg != null) {
log.info("c2 message: {}", msg.getValue());
consumer2.acknowledge(msg);
} else {
break;
}
sum++;
}
log.info("Got {} other messages...", sum);
Assert.assertEquals(sum, totalMessages);
}
```
<details>
<summary>Logs</summary>
org.apache.pulsar.client.api.KeySharedSubscriptionTest - c1 message: 0
org.apache.pulsar.client.api.KeySharedSubscriptionTest - c1 message: 1
org.apache.pulsar.client.api.KeySharedSubscriptionTest - c1 message: 2
org.apache.pulsar.client.api.KeySharedSubscriptionTest - c1 message: 3
org.apache.pulsar.client.api.KeySharedSubscriptionTest - c1 message: 4
org.apache.pulsar.client.api.KeySharedSubscriptionTest - c1 message: 5
org.apache.pulsar.client.api.KeySharedSubscriptionTest - c1 message: 6
org.apache.pulsar.client.api.KeySharedSubscriptionTest - c1 message: 7
org.apache.pulsar.client.api.KeySharedSubscriptionTest - c1 message: 8
org.apache.pulsar.client.api.KeySharedSubscriptionTest - c1 message: 9
org.apache.pulsar.client.api.KeySharedSubscriptionTest - c1 message: 11
org.apache.pulsar.client.api.KeySharedSubscriptionTest - c1 message: 12
org.apache.pulsar.client.api.KeySharedSubscriptionTest - c1 message: 16
org.apache.pulsar.client.api.KeySharedSubscriptionTest - Got 13 messages...
org.apache.pulsar.client.api.KeySharedSubscriptionTest - Got 13 other messages...
</details>
**Expected behavior**
All 40 messages should have been received.
**Desktop (please complete the following information):**
- OS: macOS
- Pulsar: v2.6.2
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar] codelipenghui commented on issue #8703: Delayed messages + Key_Shared causes blocked consumers
Posted by GitBox <gi...@apache.org>.
codelipenghui commented on issue #8703:
URL: https://github.com/apache/pulsar/issues/8703#issuecomment-846418849
@fretiq Key_Shared subscription can't work for delayed messages because the Key_Shared subscription need to guarantee the order of message distribution, but the delayed messages will break the Key_Shared semantics.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org