Posted to commits@pulsar.apache.org by si...@apache.org on 2019/01/05 09:36:14 UTC
[pulsar] branch master updated: Add logic in the dashboard to handle cases where Bundle, Topic, Subscription, Consumer already exists
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 2103792 Add logic in the dashboard to handle cases where Bundle, Topic, Subscription, Consumer already exists
2103792 is described below
commit 21037920be99efdfd24d55d09c38b2b3caf94e14
Author: tuteng <eg...@gmail.com>
AuthorDate: Sat Jan 5 17:36:10 2019 +0800
Add logic in the dashboard to handle cases where Bundle, Topic, Subscription, Consumer already exists
### Motivation
Related Issue: #3226
Add logic in the dashboard to handle cases where a Bundle, Topic, Subscription, or Consumer already exists.
### Modifications
Update the Consumer, Bundle, Subscription, and Topic records if they already exist, instead of creating duplicates. A sketch of the pattern follows.
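In other words, each collector pass now looks a record up before writing rather than always inserting a fresh row. Below is a minimal sketch of that upsert pattern, using Bundle as the example; the helper name `upsert_bundle` is illustrative and not part of the patch, while the `Bundle` model, its fields, and the create/update staging lists mirror dashboard/django/collector.py:

```python
# Illustrative sketch of the upsert pattern this patch applies to
# Bundle, Topic, Subscription and Consumer (assumes the dashboard's
# Django models; upsert_bundle itself is hypothetical).
def upsert_bundle(cluster, broker, namespace, bundle_range, timestamp,
                  db_create_bundles, db_update_bundles):
    existing = Bundle.objects.filter(
        cluster_id=cluster.id,
        namespace_id=namespace.id,
        range=bundle_range).first()
    if existing:
        # Row seen on an earlier poll: refresh its stats and stage an UPDATE.
        existing.timestamp = timestamp
        db_update_bundles.append(existing)
        return existing
    # First sighting: stage an INSERT (still bulk_create-able on PostgreSQL).
    bundle = Bundle(broker=broker, namespace=namespace, range=bundle_range,
                    cluster=cluster, timestamp=timestamp)
    db_create_bundles.append(bundle)
    return bundle
```

Django's `Model.objects.update_or_create()` would collapse the lookup and the write into one call, but keeping the explicit two-step lets newly created rows still be batched through `bulk_create()` on PostgreSQL.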
---
dashboard/django/collector.py | 249 ++++++++++++++++++++++++++++--------------
1 file changed, 167 insertions(+), 82 deletions(-)
diff --git a/dashboard/django/collector.py b/dashboard/django/collector.py
index c8c3b13..7e43376 100755
--- a/dashboard/django/collector.py
+++ b/dashboard/django/collector.py
@@ -77,10 +77,14 @@ def _fetch_broker_stats(cluster, broker_host_port, timestamp):
     clusters = dict( (cluster.name, cluster) for cluster in Cluster.objects.all() )
-    db_bundles = []
-    db_topics = []
-    db_subscriptions = []
-    db_consumers = []
+    db_create_bundles = []
+    db_update_bundles = []
+    db_create_topics = []
+    db_update_topics = []
+    db_create_subscriptions = []
+    db_update_subscriptions = []
+    db_create_consumers = []
+    db_update_consumers = []
     db_replication = []
     for namespace_name, bundles_stats in topics_stats.items():
@@ -94,73 +98,137 @@ def _fetch_broker_stats(cluster, broker_host_port, timestamp):
         namespace.save()
         for bundle_range, topics_stats in bundles_stats.items():
-            bundle = Bundle(
-                broker = broker,
-                namespace = namespace,
-                range = bundle_range,
-                cluster = cluster,
-                timestamp = timestamp)
-            db_bundles.append(bundle)
+            bundle = Bundle.objects.filter(
+                cluster_id=cluster.id,
+                namespace_id=namespace.id,
+                range=bundle_range)
+            if bundle:
+                temp_bundle = bundle.first()
+                temp_bundle.timestamp = timestamp
+                db_update_bundles.append(temp_bundle)
+                bundle = temp_bundle
+            else:
+                bundle = Bundle(
+                    broker = broker,
+                    namespace = namespace,
+                    range = bundle_range,
+                    cluster = cluster,
+                    timestamp = timestamp)
+                db_create_bundles.append(bundle)
+
             for topic_name, stats in topics_stats['persistent'].items():
-                topic = Topic(
-                    broker = broker,
-                    active_broker = active_broker,
-                    name = topic_name,
-                    namespace = namespace,
-                    bundle = bundle,
-                    cluster = cluster,
-                    timestamp = timestamp,
-                    averageMsgSize = stats['averageMsgSize'],
-                    msgRateIn = stats['msgRateIn'],
-                    msgRateOut = stats['msgRateOut'],
-                    msgThroughputIn = stats['msgThroughputIn'],
-                    msgThroughputOut = stats['msgThroughputOut'],
-                    pendingAddEntriesCount = stats['pendingAddEntriesCount'],
-                    producerCount = stats['producerCount'],
-                    storageSize = stats['storageSize']
-                )
-
-                db_topics.append(topic)
+                topic = Topic.objects.filter(
+                    cluster_id=cluster.id,
+                    bundle_id=bundle.id,
+                    namespace_id=namespace.id,
+                    broker_id=broker.id,
+                    name=topic_name)
+                if topic:
+                    temp_topic = topic.first()
+                    temp_topic.timestamp = timestamp
+                    temp_topic.averageMsgSize = stats['averageMsgSize']
+                    temp_topic.msgRateIn = stats['msgRateIn']
+                    temp_topic.msgRateOut = stats['msgRateOut']
+                    temp_topic.msgThroughputIn = stats['msgThroughputIn']
+                    temp_topic.msgThroughputOut = stats['msgThroughputOut']
+                    temp_topic.pendingAddEntriesCount = stats['pendingAddEntriesCount']
+                    temp_topic.producerCount = stats['producerCount']
+                    temp_topic.storageSize = stats['storageSize']
+                    db_update_topics.append(temp_topic)
+                    topic = temp_topic
+                else:
+                    topic = Topic(
+                        broker = broker,
+                        active_broker = active_broker,
+                        name = topic_name,
+                        namespace = namespace,
+                        bundle = bundle,
+                        cluster = cluster,
+                        timestamp = timestamp,
+                        averageMsgSize = stats['averageMsgSize'],
+                        msgRateIn = stats['msgRateIn'],
+                        msgRateOut = stats['msgRateOut'],
+                        msgThroughputIn = stats['msgThroughputIn'],
+                        msgThroughputOut = stats['msgThroughputOut'],
+                        pendingAddEntriesCount = stats['pendingAddEntriesCount'],
+                        producerCount = stats['producerCount'],
+                        storageSize = stats['storageSize']
+                    )
+                    db_create_topics.append(topic)
                 totalBacklog = 0
                 numSubscriptions = 0
                 numConsumers = 0
                 for subscription_name, subStats in stats['subscriptions'].items():
                     numSubscriptions += 1
-                    subscription = Subscription(
-                        topic = topic,
-                        name = subscription_name,
-                        namespace = namespace,
-                        timestamp = timestamp,
-                        msgBacklog = subStats['msgBacklog'],
-                        msgRateExpired = subStats['msgRateExpired'],
-                        msgRateOut = subStats['msgRateOut'],
-                        msgRateRedeliver = subStats.get('msgRateRedeliver', 0),
-                        msgThroughputOut = subStats['msgThroughputOut'],
-                        subscriptionType = subStats['type'][0],
-                        unackedMessages = subStats.get('unackedMessages', 0),
-                    )
-                    db_subscriptions.append(subscription)
+                    subscription = Subscription.objects.filter(
+                        topic_id=topic.id,
+                        namespace_id=namespace.id,
+                        name=subscription_name)
+                    if subscription:
+                        temp_subscription = subscription.first()
+                        temp_subscription.timestamp = timestamp
+                        temp_subscription.msgBacklog = subStats['msgBacklog']
+                        temp_subscription.msgRateExpired = subStats['msgRateExpired']
+                        temp_subscription.msgRateOut = subStats['msgRateOut']
+                        temp_subscription.msgRateRedeliver = subStats.get('msgRateRedeliver', 0)
+                        temp_subscription.msgThroughputOut = subStats['msgThroughputOut']
+                        temp_subscription.subscriptionType = subStats['type'][0]
+                        temp_subscription.unackedMessages = subStats.get('unackedMessages', 0)
+                        db_update_subscriptions.append(temp_subscription)
+                        subscription = temp_subscription
+                    else:
+                        subscription = Subscription(
+                            topic = topic,
+                            name = subscription_name,
+                            namespace = namespace,
+                            timestamp = timestamp,
+                            msgBacklog = subStats['msgBacklog'],
+                            msgRateExpired = subStats['msgRateExpired'],
+                            msgRateOut = subStats['msgRateOut'],
+                            msgRateRedeliver = subStats.get('msgRateRedeliver', 0),
+                            msgThroughputOut = subStats['msgThroughputOut'],
+                            subscriptionType = subStats['type'][0],
+                            unackedMessages = subStats.get('unackedMessages', 0),
+                        )
+                        db_create_subscriptions.append(subscription)
                     totalBacklog += subStats['msgBacklog']
                     for consStats in subStats['consumers']:
                         numConsumers += 1
-                        consumer = Consumer(
-                            subscription = subscription,
-                            timestamp = timestamp,
-                            address = consStats['address'],
-                            availablePermits = consStats.get('availablePermits', 0),
-                            connectedSince = parse_date(consStats.get('connectedSince')),
-                            consumerName = consStats.get('consumerName'),
-                            msgRateOut = consStats.get('msgRateOut', 0),
-                            msgRateRedeliver = consStats.get('msgRateRedeliver', 0),
-                            msgThroughputOut = consStats.get('msgThroughputOut', 0),
-                            unackedMessages = consStats.get('unackedMessages', 0),
-                            blockedConsumerOnUnackedMsgs = consStats.get('blockedConsumerOnUnackedMsgs', False)
-                        )
-                        db_consumers.append(consumer)
+                        consumer = Consumer.objects.filter(
+                            subscription_id=subscription.id,
+                            consumerName=consStats.get('consumerName'))
+                        if consumer:
+                            temp_consumer = consumer.first()
+                            temp_consumer.timestamp = timestamp
+                            temp_consumer.address = consStats['address']
+                            temp_consumer.availablePermits = consStats.get('availablePermits', 0)
+                            temp_consumer.connectedSince = parse_date(consStats.get('connectedSince'))
+                            temp_consumer.msgRateOut = consStats.get('msgRateOut', 0)
+                            temp_consumer.msgRateRedeliver = consStats.get('msgRateRedeliver', 0)
+                            temp_consumer.msgThroughputOut = consStats.get('msgThroughputOut', 0)
+                            temp_consumer.unackedMessages = consStats.get('unackedMessages', 0)
+                            temp_consumer.blockedConsumerOnUnackedMsgs = consStats.get('blockedConsumerOnUnackedMsgs', False)
+                            db_update_consumers.append(temp_consumer)
+                            consumer = temp_consumer
+                        else:
+                            consumer = Consumer(
+                                subscription = subscription,
+                                timestamp = timestamp,
+                                address = consStats['address'],
+                                availablePermits = consStats.get('availablePermits', 0),
+                                connectedSince = parse_date(consStats.get('connectedSince')),
+                                consumerName = consStats.get('consumerName'),
+                                msgRateOut = consStats.get('msgRateOut', 0),
+                                msgRateRedeliver = consStats.get('msgRateRedeliver', 0),
+                                msgThroughputOut = consStats.get('msgThroughputOut', 0),
+                                unackedMessages = consStats.get('unackedMessages', 0),
+                                blockedConsumerOnUnackedMsgs = consStats.get('blockedConsumerOnUnackedMsgs', False)
+                            )
+                            db_create_consumers.append(consumer)
                 topic.backlog = totalBacklog
                 topic.subscriptionCount = numSubscriptions
@@ -210,46 +278,63 @@ def _fetch_broker_stats(cluster, broker_host_port, timestamp):
                 topic.localThroughputIn = topic.msgThroughputIn - replicationThroughputIn
                 topic.localThroughputOut = topic.msgThroughputOut - replicationThroughputOut
-
     if connection.vendor == 'postgresql':
         # Bulk insert into db
-        Bundle.objects.bulk_create(db_bundles, batch_size=10000)
+        Bundle.objects.bulk_create(db_create_bundles, batch_size=10000)
         # Trick to refresh primary keys after previous bulk import
-        for topic in db_topics: topic.bundle = topic.bundle
-        Topic.objects.bulk_create(db_topics, batch_size=10000)
+        for topic in db_create_topics: topic.bundle = topic.bundle
+        Topic.objects.bulk_create(db_create_topics, batch_size=10000)
-        for subscription in db_subscriptions: subscription.topic = subscription.topic
-        Subscription.objects.bulk_create(db_subscriptions, batch_size=10000)
+        for subscription in db_create_subscriptions: subscription.topic = subscription.topic
+        Subscription.objects.bulk_create(db_create_subscriptions, batch_size=10000)
-        for consumer in db_consumers: consumer.subscription = consumer.subscription
-        Consumer.objects.bulk_create(db_consumers, batch_size=10000)
+        for consumer in db_create_consumers: consumer.subscription = consumer.subscription
+        Consumer.objects.bulk_create(db_create_consumers, batch_size=10000)
         for replication in db_replication: replication.topic = replication.topic
         Replication.objects.bulk_create(db_replication, batch_size=10000)
-    else:
-        # For other DB providers we have to insert one by one
-        # to be able to retrieve the PK of the newly inserted records
-        for bundle in db_bundles:
-            bundle.save()
-
-        for topic in db_topics:
-            topic.bundle = topic.bundle
-            topic.save()
+        update_or_create_object(
+            db_update_bundles,
+            db_update_topics,
+            db_update_consumers,
+            db_update_subscriptions)
-        for subscription in db_subscriptions:
-            subscription.topic = subscription.topic
-            subscription.save()
-
-        for consumer in db_consumers:
-            consumer.subscription = consumer.subscription
-            consumer.save()
+    else:
+        update_or_create_object(
+            db_create_bundles,
+            db_create_topics,
+            db_create_consumers,
+            db_create_subscriptions)
+        update_or_create_object(
+            db_update_bundles,
+            db_update_topics,
+            db_update_consumers,
+            db_update_subscriptions)
         for replication in db_replication:
             replication.topic = replication.topic
             replication.save()
+def update_or_create_object(db_bundles, db_topics, db_consumers, db_subscriptions):
+    # For DB providers we have to insert or update one by one
+    # to be able to retrieve the PK of the newly inserted records
+    for bundle in db_bundles:
+        bundle.save()
+
+    for topic in db_topics:
+        topic.bundle = topic.bundle
+        topic.save()
+
+    for subscription in db_subscriptions:
+        subscription.topic = subscription.topic
+        subscription.save()
+
+    for consumer in db_consumers:
+        consumer.subscription = consumer.subscription
+        consumer.save()
+
 def fetch_stats():
     timestamp = current_milli_time()