Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/11/04 14:32:01 UTC

[GitHub] [pulsar] codelipenghui commented on a change in pull request #11216: [Websocket] Support cumulative acknowledge for Pulsar Websocket consumer.

codelipenghui commented on a change in pull request #11216:
URL: https://github.com/apache/pulsar/pull/11216#discussion_r742887330



##########
File path: pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
##########
@@ -276,8 +284,62 @@ private void handleAck(ConsumerCommand command) throws IOException {
         // We should have received an ack
         MessageId msgId = MessageId.fromByteArrayWithTopic(Base64.getDecoder().decode(command.messageId),
                 topic.toString());
-        consumer.acknowledgeAsync(msgId).thenAccept(consumer -> numMsgsAcked.increment());
-        checkResumeReceive();
+        if ("cumulative".equals(command.ackType)) {
+            if (allowCumulativeAck) {
+                // If not pull mode, then we need to calculate how many messages have been acked in a cumulative ack request
+                // to properly calculate how many more messages we should dispatch to consumer. Use precise backlog size before
+                // and after the cumulative ack for the calculation.
+                // Use a writelock to prevent individual ack during the process which might cause incorrect result.
+                if (!this.pullMode) {
+                    try {
+                        pendingMessagesLock.writeLock().lock();
+                        MessageIdImpl messageId = (MessageIdImpl) msgId;
+                        long messagesToBeAcked = service.getAdminClient().topics().getNumberOfUnackedMessages(topic.toString(), subscription, messageId.getLedgerId(), messageId.getEntryId());
+                        consumer.acknowledgeCumulative(msgId);
+                        int ack = (int) messagesToBeAcked > pendingMessages.get() ? pendingMessages.get() : (int) messagesToBeAcked;
+                        int pending = pendingMessages.getAndAdd(-ack);
+                        if (pending >= maxPendingMessages) {
+                            // Resume delivery
+                            receiveMessage();
+                        }
+                    } catch (PulsarAdminException e) {
+                        log.warn("[{}] Fail to handle websocket consumer cumulative ack request: {}", consumer.getTopic(), e.getMessage());
+                    } finally {
+                        pendingMessagesLock.writeLock().unlock();
+                    }
+                } else {
+                    // for pull mode no need to keep track of how many messages to dispatch, so simply do cumulative ack.
+                    consumer.acknowledgeCumulativeAsync(msgId).exceptionally(exception -> {
+                        log.warn("[{}] Fail to handle websocket consumer cumulative ack request: {}", consumer.getTopic(), exception.getMessage());

Review comment:
       Cumulative ack is only available for Exclusive or Failover subscriptions. If users call cumulative ack on a Shared subscription, we should return an error.
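
       A minimal sketch of such a check, under stated assumptions: `SubType` is a local stand-in for the client's subscription type enum, and `AckTypeCheck` / `validate` are hypothetical names that are not part of this PR. How the error is reported back to the websocket client is left out.

```java
import java.util.Optional;

// Hypothetical helper, not part of the PR: rejects a cumulative ack
// on subscription types that do not support it.
final class AckTypeCheck {
    // Local stand-in for the subscription types (assumption for this sketch).
    enum SubType { Exclusive, Shared, Failover, Key_Shared }

    // Returns an error message if the ack type is not allowed, otherwise empty.
    static Optional<String> validate(String ackType, SubType subType) {
        boolean cumulative = "cumulative".equals(ackType);
        boolean supportsCumulative =
                subType == SubType.Exclusive || subType == SubType.Failover;
        if (cumulative && !supportsCumulative) {
            return Optional.of(
                    "Cumulative ack is not supported on " + subType + " subscriptions");
        }
        return Optional.empty();
    }
}
```

       The handler could call `validate` before taking any lock and send the returned message back to the client instead of only logging a warning.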

##########
File path: pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
##########
@@ -276,8 +284,62 @@ private void handleAck(ConsumerCommand command) throws IOException {
         // We should have received an ack
         MessageId msgId = MessageId.fromByteArrayWithTopic(Base64.getDecoder().decode(command.messageId),
                 topic.toString());
-        consumer.acknowledgeAsync(msgId).thenAccept(consumer -> numMsgsAcked.increment());
-        checkResumeReceive();
+        if ("cumulative".equals(command.ackType)) {
+            if (allowCumulativeAck) {
+                // If not pull mode, then we need to calculate how many messages have been acked in a cumulative ack request
+                // to properly calculate how many more messages we should dispatch to consumer. Use precise backlog size before
+                // and after the cumulative ack for the calculation.
+                // Use a writelock to prevent individual ack during the process which might cause incorrect result.
+                if (!this.pullMode) {
+                    try {
+                        pendingMessagesLock.writeLock().lock();
+                        MessageIdImpl messageId = (MessageIdImpl) msgId;
+                        long messagesToBeAcked = service.getAdminClient().topics().getNumberOfUnackedMessages(topic.toString(), subscription, messageId.getLedgerId(), messageId.getEntryId());
+                        consumer.acknowledgeCumulative(msgId);
+                        int ack = (int) messagesToBeAcked > pendingMessages.get() ? pendingMessages.get() : (int) messagesToBeAcked;
+                        int pending = pendingMessages.getAndAdd(-ack);
+                        if (pending >= maxPendingMessages) {
+                            // Resume delivery
+                            receiveMessage();
+                        }
+                    } catch (PulsarAdminException e) {
+                        log.warn("[{}] Fail to handle websocket consumer cumulative ack request: {}", consumer.getTopic(), e.getMessage());
+                    } finally {
+                        pendingMessagesLock.writeLock().unlock();
+                    }
+                } else {
+                    // for pull mode no need to keep track of how many messages to dispatch, so simply do cumulative ack.
+                    consumer.acknowledgeCumulativeAsync(msgId).exceptionally(exception -> {
+                        log.warn("[{}] Fail to handle websocket consumer cumulative ack request: {}", consumer.getTopic(), exception.getMessage());
+                        return null;
+                    });
+                }
+            } else {
+                log.warn("[{}] Websocket consumer cumulative ack request not enabled", consumer.getTopic());
+            }
+        } else {
+            if (allowCumulativeAck) {
+                // multiple individual acks can happen at the same time, but individual ack and cumulative ack can't
+                // happen at the same time, as it'll interfere calculation of pending message, hence use a readlock.
+                // if not pull mode, produce request can also change backlog msg count so can't happen at the same
+                // time as cumulative ack.

Review comment:
       For Failover/Exclusive subscriptions we only support cumulative ack, and for Shared/Key_Shared subscriptions we only support individual ack, so can we just check the subscription type here?

##########
File path: pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
##########
@@ -276,8 +284,62 @@ private void handleAck(ConsumerCommand command) throws IOException {
         // We should have received an ack
         MessageId msgId = MessageId.fromByteArrayWithTopic(Base64.getDecoder().decode(command.messageId),
                 topic.toString());
-        consumer.acknowledgeAsync(msgId).thenAccept(consumer -> numMsgsAcked.increment());
-        checkResumeReceive();
+        if ("cumulative".equals(command.ackType)) {
+            if (allowCumulativeAck) {
+                // If not pull mode, then we need to calculate how many messages have been acked in a cumulative ack request
+                // to properly calculate how many more messages we should dispatch to consumer. Use precise backlog size before
+                // and after the cumulative ack for the calculation.
+                // Use a writelock to prevent individual ack during the process which might cause incorrect result.
+                if (!this.pullMode) {
+                    try {
+                        pendingMessagesLock.writeLock().lock();
+                        MessageIdImpl messageId = (MessageIdImpl) msgId;
+                        long messagesToBeAcked = service.getAdminClient().topics().getNumberOfUnackedMessages(topic.toString(), subscription, messageId.getLedgerId(), messageId.getEntryId());

Review comment:
       Do we need to check whether there is a backlog? If there is no backlog, it is safe to call `receiveMessage()`; once new messages become available, we can deliver them to the websocket client directly.
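
       For reference, the pending-message accounting in the hunk above can be sketched in isolation. This is an illustration only: `PendingAckAccounting` and its method names are hypothetical, and the caller is assumed to hold the write lock, as the PR code does. It compares as `long` before narrowing, which avoids the early `(int)` cast in the hunk.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified model of the pending-message bookkeeping.
// Assumes the caller serializes cumulative acks (e.g. under a write lock).
final class PendingAckAccounting {
    private final AtomicInteger pendingMessages;
    private final int maxPendingMessages;

    PendingAckAccounting(int initialPending, int maxPendingMessages) {
        this.pendingMessages = new AtomicInteger(initialPending);
        this.maxPendingMessages = maxPendingMessages;
    }

    // Applies a cumulative ack covering `messagesToBeAcked` messages.
    // Returns true if delivery was paused before the ack and should resume.
    boolean onCumulativeAck(long messagesToBeAcked) {
        int current = pendingMessages.get();
        // Clamp to [0, current] so the counter never goes negative.
        int ack = (int) Math.max(0L, Math.min(messagesToBeAcked, (long) current));
        int before = pendingMessages.getAndAdd(-ack);
        return before >= maxPendingMessages;
    }

    int pending() {
        return pendingMessages.get();
    }
}
```

       A backlog check like the one suggested here would be an extra condition next to the `before >= maxPendingMessages` test; it is not modeled in this sketch.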

##########
File path: pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
##########
@@ -96,6 +104,7 @@ public ConsumerHandler(WebSocketService service, HttpServletRequest request, Ser
         this.numBytesDelivered = new LongAdder();
         this.numMsgsAcked = new LongAdder();
         this.pullMode = Boolean.valueOf(queryParams.get("pullMode"));
+        this.allowCumulativeAck = Boolean.valueOf(queryParams.get("allowCumulativeAck"));

Review comment:
       I don't think we need to introduce this config. It depends on the subscription type; it is not reasonable for users to use a Failover subscription with `allowCumulativeAck=false`.
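
       One way to drop the flag, sketched under assumptions: the handler already receives the subscription type as a `subscriptionType` query parameter, and a missing value means Exclusive. `CumulativeAckSupport` and `fromQueryParams` are hypothetical names, not existing code.

```java
import java.util.Map;

// Hypothetical helper: derives cumulative-ack support from the subscription
// type instead of a separate allowCumulativeAck query parameter.
final class CumulativeAckSupport {
    static boolean fromQueryParams(Map<String, String> queryParams) {
        // Assumption: parameter name and its Exclusive default.
        String type = queryParams.getOrDefault("subscriptionType", "Exclusive");
        return "Exclusive".equalsIgnoreCase(type) || "Failover".equalsIgnoreCase(type);
    }
}
```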



