Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/08/10 01:52:50 UTC
[GitHub] [pulsar] aloyszhang opened a new issue #11612: reader can misuse an exist durable subscription
aloyszhang opened a new issue #11612:
URL: https://github.com/apache/pulsar/issues/11612
**Describe the bug**
In our production environment, setting `startMessageId` on a reader (created by the Flink source) sometimes has no effect.
After troubleshooting, we found the root cause: the reader reuses a subscription name that already exists, so it picks up the existing durable subscription object and reads from that subscription's `readPosition` instead of from `startMessageId`.
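The effect described above can be illustrated with a small toy model. This is only a sketch under stated assumptions, not Pulsar broker code: `CursorReuseModel`, `acknowledge`, and `resolveReaderStart` are hypothetical names, and message positions are simplified to plain integers.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: NOT Pulsar broker code. All names here are hypothetical.
final class CursorReuseModel {
    // Durable cursors keyed by subscription name; value is the next position to read.
    private final Map<String, Integer> durableCursors = new HashMap<>();

    // A durable consumer acknowledges the message at `position`,
    // which moves the cursor to the following position.
    void acknowledge(String subscriptionName, int position) {
        durableCursors.put(subscriptionName, position + 1);
    }

    // Models the reported behavior: when a durable cursor with this name
    // already exists, its read position wins and startPosition is ignored.
    int resolveReaderStart(String subscriptionName, int startPosition) {
        Integer existing = durableCursors.get(subscriptionName);
        return existing != null ? existing : startPosition;
    }

    public static void main(String[] args) {
        CursorReuseModel model = new CursorReuseModel();
        for (int i = 0; i < 5; i++) {
            model.acknowledge("mix-subscription", i);
        }
        int earliest = 0;
        // Reusing the durable name: the reader starts at 5, not at earliest.
        System.out.println(model.resolveReaderStart("mix-subscription", earliest)); // prints 5
        // A name with no durable cursor honors the requested start position.
        System.out.println(model.resolveReaderStart("fresh-reader-name", earliest)); // prints 0
    }
}
```

With 10 messages at positions 0 to 9 and the first 5 acknowledged, a reader that starts at position 5 sees only 5 messages, which matches the count observed in the reproduction.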
**To Reproduce**
Steps to reproduce the behavior:
1. Create a topic and set up a producer and a durable consumer
```java
String topicName = "t/ns/topic";
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName)
        .create();
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topicName)
        .readCompacted(true)
        .subscriptionMode(SubscriptionMode.Durable)
        .subscriptionType(SubscriptionType.Exclusive)
        .subscriptionName("mix-subscription")
        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
        .subscribe();
```
2. Send 10 messages
```java
int messageNum = 10;
for (int i = 0; i < messageNum; i++) {
    producer.send("message" + i);
}
```
3. Receive and acknowledge the first 5 messages with this consumer, then close it
```java
for (int i = 0; i < 5; i++) {
    Message<String> message = consumer.receive();
    assertNotNull(message);
    Assert.assertEquals(message.getValue(), "message" + i);
    consumer.acknowledge(message);
}
consumer.close();
```
4. Set up a reader with the same subscription name, starting from the earliest message
```java
Reader<String> reader = pulsarClient.newReader(Schema.STRING).topic(topicName)
        .subscriptionName("mix-subscription")
        .startMessageId(MessageId.earliest)
        .create();
```
5. Read messages with this reader; only five messages can be read
```java
Message<String> message;
int readCount = 0;
while ((message = reader.readNext(3, TimeUnit.SECONDS)) != null) {
    log.info("###Message id : {}", message.getMessageId());
    readCount++;
}
// Starting from earliest should yield all 10 messages, but only 5 are read.
Assert.assertEquals(readCount, 5);
```
**Expected behavior**
A reader should be forbidden from using the subscription name of an existing durable subscription.
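One way to picture the requested check is the toy model below. It is a sketch only and does not reflect how Pulsar implements or would implement this: `ReaderNameGuard`, `registerDurable`, and `checkReaderAllowed` are hypothetical names, and the choice of `IllegalStateException` is an assumption made for illustration.

```java
import java.util.HashSet;
import java.util.Set;

// Toy model only: NOT Pulsar broker code. All names here are hypothetical.
final class ReaderNameGuard {
    // Names of durable subscriptions that already exist on the topic.
    private final Set<String> durableSubscriptions = new HashSet<>();

    void registerDurable(String subscriptionName) {
        durableSubscriptions.add(subscriptionName);
    }

    // Rejects a reader whose subscription name collides with a durable one,
    // instead of silently attaching it to the existing cursor.
    void checkReaderAllowed(String subscriptionName) {
        if (durableSubscriptions.contains(subscriptionName)) {
            throw new IllegalStateException(
                    "Subscription '" + subscriptionName + "' is durable and cannot be used by a reader");
        }
    }

    public static void main(String[] args) {
        ReaderNameGuard guard = new ReaderNameGuard();
        guard.registerDurable("mix-subscription");
        guard.checkReaderAllowed("fresh-reader-name"); // passes silently
        try {
            guard.checkReaderAllowed("mix-subscription");
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Failing fast in this way would surface the name collision at reader creation time rather than as a silently ignored `startMessageId`.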
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar] eolivelli closed issue #11612: reader may misuse an exist durable subscription
Posted by GitBox <gi...@apache.org>.
eolivelli closed issue #11612:
URL: https://github.com/apache/pulsar/issues/11612