Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/06/18 10:13:01 UTC

[GitHub] [pulsar] massakam opened a new pull request #7299: [Issue 6437][broker] Prevent marker messages from accumulating in backlog of replicated subscription

massakam opened a new pull request #7299:
URL: https://github.com/apache/pulsar/pull/7299


   This is a further modification of the previously closed PR #6592.
   
   Fixes #6437
   
   ### Motivation
   
   In a replicated subscription with no consumers connected, the number of marker messages in the backlog will continue to increase. If at least one active consumer is connected, the marker messages will be acknowledged and deleted by the dispatcher.
   https://github.com/apache/pulsar/blob/2d2c63e4d7fc4ce90068edf15f39de82ef738e33/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java#L88-L100
   On the other hand, if no consumers are connected, or if they are connected but cannot receive any messages, the dispatcher does not exist or has stopped reading entries. As a result, the marker messages accumulate in the backlog without being acknowledged by anyone.
   
   
   ### Modifications
   
   There are four types of marker messages:
   
   - ReplicatedSubscriptionsSnapshotRequest
   - ReplicatedSubscriptionsSnapshotResponse
   - ReplicatedSubscriptionsSnapshot
   - ReplicatedSubscriptionsUpdate
   
   Of these, all except `ReplicatedSubscriptionsSnapshot` are not used in the local cluster; they are published to local topics only so that they can be sent to remote clusters. This PR therefore modifies the `ReplicatedSubscriptionsController` class to acknowledge these three marker types on all subscriptions immediately after they are published to a local topic.
   
   On the other hand, `ReplicatedSubscriptionsSnapshot` is only used by dispatchers in the local cluster and does not need to be sent to remote clusters. So, `ReplicatedSubscriptionsController` acknowledges those messages on behalf of the dispatcher only if the dispatcher has not been initialized or if there are no available consumers.
   
   In addition, marker messages sent from remote clusters are now acknowledged by the replicator on all subscriptions.
   
   With these changes, marker messages no longer continue to accumulate in the replicated subscription backlog.
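
The acknowledgment rule described above can be sketched as a small decision function. This is only an illustration and not the code in this PR: `MarkerAckPolicySketch`, `MarkerType`, and `shouldControllerAck` are hypothetical names, and the two boolean parameters are assumed stand-ins for `dispatcher != null` and `dispatcher.isAtleastOneConsumerAvailable()`.

```java
import java.util.EnumSet;
import java.util.Set;

public class MarkerAckPolicySketch {

    /** The four marker types listed in the PR description (enum names are hypothetical). */
    public enum MarkerType {
        SNAPSHOT_REQUEST,
        SNAPSHOT_RESPONSE,
        SNAPSHOT,
        UPDATE
    }

    /** Markers that are published locally only so that they can be sent to remote clusters. */
    private static final Set<MarkerType> REMOTE_ONLY =
            EnumSet.of(MarkerType.SNAPSHOT_REQUEST, MarkerType.SNAPSHOT_RESPONSE, MarkerType.UPDATE);

    /**
     * Hypothetical helper: decides whether the controller should acknowledge a
     * locally published marker on a given subscription.
     */
    public static boolean shouldControllerAck(MarkerType type,
                                              boolean dispatcherExists,
                                              boolean consumerAvailable) {
        if (REMOTE_ONLY.contains(type)) {
            // Not used in the local cluster, so acknowledge right after publishing.
            return true;
        }
        // SNAPSHOT: leave it to the dispatcher unless nobody is reading entries.
        return !dispatcherExists || !consumerAvailable;
    }

    public static void main(String[] args) {
        for (MarkerType type : MarkerType.values()) {
            System.out.println(type + " with an active consumer -> ack by controller: "
                    + shouldControllerAck(type, true, true));
        }
    }
}
```

In this sketch only the snapshot marker depends on the dispatcher state; the other three types are always acknowledged by the controller.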


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on pull request #7299: [Issue 6437][broker] Prevent marker messages from accumulating in backlog of replicated subscription

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on pull request #7299:
URL: https://github.com/apache/pulsar/pull/7299#issuecomment-728639310


   Moving this to milestone 2.8.0 for now.





[GitHub] [pulsar] codelipenghui commented on a change in pull request #7299: [Issue 6437][broker] Prevent marker messages from accumulating in backlog of replicated subscription

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #7299:
URL: https://github.com/apache/pulsar/pull/7299#discussion_r451542303



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
##########
@@ -70,6 +74,56 @@
     public ReplicatedSubscriptionsController(PersistentTopic topic, String localCluster) {
         this.topic = topic;
         this.localCluster = localCluster;
+
+        this.publishContext = new Topic.PublishContext() {
+            @Override
+            public void completed(Exception e, long ledgerId, long entryId) {
+                // Nothing to do in case of publish errors since the retry logic is applied upstream
+                // after a snapshot is not closed
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Published marker at {}:{}. Exception: {}", topic.getName(), ledgerId, entryId, e);
+                }
+
+                if (e == null) {
+                    // Acknowledge the marker message to prevent it from accumulating in the backlog
+                    Position position = new PositionImpl(ledgerId, entryId);
+                    topic.getSubscriptions().forEach((subName, sub) -> {
+                        if (sub != null) {
+                            sub.acknowledgeMessage(Collections.singletonList(position), AckType.Individual,
+                                    Collections.emptyMap());
+                        }
+                    });
+                }
+            }
+        };
+
+        this.publishContextForSnapshot = new Topic.PublishContext() {
+            @Override
+            public void completed(Exception e, long ledgerId, long entryId) {
+                // Nothing to do in case of publish errors since the retry logic is applied upstream
+                // after a snapshot is not closed
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Published snapshot at {}:{}. Exception: {}", topic.getName(), ledgerId, entryId, e);
+                }
+
+                if (e == null) {
+                    // If no consumers are connected, or if they are connected but cannot receive any messages
+                    // (i.e. the dispatcher is not reading new entries), acknowledge the snapshot message
+                    // to prevent it from accumulating in the backlog
+                    Position position = new PositionImpl(ledgerId, entryId);
+                    topic.getSubscriptions().forEach((subName, sub) -> {
+                        if (sub != null) {
+                            Dispatcher dispatcher = sub.getDispatcher();
+                            if (dispatcher == null || !dispatcher.isAtleastOneConsumerAvailable()) {
+                                sub.acknowledgeMessage(Collections.singletonList(position), AckType.Individual,
+                                        Collections.emptyMap());
+                            }

Review comment:
       > Is there any other solution to this issue?
   
   I don't have a better approach either. This looks okay; it is just an extreme case.
   
   > If there are no subscriptions on the topic, we don't need to ack since the messages will not accumulate in the backlog, do we?
   
   Makes sense.







[GitHub] [pulsar] codelipenghui commented on pull request #7299: [Issue 6437][broker] Prevent marker messages from accumulating in backlog of replicated subscription

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on pull request #7299:
URL: https://github.com/apache/pulsar/pull/7299#issuecomment-721597421


   @massakam Could you please rebase to the apache/master branch? Thanks.
   @merlimat @sijie @jiazhai Could you please also help review this PR? 





[GitHub] [pulsar] massakam commented on a change in pull request #7299: [Issue 6437][broker] Prevent marker messages from accumulating in backlog of replicated subscription

Posted by GitBox <gi...@apache.org>.
massakam commented on a change in pull request #7299:
URL: https://github.com/apache/pulsar/pull/7299#discussion_r447412479



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
##########

Review comment:
       @codelipenghui 
   > I'm a bit worried here. If a producer keeps publishing messages while no consumers are consuming them, individually acknowledging each snapshot message will create more and more discontinuous acknowledgment ranges. If that is the case, the `individualAcks` in the cursor will keep growing.
   
   I see. Uhm...
   Is there any other solution to this issue?
   
   > Also, if all consumers of the subscription are disconnected, the subscription is removed from the topic as well, so it may not be possible to ack the snapshot message at that point.
   
   If there are no subscriptions on the topic, we don't need to ack since the messages will not accumulate in the backlog, do we?







[GitHub] [pulsar] massakam closed pull request #7299: [Issue 6437][broker] Prevent marker messages from accumulating in backlog of replicated subscription

Posted by GitBox <gi...@apache.org>.
massakam closed pull request #7299:
URL: https://github.com/apache/pulsar/pull/7299


   





[GitHub] [pulsar] codelipenghui commented on a change in pull request #7299: [Issue 6437][broker] Prevent marker messages from accumulating in backlog of replicated subscription

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #7299:
URL: https://github.com/apache/pulsar/pull/7299#discussion_r442238997



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
##########

Review comment:
       I'm a bit worried here. If a producer keeps publishing messages while no consumers are consuming them, individually acknowledging each snapshot message will create more and more discontinuous acknowledgment ranges. If that is the case, the `individualAcks` in the cursor will keep growing.
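
The concern can be illustrated with a toy model. This is a minimal sketch under stated assumptions, not Pulsar's cursor implementation: `IndividualAckRangesSketch` and its methods are hypothetical, entries are identified by a single `long`, and every `markerEvery`-th entry is assumed to be a snapshot marker that is acked individually while all other entries stay unacknowledged.

```java
import java.util.Map;
import java.util.TreeMap;

public class IndividualAckRangesSketch {

    /** Toy range set: maps the start of each acked range to its inclusive end. */
    private final TreeMap<Long, Long> ranges = new TreeMap<>();

    /** Records an individual ack for one entry, merging with adjacent acked ranges. */
    public void ack(long entryId) {
        Map.Entry<Long, Long> floor = ranges.floorEntry(entryId);
        if (floor != null && floor.getValue() >= entryId) {
            return; // already covered by an existing range
        }
        long start = entryId;
        long end = entryId;
        if (floor != null && floor.getValue() == entryId - 1) {
            start = floor.getKey(); // extend the range that ends just before this entry
        }
        Long nextEnd = ranges.remove(entryId + 1);
        if (nextEnd != null) {
            end = nextEnd; // absorb the range that starts just after this entry
        }
        ranges.put(start, end);
    }

    /** Number of disjoint acked ranges currently tracked. */
    public int rangeCount() {
        return ranges.size();
    }

    /**
     * Publishes {@code total} entries with no consumer attached, where every
     * {@code markerEvery}-th entry is a marker that gets acked individually.
     */
    public static int rangesAfter(long total, long markerEvery) {
        IndividualAckRangesSketch cursor = new IndividualAckRangesSketch();
        for (long id = 0; id < total; id++) {
            if (id % markerEvery == markerEvery - 1) {
                cursor.ack(id);
            }
        }
        return cursor.rangeCount();
    }

    public static void main(String[] args) {
        System.out.println("ranges after 100 entries:  " + rangesAfter(100, 10));
        System.out.println("ranges after 1000 entries: " + rangesAfter(1000, 10));
    }
}
```

Because the unacknowledged entries between markers prevent the ranges from merging, the number of tracked ranges in this model grows linearly with the number of markers published.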

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java
##########

Review comment:
       Also, if all consumers of the subscription are disconnected, the subscription is removed from the topic as well, so it may not be possible to ack the snapshot message at that point.



