Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2019/06/13 07:50:04 UTC
[GitHub] [pulsar] hellozepp commented on issue #4391: pulsar consumption ack strange behavior
hellozepp commented on issue #4391: pulsar consumption ack strange behavior
URL: https://github.com/apache/pulsar/issues/4391#issuecomment-501591371
Hi @KannarFr and @sijie, I tried writing a unit test, but I cannot reproduce this issue. What am I missing?
`
@Test(timeOut = testTimeout)
public void testReRunExclusiveAckedNormalTopic() throws Exception {
    String key = "testExclusiveAckedNormalTopic";
    final String topicName = "persistent://prop/use/ns-abc/topic-" + key;
    final String subscriptionName = "my-ex-subscription-" + key;
    final String messagePredicate = "my-message-" + key + "-";
    final String rerunMessagePredicate = "my-rerun-message-" + key + "-";
    final int totalMessages = 15;

    // 1. Create the producer
    Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
            .enableBatching(false)
            .messageRoutingMode(MessageRoutingMode.SinglePartition)
            .create();

    // 2. Create the consumer
    Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
            .receiverQueueSize(50).ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
            .subscriptionType(SubscriptionType.Exclusive).subscribe();

    // 3. Producer publishes the first batch of messages
    for (int i = 0; i < totalMessages / 3; i++) {
        String message = messagePredicate + i;
        log.info("Producer produced: " + message);
        producer.send(message.getBytes());
    }

    // 4. Consumer receives the messages and acks each one
    Message<byte[]> message = consumer.receive();
    while (message != null) {
        String data = new String(message.getData());
        log.info("Consumer received : " + data);
        consumer.acknowledge(message);
        message = consumer.receive(100, TimeUnit.MILLISECONDS);
    }
    long size = ((ConsumerImpl<byte[]>) consumer).getUnAckedMessageTracker().size();
    log.info(key + " Unacked Message Tracker size is " + size);
    assertEquals(size, 0);
    consumer.unsubscribe();
    consumer.close();

    // 5. Wait for one ack timeout period
    Thread.sleep(ackTimeOutMillis);

    // 6. Create the consumer again with the same subscription name
    consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
            .receiverQueueSize(50).ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
            .subscriptionType(SubscriptionType.Exclusive).subscribe();

    // 7. Producer publishes a second batch of messages
    for (int i = 0; i < totalMessages / 3; i++) {
        String m = rerunMessagePredicate + i;
        log.info("Producer produced: " + m);
        producer.send(m.getBytes());
    }

    // 8. Consumer receives the second batch but does not ack
    message = consumer.receive();
    int redelivered = 0;
    while (message != null) {
        redelivered++;
        String data = new String(message.getData());
        log.info("Consumer received : " + data);
        message = consumer.receive(100, TimeUnit.MILLISECONDS);
    }
    assertEquals(redelivered, 5);
    size = ((ConsumerImpl<byte[]>) consumer).getUnAckedMessageTracker().size();
    log.info(key + " Unacked Message Tracker size is " + size);
    assertEquals(size, 5);
    Thread.sleep(ackTimeOutMillis);

    // 9. Consumer receives the redelivered messages and acks each one
    message = consumer.receive();
    int redelivered1 = 0;
    while (message != null) {
        redelivered1++;
        String data = new String(message.getData());
        log.info("Consumer received : " + data);
        consumer.acknowledge(message);
        message = consumer.receive(100, TimeUnit.MILLISECONDS);
    }
    assertEquals(redelivered1, 5);
    size = ((ConsumerImpl<byte[]>) consumer).getUnAckedMessageTracker().size();
    log.info(key + " Unacked Message Tracker size is " + size);
    assertEquals(size, 0);
}
`
The unit test passes.
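For reference, the bookkeeping that steps 8 and 9 of the test assert on can be modeled without a broker. The following is only a minimal sketch under stated assumptions: `AckTimeoutModel` and its methods are hypothetical names for illustration, not Pulsar's actual `UnAckedMessageTracker`, and time is passed in explicitly instead of being read from a clock.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of ack-timeout tracking (not Pulsar's implementation).
class AckTimeoutModel {
    private final long ackTimeoutMillis;
    // Message id -> time (in millis) at which the message was received.
    private final Map<String, Long> unacked = new LinkedHashMap<>();

    AckTimeoutModel(long ackTimeoutMillis) {
        this.ackTimeoutMillis = ackTimeoutMillis;
    }

    // A received message is tracked until it is acked or times out.
    void onReceive(String messageId, long nowMillis) {
        unacked.put(messageId, nowMillis);
    }

    // Acking a message removes it from the tracker.
    void onAck(String messageId) {
        unacked.remove(messageId);
    }

    // Number of messages received but not yet acked.
    int size() {
        return unacked.size();
    }

    // Returns the ids whose ack timeout has elapsed and stops tracking them;
    // in this model the caller is expected to redeliver those messages.
    List<String> expired(long nowMillis) {
        List<String> due = new ArrayList<>();
        for (Map.Entry<String, Long> entry : unacked.entrySet()) {
            if (nowMillis - entry.getValue() >= ackTimeoutMillis) {
                due.add(entry.getKey());
            }
        }
        for (String id : due) {
            unacked.remove(id);
        }
        return due;
    }
}
```

In this model, receiving five messages without acking leaves the size at 5; once the timeout elapses, `expired` hands back all five for redelivery, and acking the redelivered copies brings the size back to 0, which mirrors the three assertions at the end of the test.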
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services