You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2019/06/20 10:18:07 UTC

[pulsar] branch master updated: Update message backlog figures in the database when clearing/deleting subscriptions. Keeping the UI up to date pending the next collector run. (#4559)

This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e643c5e  Update message backlog figures in the database when clearing/deleting subscriptions. Keeping the UI up to date pending the next collector run. (#4559)
e643c5e is described below

commit e643c5e4968484b06f64e216cb110a733b1aa5db
Author: rshermanTHG <48...@users.noreply.github.com>
AuthorDate: Thu Jun 20 11:18:02 2019 +0100

    Update message backlog figures in the database when clearing/deleting subscriptions. Keeping the UI up to date pending the next collector run. (#4559)
    
    Motivation
    When a subscription's backlog is cleared or the subscription is deleted, the UI doesn't reflect the change in the backlog until the next run of the collector. This change updates the database with the new backlog count at the subscription and topic level.
    
    Modifications
    If deleting a subscription returns a 204 response, reduce the topic's message backlog count by the amount on the subscription.
    If clearing a subscription returns a 204 response, reduce the topic's message backlog count by the amount on the subscription and reduce the subscription's backlog count to 0.
---
 dashboard/django/stats/models.py |  2 +-
 dashboard/django/stats/views.py  | 32 +++++++++++++++++++++++++-------
 2 files changed, 26 insertions(+), 8 deletions(-)

diff --git a/dashboard/django/stats/models.py b/dashboard/django/stats/models.py
index 8daaa5b..86918f7 100644
--- a/dashboard/django/stats/models.py
+++ b/dashboard/django/stats/models.py
@@ -169,7 +169,7 @@ class Subscription(Model):
     unackedMessages  = BigIntegerField(default=0)
 
     class Meta:
-        index_together = ('name', 'topic', 'timestamp', 'deleted')
+        unique_together = ('name', 'topic', 'timestamp')
 
     def __str__(self):
         return self.name
diff --git a/dashboard/django/stats/views.py b/dashboard/django/stats/views.py
index a371aa0a..338d08a 100644
--- a/dashboard/django/stats/views.py
+++ b/dashboard/django/stats/views.py
@@ -143,7 +143,7 @@ def deleteNamespace(request, namespace_name):
 
 def topic(request, topic_name):
     timestamp = get_timestamp()
-    topic_name = 'persistent://' + topic_name.split('persistent/', 1)[1]
+    topic_name = extract_topic_db_name(topic_name)
     cluster_name = request.GET.get('cluster')
     clusters = []
 
@@ -309,7 +309,16 @@ def clusters(request):
 
 def clearSubscription(request, topic_name, subscription_name):
     url = settings.SERVICE_URL + '/admin/v2/' + topic_name + '/subscription/' + subscription_name + '/skip_all'
-    requests.post(url)
+    response = requests.post(url)
+    if response.status_code == 204:
+        ts = get_timestamp()
+        topic_db_name = extract_topic_db_name(topic_name)
+        topic = Topic.objects.get(name=topic_db_name, timestamp=ts)
+        subscription = Subscription.objects.get(name=subscription_name, topic=topic, timestamp=ts)
+        topic.backlog = topic.backlog - subscription.msgBacklog
+        topic.save(update_fields=['backlog'])
+        subscription.msgBacklog = 0
+        subscription.save(update_fields=['msgBacklog'])
     return redirect('topic', topic_name=topic_name)
 
 def deleteSubscription(request, topic_name, subscription_name):
@@ -318,9 +327,11 @@ def deleteSubscription(request, topic_name, subscription_name):
     status = response.status_code
     if status == 204:
         ts = get_timestamp()
-        topic_db_name = 'persistent://' + topic_name.split('persistent/', 1)[1]
-        topic = Topic.objects.filter(name=topic_db_name, timestamp=ts)[0]
-        Subscription.objects.filter(name=subscription_name, topic=topic, timestamp=ts).update(deleted=True)
+        topic_db_name = extract_topic_db_name(topic_name)
+        topic = Topic.objects.get(name=topic_db_name, timestamp=ts)
+        deleted_subscription = Subscription.objects.get(name=subscription_name, topic=topic, timestamp=ts)
+        deleted_subscription.deleted = True
+        deleted_subscription.save(update_fields=['deleted'])
         subscriptions = Subscription.objects.filter(topic=topic, deleted=False, timestamp=ts)
         if not subscriptions:
             topic.deleted=True
@@ -328,10 +339,13 @@ def deleteSubscription(request, topic_name, subscription_name):
             m = re.search(r"persistent/(?P<namespace>.*)/.*", topic_name)
             namespace_name = m.group("namespace")
             return redirect('namespace', namespace_name=namespace_name)
+        else:
+            topic.backlog = topic.backlog - deleted_subscription.msgBacklog
+            topic.save(update_fields=['backlog'])
     return redirect('topic', topic_name=topic_name)
 
 def messages(request, topic_name, subscription_name):
-    topic_name = 'persistent://' + topic_name.split('persistent/', 1)[1]
+    topic_name = extract_topic_db_name(topic_name)
     timestamp = get_timestamp()
     cluster_name = request.GET.get('cluster')
 
@@ -356,4 +370,8 @@ def peek(request, topic_name, subscription_name, message_number):
     context = {
         'message_body' : json.dumps(json.loads(message), indent=4),
     }
-    return render(request, 'stats/peek.html', context)
\ No newline at end of file
+    return render(request, 'stats/peek.html', context)
+
+
+def extract_topic_db_name(topic_name):
+    return 'persistent://' + topic_name.split('persistent/', 1)[1]
\ No newline at end of file