Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/03/23 09:10:44 UTC

[GitHub] [pulsar] massakam opened a new pull request #6592: [Issue 6437][broker] Prevent marker messages from accumulating in backlog of replicated subscription

massakam opened a new pull request #6592: [Issue 6437][broker] Prevent marker messages from accumulating in backlog of replicated subscription
URL: https://github.com/apache/pulsar/pull/6592
 
 
   Fixes #6437
   
   ### Motivation
   
   In a replicated subscription with no consumers connected, the number of marker messages in the backlog will continue to increase. If at least one consumer is connected, the marker messages will be acknowledged and deleted by the dispatcher.
   https://github.com/apache/pulsar/blob/5fc4a90ba8a7d87c75d8425a0e67ac1d3e5c1651/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java#L80-L92
   However, if no consumers are connected, the dispatcher either does not exist or has stopped reading entries. As a result, the marker messages accumulate in the backlog without being acknowledged by anyone.
   
   
   ### Modifications
   
   There are four types of marker messages:
   
   - ReplicatedSubscriptionsSnapshotRequest
   - ReplicatedSubscriptionsSnapshotResponse
   - ReplicatedSubscriptionsSnapshot
   - ReplicatedSubscriptionsUpdate
   
   Of these, all except `ReplicatedSubscriptionsSnapshot` are unused in the local cluster; they are published to the local topic only so that they can be sent to remote clusters. This PR therefore modifies the `ReplicatedSubscriptionsController` class to acknowledge these marker messages on all subscriptions immediately after publishing them to the local topic.
   
   In contrast, `ReplicatedSubscriptionsSnapshot` is used only in the local cluster and does not need to be sent to remote clusters, so this PR stops publishing `ReplicatedSubscriptionsSnapshot` to the topic.
   
   In addition, marker messages sent from remote clusters are now acknowledged by the replicator on all subscriptions.
   
   With these changes, marker messages no longer continue to accumulate in the replicated subscription backlog.
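
The acknowledge-after-publish idea described above can be illustrated with a minimal, self-contained sketch. It is not the code in this PR: `MarkerAckSketch`, `Topic`, `Subscription`, and `publishMarker` are hypothetical names invented for illustration, and the model ignores cursors, ledgers, and asynchronous publishing entirely. It only shows why a marker that is acknowledged on every subscription right after it is written cannot pile up when no consumer is attached.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model. None of these types are Pulsar's real classes.
final class MarkerAckSketch {

    /** A subscription that tracks the positions still waiting in its backlog. */
    static final class Subscription {
        private final List<Long> backlog = new ArrayList<>();

        void enqueue(long position) {
            backlog.add(position);
        }

        void acknowledge(long position) {
            // Remove by value (boxed), not by list index.
            backlog.remove(Long.valueOf(position));
        }

        int backlogSize() {
            return backlog.size();
        }
    }

    /** A topic that appends entries and adds each one to every subscription's backlog. */
    static final class Topic {
        private final Map<String, Subscription> subscriptions = new LinkedHashMap<>();
        private long nextPosition = 0;

        Subscription subscribe(String name) {
            return subscriptions.computeIfAbsent(name, n -> new Subscription());
        }

        /** Publishes an ordinary message; it stays in the backlog until a consumer acks it. */
        long publish() {
            long position = nextPosition++;
            subscriptions.values().forEach(s -> s.enqueue(position));
            return position;
        }

        /** Publishes a marker and immediately acknowledges it on all subscriptions. */
        long publishMarker() {
            long position = publish();
            subscriptions.values().forEach(s -> s.acknowledge(position));
            return position;
        }
    }

    public static void main(String[] args) {
        Topic topic = new Topic();
        Subscription sub = topic.subscribe("replicated-sub"); // no consumer is attached

        topic.publish();       // ordinary message: remains in the backlog
        topic.publishMarker(); // marker: acknowledged right after publishing
        topic.publishMarker();

        System.out.println("backlog = " + sub.backlogSize()); // prints "backlog = 1"
    }
}
```

In this toy model the backlog grows only with ordinary messages, which is the behaviour the PR aims for on subscriptions without consumers.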

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [pulsar] massakam commented on a change in pull request #6592: [Issue 6437][broker] Prevent marker messages from accumulating in backlog of replicated subscription

Posted by GitBox <gi...@apache.org>.
massakam commented on a change in pull request #6592: [Issue 6437][broker] Prevent marker messages from accumulating in backlog of replicated subscription
URL: https://github.com/apache/pulsar/pull/6592#discussion_r399173100
 
 

 ##########
 File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilder.java
 ##########
 @@ -122,9 +123,18 @@ synchronized void receivedSnapshotResponse(Position position, ReplicatedSubscrip
         }
         // Snapshot is now complete, store it in the local topic
         PositionImpl p = (PositionImpl) position;
-        controller.writeMarker(
-                Markers.newReplicatedSubscriptionsSnapshot(snapshotId, controller.localCluster(),
-                        p.getLedgerId(), p.getEntryId(), responses));
+        try {
+            ReplicatedSubscriptionsSnapshot snapshot = Markers.instantiateReplicatedSubscriptionsSnapshot(snapshotId,
+                    controller.localCluster(), p.getLedgerId(), p.getEntryId(), responses);
+            controller.topic().getSubscriptions().forEach((subName, sub) -> {
+                if (sub != null) {
+                    sub.processReplicatedSubscriptionSnapshot(snapshot);
+                }
+            });
+        } catch (Throwable t) {
+            log.warn("[{}] Failed to process replicated subscription snapshot {} -- {}", controller.topic().getName(),
+                    snapshotId, t.getMessage());
+        }
 
 Review comment:
   An instance of `ReplicatedSubscriptionSnapshotCache` should hold up to 10 snapshots. 
   https://github.com/apache/pulsar/blob/7be1ee1fdb59421ac858b38840d3baf8c9073a5c/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionSnapshotCache.java#L45-L60
   If we publish `ReplicatedSubscriptionsSnapshot` messages to a local topic and acknowledge only the messages whose snapshots have been deleted from the cache, would that avoid the issue you are worried about?
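
A rough sketch of the alternative proposed in this comment, under stated assumptions: it does not mirror the real `ReplicatedSubscriptionSnapshotCache`. `SnapshotCacheSketch`, `BoundedSnapshotCache`, and the `onEvict` callback are hypothetical names, and snapshots are reduced to the position of their marker message. The point is only that a bounded cache can trigger an acknowledgement when it drops its oldest entry, so markers still held in the cache stay unacknowledged in the topic.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.LongConsumer;

// Hypothetical sketch. This is not Pulsar's ReplicatedSubscriptionSnapshotCache.
final class SnapshotCacheSketch {

    /** Keeps at most maxSnapshots marker positions and reports each evicted one. */
    static final class BoundedSnapshotCache {
        private final int maxSnapshots;
        private final LongConsumer onEvict; // e.g. "acknowledge this marker position"
        private final Deque<Long> positions = new ArrayDeque<>();

        BoundedSnapshotCache(int maxSnapshots, LongConsumer onEvict) {
            if (maxSnapshots <= 0) {
                throw new IllegalArgumentException("maxSnapshots must be positive");
            }
            this.maxSnapshots = maxSnapshots;
            this.onEvict = onEvict;
        }

        /** Adds a snapshot position; evicts and reports the oldest ones if over capacity. */
        void add(long markerPosition) {
            positions.addLast(markerPosition);
            while (positions.size() > maxSnapshots) {
                onEvict.accept(positions.removeFirst());
            }
        }

        int size() {
            return positions.size();
        }
    }

    public static void main(String[] args) {
        List<Long> acknowledged = new ArrayList<>();
        // Capacity of 10 follows the figure mentioned in the comment above.
        BoundedSnapshotCache cache = new BoundedSnapshotCache(10, acknowledged::add);

        for (long position = 0; position < 12; position++) {
            cache.add(position);
        }

        // Only the two oldest markers were evicted, so only they were acknowledged.
        System.out.println("cached = " + cache.size() + ", acknowledged = " + acknowledged);
    }
}
```

With this shape, the backlog contribution of snapshot markers would be bounded by the cache capacity rather than growing without limit, while the retained markers remain readable from the topic.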


[GitHub] [pulsar] codelipenghui commented on a change in pull request #6592: [Issue 6437][broker] Prevent marker messages from accumulating in backlog of replicated subscription

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #6592: [Issue 6437][broker] Prevent marker messages from accumulating in backlog of replicated subscription
URL: https://github.com/apache/pulsar/pull/6592#discussion_r399098767
 
 

 ##########
 File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilder.java
 ##########
 @@ -122,9 +123,18 @@ synchronized void receivedSnapshotResponse(Position position, ReplicatedSubscrip
         }
         // Snapshot is now complete, store it in the local topic
         PositionImpl p = (PositionImpl) position;
-        controller.writeMarker(
-                Markers.newReplicatedSubscriptionsSnapshot(snapshotId, controller.localCluster(),
-                        p.getLedgerId(), p.getEntryId(), responses));
+        try {
+            ReplicatedSubscriptionsSnapshot snapshot = Markers.instantiateReplicatedSubscriptionsSnapshot(snapshotId,
+                    controller.localCluster(), p.getLedgerId(), p.getEntryId(), responses);
+            controller.topic().getSubscriptions().forEach((subName, sub) -> {
+                if (sub != null) {
+                    sub.processReplicatedSubscriptionSnapshot(snapshot);
+                }
+            });
+        } catch (Throwable t) {
+            log.warn("[{}] Failed to process replicated subscription snapshot {} -- {}", controller.topic().getName(),
+                    snapshotId, t.getMessage());
+        }
 
 Review comment:
   I don't quite understand this part. If we don't store snapshots in the topic, how can we recover them after the broker restarts?
   


[GitHub] [pulsar] massakam commented on a change in pull request #6592: [Issue 6437][broker] Prevent marker messages from accumulating in backlog of replicated subscription

Posted by GitBox <gi...@apache.org>.
massakam commented on a change in pull request #6592: [Issue 6437][broker] Prevent marker messages from accumulating in backlog of replicated subscription
URL: https://github.com/apache/pulsar/pull/6592#discussion_r399155081
 
 

 ##########
 File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilder.java
 ##########
 @@ -122,9 +123,18 @@ synchronized void receivedSnapshotResponse(Position position, ReplicatedSubscrip
         }
         // Snapshot is now complete, store it in the local topic
         PositionImpl p = (PositionImpl) position;
-        controller.writeMarker(
-                Markers.newReplicatedSubscriptionsSnapshot(snapshotId, controller.localCluster(),
-                        p.getLedgerId(), p.getEntryId(), responses));
+        try {
+            ReplicatedSubscriptionsSnapshot snapshot = Markers.instantiateReplicatedSubscriptionsSnapshot(snapshotId,
+                    controller.localCluster(), p.getLedgerId(), p.getEntryId(), responses);
+            controller.topic().getSubscriptions().forEach((subName, sub) -> {
+                if (sub != null) {
+                    sub.processReplicatedSubscriptionSnapshot(snapshot);
+                }
+            });
+        } catch (Throwable t) {
+            log.warn("[{}] Failed to process replicated subscription snapshot {} -- {}", controller.topic().getName(),
+                    snapshotId, t.getMessage());
+        }
 
 Review comment:
   I don't think snapshots need to be persisted, because they are constantly being replaced by new ones. Is that wrong? What issues do you think would arise if we don't persist `ReplicatedSubscriptionsSnapshot` messages?


[GitHub] [pulsar] sijie commented on issue #6592: [Issue 6437][broker] Prevent marker messages from accumulating in backlog of replicated subscription

Posted by GitBox <gi...@apache.org>.
sijie commented on issue #6592: [Issue 6437][broker] Prevent marker messages from accumulating in backlog of replicated subscription
URL: https://github.com/apache/pulsar/pull/6592#issuecomment-604234812
 
 
   @jiazhai @codelipenghui @merlimat can you review this pull request?
