Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/04/07 01:41:51 UTC

[GitHub] [pulsar] jeffgrunewald opened a new pull request #6680: [Issue 5979][pulsar-websocket] Add cumulative acknowledgement to websocket consumer

jeffgrunewald opened a new pull request #6680: [Issue 5979][pulsar-websocket] Add cumulative acknowledgement to websocket consumer
URL: https://github.com/apache/pulsar/pull/6680
 
 
   Fixes #5979
   
   ### Motivation
   
   Cumulative acknowledgement makes consuming over a websocket more efficient: a consumer can acknowledge chunks of messages with a single ack, which requires fewer total responses than per-message acknowledgement.
   
   Following my suggestion in issue #5979, I am submitting a proposed implementation for feedback, to see whether it would be an acceptable solution to the problem.
   
   ### Modifications
   
   *Describe the modifications you've done.*
   
   ### Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage
   
   ### Does this pull request potentially affect one of the following parts:
   
     - The public API:
        With this change, websocket clients can still acknowledge a single message by sending the consumer an ack message like `{"messageId":"CAoQKA=="}` as before, or they can send `{"messageId":"CAoQKA==","ackType":"cumulative"}` to acknowledge all unacked messages up to and including the one identified by the message ID in the ack message.
   
     - The schema:
        The schema of the ConsumerCommand class now includes a new field, `ackType`.
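
The two ack payloads described above could be produced by a client along the following lines. This is only a minimal sketch of the format proposed in this pull request: the `AckCommands` class and its method names are hypothetical and not part of Pulsar, and the message ID is assumed to be the Base64 string received with the message (Base64 text needs no JSON escaping).

```java
// Hypothetical helper, not part of Pulsar: builds the JSON text frames a
// WebSocket client would send to acknowledge messages under this proposal.
final class AckCommands {
    private AckCommands() {}

    /** Individual ack: the same payload clients already send today. */
    static String individual(String base64MessageId) {
        return "{\"messageId\":\"" + base64MessageId + "\"}";
    }

    /** Cumulative ack: adds the proposed "ackType" field. */
    static String cumulative(String base64MessageId) {
        return "{\"messageId\":\"" + base64MessageId + "\",\"ackType\":\"cumulative\"}";
    }

    public static void main(String[] args) {
        // Prints the two example payloads from the PR description.
        System.out.println(individual("CAoQKA=="));
        System.out.println(cumulative("CAoQKA=="));
    }
}
```

Omitting `ackType` keeps the existing per-message behaviour, so clients that never send the new field would be unaffected.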
   
   ### Documentation
   
   I'm unsure where to document the feature but would be happy to do so.
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [pulsar] equanz commented on issue #6680: [Issue 5979][pulsar-websocket] Add cumulative acknowledgement to websocket consumer

Posted by GitBox <gi...@apache.org>.
equanz commented on issue #6680: [Issue 5979][pulsar-websocket] Add cumulative acknowledgement to websocket consumer
URL: https://github.com/apache/pulsar/pull/6680#issuecomment-611879812
 
 
   /pulsarbot run-failure-checks

[GitHub] [pulsar] jeffgrunewald closed pull request #6680: [Issue 5979][pulsar-websocket] Add cumulative acknowledgement to websocket consumer

Posted by GitBox <gi...@apache.org>.
jeffgrunewald closed pull request #6680: [Issue 5979][pulsar-websocket] Add cumulative acknowledgement to websocket consumer
URL: https://github.com/apache/pulsar/pull/6680
 
 
   

[GitHub] [pulsar] jeffgrunewald commented on a change in pull request #6680: [Issue 5979][pulsar-websocket] Add cumulative acknowledgement to websocket consumer

Posted by GitBox <gi...@apache.org>.
jeffgrunewald commented on a change in pull request #6680: [Issue 5979][pulsar-websocket] Add cumulative acknowledgement to websocket consumer
URL: https://github.com/apache/pulsar/pull/6680#discussion_r406775485
 
 

 ##########
 File path: pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
 ##########
 @@ -240,12 +242,27 @@ public void onWebSocketText(String message) {
                 // We should have received an ack
                 MessageId msgId = MessageId.fromByteArrayWithTopic(Base64.getDecoder().decode(command.messageId),
                         topic.toString());
-                consumer.acknowledgeAsync(msgId).thenAccept(consumer -> numMsgsAcked.increment());
-                if (!this.pullMode) {
-                    int pending = pendingMessages.getAndDecrement();
-                    if (pending >= maxPendingMessages) {
-                        // Resume delivery
-                        receiveMessage();
+                if ("cumulative".equals(command.ackType)) {
+                    ConsumerStats cStats = consumer.getStats();
+                    long ackDiff = cStats.getTotalMsgsReceived() - cStats.getTotalAcksSent();
 
 Review comment:
   Can you recommend a different interface for getting the current offset based on a message ID? If I have that, then I can still calculate the difference for the number of messages acked by the consumer and update the value.

[GitHub] [pulsar] massakam commented on a change in pull request #6680: [Issue 5979][pulsar-websocket] Add cumulative acknowledgement to websocket consumer

Posted by GitBox <gi...@apache.org>.
massakam commented on a change in pull request #6680: [Issue 5979][pulsar-websocket] Add cumulative acknowledgement to websocket consumer
URL: https://github.com/apache/pulsar/pull/6680#discussion_r407318527
 
 

 ##########
 File path: pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
 ##########
 @@ -240,12 +242,27 @@ public void onWebSocketText(String message) {
                 // We should have received an ack
                 MessageId msgId = MessageId.fromByteArrayWithTopic(Base64.getDecoder().decode(command.messageId),
                         topic.toString());
-                consumer.acknowledgeAsync(msgId).thenAccept(consumer -> numMsgsAcked.increment());
-                if (!this.pullMode) {
-                    int pending = pendingMessages.getAndDecrement();
-                    if (pending >= maxPendingMessages) {
-                        // Resume delivery
-                        receiveMessage();
+                if ("cumulative".equals(command.ackType)) {
+                    ConsumerStats cStats = consumer.getStats();
+                    long ackDiff = cStats.getTotalMsgsReceived() - cStats.getTotalAcksSent();
 
 Review comment:
   Umm... I don't think there is such an interface.
   We can find the number of messages between the two message IDs on the broker side, but it is difficult on the websocket proxy side.

[GitHub] [pulsar] massakam commented on a change in pull request #6680: [Issue 5979][pulsar-websocket] Add cumulative acknowledgement to websocket consumer

Posted by GitBox <gi...@apache.org>.
massakam commented on a change in pull request #6680: [Issue 5979][pulsar-websocket] Add cumulative acknowledgement to websocket consumer
URL: https://github.com/apache/pulsar/pull/6680#discussion_r406702707
 
 

 ##########
 File path: pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
 ##########
 @@ -240,12 +242,27 @@ public void onWebSocketText(String message) {
                 // We should have received an ack
                 MessageId msgId = MessageId.fromByteArrayWithTopic(Base64.getDecoder().decode(command.messageId),
                         topic.toString());
-                consumer.acknowledgeAsync(msgId).thenAccept(consumer -> numMsgsAcked.increment());
-                if (!this.pullMode) {
-                    int pending = pendingMessages.getAndDecrement();
-                    if (pending >= maxPendingMessages) {
-                        // Resume delivery
-                        receiveMessage();
+                if ("cumulative".equals(command.ackType)) {
+                    ConsumerStats cStats = consumer.getStats();
+                    long ackDiff = cStats.getTotalMsgsReceived() - cStats.getTotalAcksSent();
+                    consumer.acknowledgeCumulativeAsync(msgId).thenAccept(consumer -> numMsgsAcked.add(ackDiff));
+                    if (!this.pullMode) {
+                        int acked = (int) ackDiff;
+                        IntBinaryOperator decrementFunction = (x, y) -> x - y;
+                        int pending = pendingMessages.accumulateAndGet(acked, decrementFunction);
 
 Review comment:
   It would be more appropriate to use `getAndAccumulate()` instead of `accumulateAndGet()` here.
   
   We must call `receiveMessage()` to resume delivery of messages to the client only if the value of `pendingMessages` *before* the update is greater than or equal to `maxPendingMessages`.
   
   If `pendingMessages` is less than `maxPendingMessages`, then `receiveMessage()` was already called right after the last message was received from the broker, so we don't need to call it again here.
   https://github.com/apache/pulsar/blob/918dc6842d09bc07478adb5fb099aa43f1d40f1b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java#L196-L200
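
The difference between the two `AtomicInteger` methods can be seen in a small standalone sketch. The class and method names below (`PendingCounterDemo`, `shouldResume`, `shouldResumeUsingUpdatedValue`) are made up for illustration and are not taken from `ConsumerHandler`; only `getAndAccumulate()` and `accumulateAndGet()` are real JDK calls.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntBinaryOperator;

// Illustrative only: contrasts the value seen before and after the update.
final class PendingCounterDemo {
    // Same subtraction operator as in the diff: current - acked.
    static final IntBinaryOperator SUBTRACT = (current, acked) -> current - acked;

    /** Suggested form: decide based on the value BEFORE the update. */
    static boolean shouldResume(AtomicInteger pendingMessages, int acked, int maxPendingMessages) {
        int before = pendingMessages.getAndAccumulate(acked, SUBTRACT);
        return before >= maxPendingMessages;
    }

    /** Form used in the diff: decides based on the value AFTER the update. */
    static boolean shouldResumeUsingUpdatedValue(AtomicInteger pendingMessages, int acked,
                                                 int maxPendingMessages) {
        int after = pendingMessages.accumulateAndGet(acked, SUBTRACT);
        return after >= maxPendingMessages;
    }

    public static void main(String[] args) {
        int max = 1000;
        // Delivery is paused because the pending count has reached the limit.
        AtomicInteger a = new AtomicInteger(1000);
        System.out.println(shouldResume(a, 10, max));                  // prints "true"
        AtomicInteger b = new AtomicInteger(1000);
        System.out.println(shouldResumeUsingUpdatedValue(b, 10, max)); // prints "false"
    }
}
```

With the post-update value, a paused consumer that acks 10 of 1000 pending messages sees 990 and never resumes delivery, which is the problem the review comment points out.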

[GitHub] [pulsar] massakam commented on a change in pull request #6680: [Issue 5979][pulsar-websocket] Add cumulative acknowledgement to websocket consumer

Posted by GitBox <gi...@apache.org>.
massakam commented on a change in pull request #6680: [Issue 5979][pulsar-websocket] Add cumulative acknowledgement to websocket consumer
URL: https://github.com/apache/pulsar/pull/6680#discussion_r407841661
 
 

 ##########
 File path: pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
 ##########
 @@ -240,12 +242,27 @@ public void onWebSocketText(String message) {
                 // We should have received an ack
                 MessageId msgId = MessageId.fromByteArrayWithTopic(Base64.getDecoder().decode(command.messageId),
                         topic.toString());
-                consumer.acknowledgeAsync(msgId).thenAccept(consumer -> numMsgsAcked.increment());
-                if (!this.pullMode) {
-                    int pending = pendingMessages.getAndDecrement();
-                    if (pending >= maxPendingMessages) {
-                        // Resume delivery
-                        receiveMessage();
+                if ("cumulative".equals(command.ackType)) {
+                    ConsumerStats cStats = consumer.getStats();
+                    long ackDiff = cStats.getTotalMsgsReceived() - cStats.getTotalAcksSent();
 
 Review comment:
   The REST API provided by the broker exposes the number of messages that have not yet been acknowledged (see `numberOfEntriesSinceFirstNotAckedMessage` in the output below). However, I don't think it's realistic in terms of performance to call the broker's REST API every time the WebSocket proxy receives a cumulative ack.
   ```sh
   $ curl -s http://dev-broker01.pulsar.xxx.yahoo.co.jp:8080/admin/v2/persistent/massakam/test/t1/internalStats | jq .
   
   {
     "entriesAddedCounter": 10,
     "numberOfEntries": 30,
     "totalSize": 1760,
     "currentLedgerEntries": 10,
     "currentLedgerSize": 520,
     "lastLedgerCreatedTimestamp": "2020-04-14T10:47:40.829+09:00",
     "waitingCursorsCount": 2,
     "pendingAddEntriesCount": 0,
     "lastConfirmedEntry": "2823032:9",
     "state": "LedgerOpened",
     "ledgers": [
       {
         "ledgerId": 2820274,
         "entries": 20,
         "size": 1240,
         "offloaded": false
       },
       {
         "ledgerId": 2823032,
         "entries": 0,
         "size": 0,
         "offloaded": false
       }
     ],
     "cursors": {
       "pulsar.repl.dev2": {
         "markDeletePosition": "2823032:9",
         "readPosition": "2823032:10",
         "waitingReadOp": true,
         "pendingReadOps": 0,
         "messagesConsumedCounter": 10,
         "cursorLedger": 2823043,
         "cursorLedgerLastEntry": 1,
         "individuallyDeletedMessages": "[]",
         "lastLedgerSwitchTimestamp": "2020-04-14T10:47:40.831+09:00",
         "state": "Open",
         "numberOfEntriesSinceFirstNotAckedMessage": 1,
         "totalNonContiguousDeletedMessagesRange": 0,
         "properties": {}
       },
       "sub1": {
         "markDeletePosition": "2820274:19",
         "readPosition": "2823032:10",
         "waitingReadOp": true,
         "pendingReadOps": 0,
         "messagesConsumedCounter": 0,
         "cursorLedger": 2823042,
         "cursorLedgerLastEntry": 0,
         "individuallyDeletedMessages": "[]",
         "lastLedgerSwitchTimestamp": "2020-04-14T10:52:11.384+09:00",
         "state": "Open",
         "numberOfEntriesSinceFirstNotAckedMessage": 11,
         "totalNonContiguousDeletedMessagesRange": 0,
         "properties": {}
       }
     }
   }
   ```

[GitHub] [pulsar] jeffgrunewald commented on a change in pull request #6680: [Issue 5979][pulsar-websocket] Add cumulative acknowledgement to websocket consumer

Posted by GitBox <gi...@apache.org>.
jeffgrunewald commented on a change in pull request #6680: [Issue 5979][pulsar-websocket] Add cumulative acknowledgement to websocket consumer
URL: https://github.com/apache/pulsar/pull/6680#discussion_r407862261
 
 

 ##########
 File path: pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
 ##########
 @@ -240,12 +242,27 @@ public void onWebSocketText(String message) {
                 // We should have received an ack
                 MessageId msgId = MessageId.fromByteArrayWithTopic(Base64.getDecoder().decode(command.messageId),
                         topic.toString());
-                consumer.acknowledgeAsync(msgId).thenAccept(consumer -> numMsgsAcked.increment());
-                if (!this.pullMode) {
-                    int pending = pendingMessages.getAndDecrement();
-                    if (pending >= maxPendingMessages) {
-                        // Resume delivery
-                        receiveMessage();
+                if ("cumulative".equals(command.ackType)) {
+                    ConsumerStats cStats = consumer.getStats();
+                    long ackDiff = cStats.getTotalMsgsReceived() - cStats.getTotalAcksSent();
 
 Review comment:
   Understood. I’m not sure this PR makes much sense then, given that.

[GitHub] [pulsar] jeffgrunewald commented on a change in pull request #6680: [Issue 5979][pulsar-websocket] Add cumulative acknowledgement to websocket consumer

Posted by GitBox <gi...@apache.org>.
jeffgrunewald commented on a change in pull request #6680: [Issue 5979][pulsar-websocket] Add cumulative acknowledgement to websocket consumer
URL: https://github.com/apache/pulsar/pull/6680#discussion_r407480802
 
 

 ##########
 File path: pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
 ##########
 @@ -240,12 +242,27 @@ public void onWebSocketText(String message) {
                 // We should have received an ack
                 MessageId msgId = MessageId.fromByteArrayWithTopic(Base64.getDecoder().decode(command.messageId),
                         topic.toString());
-                consumer.acknowledgeAsync(msgId).thenAccept(consumer -> numMsgsAcked.increment());
-                if (!this.pullMode) {
-                    int pending = pendingMessages.getAndDecrement();
-                    if (pending >= maxPendingMessages) {
-                        // Resume delivery
-                        receiveMessage();
+                if ("cumulative".equals(command.ackType)) {
+                    ConsumerStats cStats = consumer.getStats();
+                    long ackDiff = cStats.getTotalMsgsReceived() - cStats.getTotalAcksSent();
 
 Review comment:
   Is it infeasible to query the broker here to gather that information?

[GitHub] [pulsar] jiazhai commented on issue #6680: [Issue 5979][pulsar-websocket] Add cumulative acknowledgement to websocket consumer

Posted by GitBox <gi...@apache.org>.
jiazhai commented on issue #6680: [Issue 5979][pulsar-websocket] Add cumulative acknowledgement to websocket consumer
URL: https://github.com/apache/pulsar/pull/6680#issuecomment-610461248
 
 
   /pulsarbot run-failure-checks
   
   

[GitHub] [pulsar] jeffgrunewald commented on a change in pull request #6680: [Issue 5979][pulsar-websocket] Add cumulative acknowledgement to websocket consumer

Posted by GitBox <gi...@apache.org>.
jeffgrunewald commented on a change in pull request #6680: [Issue 5979][pulsar-websocket] Add cumulative acknowledgement to websocket consumer
URL: https://github.com/apache/pulsar/pull/6680#discussion_r406774303
 
 

 ##########
 File path: pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
 ##########
 @@ -240,12 +242,27 @@ public void onWebSocketText(String message) {
                 // We should have received an ack
                 MessageId msgId = MessageId.fromByteArrayWithTopic(Base64.getDecoder().decode(command.messageId),
                         topic.toString());
-                consumer.acknowledgeAsync(msgId).thenAccept(consumer -> numMsgsAcked.increment());
-                if (!this.pullMode) {
-                    int pending = pendingMessages.getAndDecrement();
-                    if (pending >= maxPendingMessages) {
-                        // Resume delivery
-                        receiveMessage();
+                if ("cumulative".equals(command.ackType)) {
+                    ConsumerStats cStats = consumer.getStats();
+                    long ackDiff = cStats.getTotalMsgsReceived() - cStats.getTotalAcksSent();
+                    consumer.acknowledgeCumulativeAsync(msgId).thenAccept(consumer -> numMsgsAcked.add(ackDiff));
+                    if (!this.pullMode) {
+                        int acked = (int) ackDiff;
+                        IntBinaryOperator decrementFunction = (x, y) -> x - y;
+                        int pending = pendingMessages.accumulateAndGet(acked, decrementFunction);
 
 Review comment:
   That's reasonable and easy to correct; thanks for the insight!

[GitHub] [pulsar] jeffgrunewald commented on issue #6680: [Issue 5979][pulsar-websocket] Add cumulative acknowledgement to websocket consumer

Posted by GitBox <gi...@apache.org>.
jeffgrunewald commented on issue #6680: [Issue 5979][pulsar-websocket] Add cumulative acknowledgement to websocket consumer
URL: https://github.com/apache/pulsar/pull/6680#issuecomment-611299281
 
 
   > I have a question about `pendingMessages`.
   > 
   > In pulsar-websocket, received but not-yet-acked messages are tracked by `pendingMessages`.
   > ex:
   > 
   > https://github.com/apache/pulsar/blob/a463d2a639cc8424970f3a663b70af45b84785f9/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java#L244-L250
   > 
   > It seems that the changes do not account for `pendingMessages` (currently it is simply decremented). Is my understanding correct?
   
   That seems correct; my apologies. I've pushed a new change that I hope addresses this.

[GitHub] [pulsar] equanz commented on issue #6680: [Issue 5979][pulsar-websocket] Add cumulative acknowledgement to websocket consumer

Posted by GitBox <gi...@apache.org>.
equanz commented on issue #6680: [Issue 5979][pulsar-websocket] Add cumulative acknowledgement to websocket consumer
URL: https://github.com/apache/pulsar/pull/6680#issuecomment-611247063
 
 
   I have a question about `pendingMessages`.
   
   In pulsar-websocket, received but not-yet-acked messages are tracked by `pendingMessages`.
   ex: https://github.com/apache/pulsar/blob/a463d2a639cc8424970f3a663b70af45b84785f9/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java#L244-L250
   
   It seems that the changes do not account for `pendingMessages` (currently it is simply decremented). Is my understanding correct?
   

[GitHub] [pulsar] massakam commented on a change in pull request #6680: [Issue 5979][pulsar-websocket] Add cumulative acknowledgement to websocket consumer

Posted by GitBox <gi...@apache.org>.
massakam commented on a change in pull request #6680: [Issue 5979][pulsar-websocket] Add cumulative acknowledgement to websocket consumer
URL: https://github.com/apache/pulsar/pull/6680#discussion_r406695985
 
 

 ##########
 File path: pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
 ##########
 @@ -240,12 +242,27 @@ public void onWebSocketText(String message) {
                 // We should have received an ack
                 MessageId msgId = MessageId.fromByteArrayWithTopic(Base64.getDecoder().decode(command.messageId),
                         topic.toString());
-                consumer.acknowledgeAsync(msgId).thenAccept(consumer -> numMsgsAcked.increment());
-                if (!this.pullMode) {
-                    int pending = pendingMessages.getAndDecrement();
-                    if (pending >= maxPendingMessages) {
-                        // Resume delivery
-                        receiveMessage();
+                if ("cumulative".equals(command.ackType)) {
+                    ConsumerStats cStats = consumer.getStats();
+                    long ackDiff = cStats.getTotalMsgsReceived() - cStats.getTotalAcksSent();
 
 Review comment:
   These are "statistics" and are reset to 0 periodically (the default interval is 60 seconds). So I don't think these values should be used in this case.
   https://github.com/apache/pulsar/blob/988e03e07da4e31ff5b40ddc1647c7d64dec79f4/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorderImpl.java#L114-L159
