Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2019/06/13 07:50:04 UTC

[GitHub] [pulsar] hellozepp commented on issue #4391: pulsar consumption ack strange behavior

URL: https://github.com/apache/pulsar/issues/4391#issuecomment-501591371
 
 
   Hi @KannarFr and @sijie, I tried to write a unit test, but I cannot reproduce this issue. What am I missing?
   `
       @Test(timeOut = testTimeout)
       public void testReRunExclusiveAckedNormalTopic() throws Exception {
           String key = "testExclusiveAckedNormalTopic";
           final String topicName = "persistent://prop/use/ns-abc/topic-" + key;
           final String subscriptionName = "my-ex-subscription-" + key;
           final String messagePredicate = "my-message-" + key + "-";
           final String rerunMessagePredicate = "my-rerun-message-" + key + "-";
           final int totalMessages = 15;
   
           // 1. Create producer
           Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
               .enableBatching(false)
               .messageRoutingMode(MessageRoutingMode.SinglePartition)
               .create();
   
           // 2. Create consumer
           Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
                   .receiverQueueSize(50).ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
                   .subscriptionType(SubscriptionType.Exclusive).subscribe();
   
           // 3. Producer publishes messages
           for (int i = 0; i < totalMessages / 3; i++) {
               String message = messagePredicate + i;
               log.info("Producer produced: " + message);
               producer.send(message.getBytes());
           }
   
           // 4. Consumer receives the messages and acks each one
           Message<byte[]> message = consumer.receive();
           while (message != null) {
               String data = new String(message.getData());
               log.info("Consumer received : " + data);
               consumer.acknowledge(message);
               message = consumer.receive(100, TimeUnit.MILLISECONDS);
           }
           long size = ((ConsumerImpl<byte[]>) consumer).getUnAckedMessageTracker().size();
           log.info(key + " Unacked Message Tracker size is " + size);
           assertEquals(size, 0);
           consumer.unsubscribe();
           consumer.close();
   
           // 5. Simulate ackTimeout
           Thread.sleep(ackTimeOutMillis);
   
           // 6. Create consumer again
           consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
                   .receiverQueueSize(50).ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
                   .subscriptionType(SubscriptionType.Exclusive).subscribe();
   
           // 7. Producer publishes more messages
           for (int i = 0; i < totalMessages / 3; i++) {
               String m = rerunMessagePredicate + i;
               log.info("Producer produced: " + m);
               producer.send(m.getBytes());
           }
   
           // 8. Consumer receives the messages but does not ack them
           message = consumer.receive();
           // These are first deliveries, not redeliveries, so count them as "received"
           int received = 0;
           while (message != null) {
               received++;
               String data = new String(message.getData());
               log.info("Consumer received : " + data);
               message = consumer.receive(100, TimeUnit.MILLISECONDS);
           }
           assertEquals(received, 5);
           size = ((ConsumerImpl<byte[]>) consumer).getUnAckedMessageTracker().size();
           log.info(key + " Unacked Message Tracker size is " + size);
           assertEquals(size, 5);
   
           Thread.sleep(ackTimeOutMillis);
   
           // 9. Consumer receives the redelivered messages and acks them
           message = consumer.receive();
           int redelivered1 = 0;
           while (message != null) {
               redelivered1++;
               String data = new String(message.getData());
               log.info("Consumer received : " + data);
               consumer.acknowledge(message);
               message = consumer.receive(100, TimeUnit.MILLISECONDS);
           }
           assertEquals(redelivered1, 5);
           size = ((ConsumerImpl<byte[]>) consumer).getUnAckedMessageTracker().size();
           log.info(key + " Unacked Message Tracker size is " + size);
           assertEquals(size, 0);
       }
   `
   The unit test passes.
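
For reference, the tracker sizes asserted in steps 4, 8 and 9 can be reasoned about with a toy in-memory model. This is only a sketch under stated assumptions: `ToyUnackedTracker`, `ToyTrackerDemo` and their methods are hypothetical names, not Pulsar's `UnAckedMessageTracker`. It assumes that a received message is tracked until it is acked, and that an ack timeout hands the tracked messages back for redelivery and clears the tracker.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Toy stand-in for a client-side unacked-message tracker.
// Hypothetical: this is NOT Pulsar's UnAckedMessageTracker implementation.
final class ToyUnackedTracker {
    private final Set<Integer> unacked = new LinkedHashSet<>();

    // Assumption: every received message is tracked until it is acked.
    void onReceive(int messageId) {
        unacked.add(messageId);
    }

    // Acking a message removes it from the tracker.
    void onAck(int messageId) {
        unacked.remove(messageId);
    }

    // Assumption: on ack timeout, all tracked messages are handed back
    // for redelivery and the tracker is cleared.
    List<Integer> onAckTimeout() {
        List<Integer> toRedeliver = new ArrayList<>(unacked);
        unacked.clear();
        return toRedeliver;
    }

    int size() {
        return unacked.size();
    }
}

class ToyTrackerDemo {
    public static void main(String[] args) {
        ToyUnackedTracker tracker = new ToyUnackedTracker();

        // Step 4: receive and ack five messages.
        for (int id = 0; id < 5; id++) {
            tracker.onReceive(id);
            tracker.onAck(id);
        }
        System.out.println("after step 4: " + tracker.size());

        // Step 8: receive five more messages without acking them.
        for (int id = 5; id < 10; id++) {
            tracker.onReceive(id);
        }
        System.out.println("after step 8: " + tracker.size());

        // Step 9: the ack timeout fires, the messages are redelivered, then acked.
        for (int id : tracker.onAckTimeout()) {
            tracker.onReceive(id);
            tracker.onAck(id);
        }
        System.out.println("after step 9: " + tracker.size());
    }
}
```

Under these assumptions the model reports sizes 0, 5 and 0, which matches the three `assertEquals(size, ...)` checks in the test, so the test by itself only exercises the expected path.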
