You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/01/05 09:36:14 UTC

[pulsar] branch master updated: Add logic in the dashboard to handle cases where Bundle, Topic, Subscription, Consumer already exists

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 2103792  Add logic in the dashboard to handle cases where Bundle,Topic,Subscription,Consumer already exists
2103792 is described below

commit 21037920be99efdfd24d55d09c38b2b3caf94e14
Author: tuteng <eg...@gmail.com>
AuthorDate: Sat Jan 5 17:36:10 2019 +0800

    Add logic in the dashboard to handle cases where Bundle,Topic,Subscription,Consumer already exists
    
    ### Motivation
    
    Related Issue : #3226
    
    Add logic in the dashboard to handle cases where Bundle,Topic,Subscription,Consumer already exists
    
    ### Modifications
    
    Update Consumer, Bundle,Subscription,Topic if exist
---
 dashboard/django/collector.py | 249 ++++++++++++++++++++++++++++--------------
 1 file changed, 167 insertions(+), 82 deletions(-)

diff --git a/dashboard/django/collector.py b/dashboard/django/collector.py
index c8c3b13..7e43376 100755
--- a/dashboard/django/collector.py
+++ b/dashboard/django/collector.py
@@ -77,10 +77,14 @@ def _fetch_broker_stats(cluster, broker_host_port, timestamp):
 
     clusters = dict( (cluster.name, cluster) for cluster in Cluster.objects.all() )
 
-    db_bundles = []
-    db_topics = []
-    db_subscriptions = []
-    db_consumers = []
+    db_create_bundles = []
+    db_update_bundles = []
+    db_create_topics = []
+    db_update_topics = []
+    db_create_subscriptions = []
+    db_update_subscriptions = []
+    db_create_consumers = []
+    db_update_consumers = []
     db_replication = []
 
     for namespace_name, bundles_stats in topics_stats.items():
@@ -94,73 +98,137 @@ def _fetch_broker_stats(cluster, broker_host_port, timestamp):
         namespace.save()
 
         for bundle_range, topics_stats in bundles_stats.items():
-            bundle = Bundle(
-                            broker    = broker,
-                            namespace = namespace,
-                            range     = bundle_range,
-                            cluster   = cluster,
-                            timestamp = timestamp)
-            db_bundles.append(bundle)
 
+            bundle = Bundle.objects.filter(
+                    cluster_id=cluster.id,
+                    namespace_id=namespace.id,
+                    range=bundle_range)
+            if bundle:
+                temp_bundle = bundle.first()
+                temp_bundle.timestame = timestamp
+                db_update_bundles.append(temp_bundle)
+                bundle = temp_bundle
+            else:
+                bundle = Bundle(
+                                broker    = broker,
+                                namespace = namespace,
+                                range     = bundle_range,
+                                cluster   = cluster,
+                                timestamp = timestamp)
+                db_create_bundles.append(bundle)
+                
             for topic_name, stats in topics_stats['persistent'].items():
-                topic = Topic(
-                    broker                 = broker,
-                    active_broker          = active_broker,
-                    name                   = topic_name,
-                    namespace              = namespace,
-                    bundle                 = bundle,
-                    cluster                = cluster,
-                    timestamp              = timestamp,
-                    averageMsgSize         = stats['averageMsgSize'],
-                    msgRateIn              = stats['msgRateIn'],
-                    msgRateOut             = stats['msgRateOut'],
-                    msgThroughputIn        = stats['msgThroughputIn'],
-                    msgThroughputOut       = stats['msgThroughputOut'],
-                    pendingAddEntriesCount = stats['pendingAddEntriesCount'],
-                    producerCount          = stats['producerCount'],
-                    storageSize            = stats['storageSize']
-                )
-
-                db_topics.append(topic)
+                topic = Topic.objects.filter(
+                    cluster_id=cluster.id,
+                    bundle_id=bundle.id,
+                    namespace_id=namespace.id,
+                    broker_id=broker.id,
+                    name=topic_name)
+                if topic:
+                    temp_topic = topic.first()
+                    temp_topic.timestamp              = timestamp
+                    temp_topic.averageMsgSize         = stats['averageMsgSize']
+                    temp_topic.msgRateIn              = stats['msgRateIn']
+                    temp_topic.msgRateOut             = stats['msgRateOut']
+                    temp_topic.msgThroughputIn        = stats['msgThroughputIn']
+                    temp_topic.msgThroughputOut       = stats['msgThroughputOut']
+                    temp_topic.pendingAddEntriesCount = stats['pendingAddEntriesCount']
+                    temp_topic.producerCount          = stats['producerCount']
+                    temp_topic.storageSize            = stats['storageSize']
+                    db_update_topics.append(temp_topic)
+                    topic = temp_topic
+                else:
+                    topic = Topic(
+                        broker                 = broker,
+                        active_broker          = active_broker,
+                        name                   = topic_name,
+                        namespace              = namespace,
+                        bundle                 = bundle,
+                        cluster                = cluster,
+                        timestamp              = timestamp,
+                        averageMsgSize         = stats['averageMsgSize'],
+                        msgRateIn              = stats['msgRateIn'],
+                        msgRateOut             = stats['msgRateOut'],
+                        msgThroughputIn        = stats['msgThroughputIn'],
+                        msgThroughputOut       = stats['msgThroughputOut'],
+                        pendingAddEntriesCount = stats['pendingAddEntriesCount'],
+                        producerCount          = stats['producerCount'],
+                        storageSize            = stats['storageSize']
+                    )
+                    db_create_topics.append(topic)
                 totalBacklog = 0
                 numSubscriptions = 0
                 numConsumers = 0
 
                 for subscription_name, subStats in stats['subscriptions'].items():
                     numSubscriptions += 1
-                    subscription = Subscription(
-                        topic            = topic,
-                        name             = subscription_name,
-                        namespace        = namespace,
-                        timestamp        = timestamp,
-                        msgBacklog       = subStats['msgBacklog'],
-                        msgRateExpired   = subStats['msgRateExpired'],
-                        msgRateOut       = subStats['msgRateOut'],
-                        msgRateRedeliver = subStats.get('msgRateRedeliver', 0),
-                        msgThroughputOut = subStats['msgThroughputOut'],
-                        subscriptionType = subStats['type'][0],
-                        unackedMessages  = subStats.get('unackedMessages', 0),
-                    )
-                    db_subscriptions.append(subscription)
+                    subscription = Subscription.objects.filter(
+                            topic_id=topic.id,
+                            namespace_id=namespace.id,
+                            name=subscription_name)
+                    if subscription:
+                        temp_subscription = subscription.first()
+                        temp_subscription.timestamp        = timestamp
+                        temp_subscription.msgBacklog       = subStats['msgBacklog']
+                        temp_subscription.msgRateExpired   = subStats['msgRateExpired']
+                        temp_subscription.msgRateOut       = subStats['msgRateOut']
+                        temp_subscription.msgRateRedeliver = subStats.get('msgRateRedeliver', 0)
+                        temp_subscription.msgThroughputOut = subStats['msgThroughputOut']
+                        temp_subscription.subscriptionType = subStats['type'][0]
+                        temp_subscription.unackedMessages  = subStats.get('unackedMessages', 0)
+                        db_update_subscriptions.append(temp_subscription)
+                        subscription = temp_subscription
+                    else:
+                        subscription = Subscription(
+                            topic            = topic,
+                            name             = subscription_name,
+                            namespace        = namespace,
+                            timestamp        = timestamp,
+                            msgBacklog       = subStats['msgBacklog'],
+                            msgRateExpired   = subStats['msgRateExpired'],
+                            msgRateOut       = subStats['msgRateOut'],
+                            msgRateRedeliver = subStats.get('msgRateRedeliver', 0),
+                            msgThroughputOut = subStats['msgThroughputOut'],
+                            subscriptionType = subStats['type'][0],
+                            unackedMessages  = subStats.get('unackedMessages', 0),
+                        )
+                        db_create_subscriptions.append(subscription)
 
                     totalBacklog += subStats['msgBacklog']
 
                     for consStats in subStats['consumers']:
                         numConsumers += 1
-                        consumer = Consumer(
-                            subscription     = subscription,
-                            timestamp        = timestamp,
-                            address          = consStats['address'],
-                            availablePermits = consStats.get('availablePermits', 0),
-                            connectedSince   = parse_date(consStats.get('connectedSince')),
-                            consumerName     = consStats.get('consumerName'),
-                            msgRateOut       = consStats.get('msgRateOut', 0),
-                            msgRateRedeliver = consStats.get('msgRateRedeliver', 0),
-                            msgThroughputOut = consStats.get('msgThroughputOut', 0),
-                            unackedMessages  = consStats.get('unackedMessages', 0),
-                            blockedConsumerOnUnackedMsgs  = consStats.get('blockedConsumerOnUnackedMsgs', False)
-                        )
-                        db_consumers.append(consumer)
+                        consumer = Consumer.objects.filter(
+                            subscription_id=subscription.id,
+                            consumerName=consStats.get('consumerName'))
+                        if consumer:
+                            temp_consumer = consumer.first()
+                            temp_consumer.timestamp        = timestamp
+                            temp_consumer.address          = consStats['address']
+                            temp_consumer.availablePermits = consStats.get('availablePermits', 0)
+                            temp_consumer.connectedSince   = parse_date(consStats.get('connectedSince'))
+                            temp_consumer.msgRateOut       = consStats.get('msgRateOut', 0)
+                            temp_consumer.msgRateRedeliver = consStats.get('msgRateRedeliver', 0)
+                            temp_consumer.msgThroughputOut = consStats.get('msgThroughputOut', 0)
+                            temp_consumer.unackedMessages  = consStats.get('unackedMessages', 0)
+                            temp_consumer.blockedConsumerOnUnackedMsgs  = consStats.get('blockedConsumerOnUnackedMsgs', False)
+                            db_update_consumers.append(temp_consumer)
+                            consumer = temp_consumer
+                        else:
+                            consumer = Consumer(
+                                subscription     = subscription,
+                                timestamp        = timestamp,
+                                address          = consStats['address'],
+                                availablePermits = consStats.get('availablePermits', 0),
+                                connectedSince   = parse_date(consStats.get('connectedSince')),
+                                consumerName     = consStats.get('consumerName'),
+                                msgRateOut       = consStats.get('msgRateOut', 0),
+                                msgRateRedeliver = consStats.get('msgRateRedeliver', 0),
+                                msgThroughputOut = consStats.get('msgThroughputOut', 0),
+                                unackedMessages  = consStats.get('unackedMessages', 0),
+                                blockedConsumerOnUnackedMsgs  = consStats.get('blockedConsumerOnUnackedMsgs', False)
+                            )
+                            db_create_consumers.append(consumer)
 
                 topic.backlog = totalBacklog
                 topic.subscriptionCount = numSubscriptions
@@ -210,46 +278,63 @@ def _fetch_broker_stats(cluster, broker_host_port, timestamp):
                 topic.localThroughputIn = topic.msgThroughputIn - replicationThroughputIn
                 topic.localThroughputOut = topic.msgThroughputIn - replicationThroughputOut
 
-
     if connection.vendor == 'postgresql':
         # Bulk insert into db
-        Bundle.objects.bulk_create(db_bundles, batch_size=10000)
+        Bundle.objects.bulk_create(db_create_bundles, batch_size=10000)
 
         # Trick to refresh primary keys after previous bulk import
-        for topic in db_topics: topic.bundle = topic.bundle
-        Topic.objects.bulk_create(db_topics, batch_size=10000)
+        for topic in db_create_topics: topic.bundle = topic.bundle
+        Topic.objects.bulk_create(db_create_topics, batch_size=10000)
 
-        for subscription in db_subscriptions: subscription.topic = subscription.topic
-        Subscription.objects.bulk_create(db_subscriptions, batch_size=10000)
+        for subscription in db_create_subscriptions: subscription.topic = subscription.topic
+        Subscription.objects.bulk_create(db_create_subscriptions, batch_size=10000)
 
-        for consumer in db_consumers: consumer.subscription = consumer.subscription
-        Consumer.objects.bulk_create(db_consumers, batch_size=10000)
+        for consumer in db_create_consumers: consumer.subscription = consumer.subscription
+        Consumer.objects.bulk_create(db_create_consumers, batch_size=10000)
 
         for replication in db_replication: replication.topic = replication.topic
         Replication.objects.bulk_create(db_replication, batch_size=10000)
 
-    else:
-        # For other DB providers we have to insert one by one
-        # to be able to retrieve the PK of the newly inserted records
-        for bundle in db_bundles:
-            bundle.save()
-
-        for topic in db_topics:
-            topic.bundle = topic.bundle
-            topic.save()
+        update_or_create_object(
+            db_update_bundles,
+            db_update_topics,
+            db_update_consumers,
+            db_update_subscriptions)
 
-        for subscription in db_subscriptions:
-            subscription.topic = subscription.topic
-            subscription.save()
-
-        for consumer in db_consumers:
-            consumer.subscription = consumer.subscription
-            consumer.save()
+    else:
+        update_or_create_object(
+            db_create_bundles,
+            db_create_topics,
+            db_create_consumers,
+            db_create_subscriptions)
+        update_or_create_object(
+            db_update_bundles,
+            db_update_topics,
+            db_update_consumers,
+            db_update_subscriptions)
 
         for replication in db_replication:
             replication.topic = replication.topic
             replication.save()
 
+def update_or_create_object(db_bundles, db_topics, db_consumers, db_subscriptions):
+    # For DB providers we have to insert or update one by one
+    # to be able to retrieve the PK of the newly inserted records
+    for bundle in db_bundles:
+        bundle.save()
+
+    for topic in db_topics:
+        topic.bundle = topic.bundle
+        topic.save()
+
+    for subscription in db_subscriptions:
+        subscription.topic = subscription.topic
+        subscription.save()
+
+    for consumer in db_consumers:
+        consumer.subscription = consumer.subscription
+        consumer.save()
+
 
 def fetch_stats():
     timestamp = current_milli_time()