You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/06/27 02:25:03 UTC

[GitHub] [pulsar] codelipenghui commented on a change in pull request #7299: [Issue 6437][broker] Prevent marker messages from accumulating in backlog of replicated subscription

codelipenghui commented on a change in pull request #7299:
URL: https://github.com/apache/pulsar/pull/7299#discussion_r442238997



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
##########
@@ -70,6 +74,56 @@
     public ReplicatedSubscriptionsController(PersistentTopic topic, String localCluster) {
         this.topic = topic;
         this.localCluster = localCluster;
+
+        this.publishContext = new Topic.PublishContext() {
+            @Override
+            public void completed(Exception e, long ledgerId, long entryId) {
+                // Nothing to do in case of publish errors since the retry logic is applied upstream
+                // after a snapshot is not closed
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Published marker at {}:{}. Exception: {}", topic.getName(), ledgerId, entryId, e);
+                }
+
+                if (e == null) {
+                    // Acknowledge the marker message to prevent it from accumulating in the backlog
+                    Position position = new PositionImpl(ledgerId, entryId);
+                    topic.getSubscriptions().forEach((subName, sub) -> {
+                        if (sub != null) {
+                            sub.acknowledgeMessage(Collections.singletonList(position), AckType.Individual,
+                                    Collections.emptyMap());
+                        }
+                    });
+                }
+            }
+        };
+
+        this.publishContextForSnapshot = new Topic.PublishContext() {
+            @Override
+            public void completed(Exception e, long ledgerId, long entryId) {
+                // Nothing to do in case of publish errors since the retry logic is applied upstream
+                // after a snapshot is not closed
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Published snapshot at {}:{}. Exception: {}", topic.getName(), ledgerId, entryId, e);
+                }
+
+                if (e == null) {
+                    // If no consumers are connected, or if they are connected but cannot receive any messages
+                    // (i.e. the dispatcher is not reading new entries), acknowledge the snapshot message
+                    // to prevent it from accumulating in the backlog
+                    Position position = new PositionImpl(ledgerId, entryId);
+                    topic.getSubscriptions().forEach((subName, sub) -> {
+                        if (sub != null) {
+                            Dispatcher dispatcher = sub.getDispatcher();
+                            if (dispatcher == null || !dispatcher.isAtleastOneConsumerAvailable()) {
+                                sub.acknowledgeMessage(Collections.singletonList(position), AckType.Individual,
+                                        Collections.emptyMap());
+                            }

Review comment:
       I'm a bit worried here. If the producer continues to produce messages but does not have consumers to consume messages, the individual ack the snapshot message will introduce more discontinuous acknowledge ranges. If that is true, the `individualAcks` in the cursor will get bigger and bigger.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
##########
@@ -70,6 +74,56 @@
     public ReplicatedSubscriptionsController(PersistentTopic topic, String localCluster) {
         this.topic = topic;
         this.localCluster = localCluster;
+
+        this.publishContext = new Topic.PublishContext() {
+            @Override
+            public void completed(Exception e, long ledgerId, long entryId) {
+                // Nothing to do in case of publish errors since the retry logic is applied upstream
+                // after a snapshot is not closed
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Published marker at {}:{}. Exception: {}", topic.getName(), ledgerId, entryId, e);
+                }
+
+                if (e == null) {
+                    // Acknowledge the marker message to prevent it from accumulating in the backlog
+                    Position position = new PositionImpl(ledgerId, entryId);
+                    topic.getSubscriptions().forEach((subName, sub) -> {
+                        if (sub != null) {
+                            sub.acknowledgeMessage(Collections.singletonList(position), AckType.Individual,
+                                    Collections.emptyMap());
+                        }
+                    });
+                }
+            }
+        };
+
+        this.publishContextForSnapshot = new Topic.PublishContext() {
+            @Override
+            public void completed(Exception e, long ledgerId, long entryId) {
+                // Nothing to do in case of publish errors since the retry logic is applied upstream
+                // after a snapshot is not closed
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Published snapshot at {}:{}. Exception: {}", topic.getName(), ledgerId, entryId, e);
+                }
+
+                if (e == null) {
+                    // If no consumers are connected, or if they are connected but cannot receive any messages
+                    // (i.e. the dispatcher is not reading new entries), acknowledge the snapshot message
+                    // to prevent it from accumulating in the backlog
+                    Position position = new PositionImpl(ledgerId, entryId);
+                    topic.getSubscriptions().forEach((subName, sub) -> {
+                        if (sub != null) {
+                            Dispatcher dispatcher = sub.getDispatcher();
+                            if (dispatcher == null || !dispatcher.isAtleastOneConsumerAvailable()) {
+                                sub.acknowledgeMessage(Collections.singletonList(position), AckType.Individual,
+                                        Collections.emptyMap());
+                            }

Review comment:
       And if all consumers of the subscription are disconnected. The subscription also removes from this topic. So maybe can't ack the snapshot message at this moment.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org