Posted to commits@pulsar.apache.org by je...@apache.org on 2018/12/20 22:54:00 UTC

[pulsar] branch master updated: optimize py function stats usage (#3229)

This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6c42737  optimize py function stats usage (#3229)
6c42737 is described below

commit 6c427377a2dcc0efc2f8cb239d8b9fdd4cc14bd6
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Thu Dec 20 14:53:55 2018 -0800

    optimize py function stats usage (#3229)
---
 .../instance/src/main/python/contextimpl.py        | 23 ++++---
 .../instance/src/main/python/function_stats.py     | 77 +++++++++++++---------
 2 files changed, 58 insertions(+), 42 deletions(-)
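
The change below caches the per-label child objects returned by prometheus_client's labels() call instead of resolving them on every stat update. A minimal sketch of the pattern, with illustrative metric and label names that are not taken from the commit:

    from prometheus_client import Counter

    # One metric family with fixed label names (names here are illustrative).
    TOTAL_RECEIVED = Counter('illustrative_total_received', 'Messages received',
                             ['tenant', 'namespace', 'name'])

    # Before: the labeled child is resolved on every increment
    # TOTAL_RECEIVED.labels('public', 'default', 'my-fn').inc()

    # After: resolve the child once, reuse it on the hot path
    _total_received = TOTAL_RECEIVED.labels('public', 'default', 'my-fn')
    _total_received.inc()

Each labels() call does a lookup (and typically takes a lock) inside the client library, so hoisting it out of the per-message path is where the saving comes from.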

diff --git a/pulsar-functions/instance/src/main/python/contextimpl.py b/pulsar-functions/instance/src/main/python/contextimpl.py
index a9b9b1a..2797658 100644
--- a/pulsar-functions/instance/src/main/python/contextimpl.py
+++ b/pulsar-functions/instance/src/main/python/contextimpl.py
@@ -58,7 +58,7 @@ class ContextImpl(pulsar.Context):
       else {}
 
     self.metrics_labels = metrics_labels
-    self.user_metrics_labels = dict()
+    self.user_metrics_map = dict()
     self.user_metrics_summary = Summary("pulsar_function_user_metric",
                                     'Pulsar Function user defined metric',
                                         ContextImpl.user_metrics_label_names)
@@ -120,9 +120,12 @@ class ContextImpl(pulsar.Context):
     return self.secrets_provider.provide_secret(secret_key, self.secrets_map[secret_key])
 
   def record_metric(self, metric_name, metric_value):
-    if metric_name not in self.user_metrics_labels:
-      self.user_metrics_labels[metric_name] = self.metrics_labels + [metric_name]
-    self.user_metrics_summary.labels(*self.user_metrics_labels[metric_name]).observe(metric_value)
+    if metric_name not in self.user_metrics_map:
+      user_metrics_labels = self.metrics_labels + [metric_name]
+      self.user_metrics_map[metric_name] = self.user_metrics_summary.labels(*user_metrics_labels)
+
+    self.user_metrics_map[metric_name].observe(metric_value)
+
 
   def get_output_topic(self):
     return self.instance_config.function_details.output
@@ -167,14 +170,14 @@ class ContextImpl(pulsar.Context):
 
   def reset_metrics(self):
     # TODO: Make it thread safe
-    for labels in self.user_metrics_labels.values():
-      self.user_metrics_summary.labels(*labels)._sum.set(0.0)
-      self.user_metrics_summary.labels(*labels)._count.set(0.0)
+    for user_metric in self.user_metrics_map.values():
+      user_metric._sum.set(0.0)
+      user_metric._count.set(0.0)
 
   def get_metrics(self):
     metrics_map = {}
-    for metric_name, metric_labels in self.user_metrics_labels.items():
-      metrics_map["%s%s_sum" % (Stats.USER_METRIC_PREFIX, metric_name)] = self.user_metrics_summary.labels(*metric_labels)._sum.get()
-      metrics_map["%s%s_count" % (Stats.USER_METRIC_PREFIX, metric_name)] = self.user_metrics_summary.labels(*metric_labels)._count.get()
+    for metric_name, user_metric in self.user_metrics_map.items():
+      metrics_map["%s%s_sum" % (Stats.USER_METRIC_PREFIX, metric_name)] = user_metric._sum.get()
+      metrics_map["%s%s_count" % (Stats.USER_METRIC_PREFIX, metric_name)] = user_metric._count.get()
 
     return metrics_map
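
In contextimpl.py, record_metric() now keeps the labeled Summary child per user metric name in user_metrics_map and reuses it, rather than rebuilding the label list and calling labels() on every observation. A sketch of the same idea in isolation, with made-up metric and label names for illustration:

    from prometheus_client import Summary

    user_metrics_summary = Summary('illustrative_user_metric', 'User defined metric',
                                   ['fqfn', 'metric_name'])
    base_labels = ['tenant/ns/fn']      # stands in for self.metrics_labels
    user_metrics_map = {}               # metric_name -> labeled child

    def record_metric(metric_name, metric_value):
        # Resolve the labeled child once per metric name, then observe directly.
        child = user_metrics_map.get(metric_name)
        if child is None:
            child = user_metrics_summary.labels(*(base_labels + [metric_name]))
            user_metrics_map[metric_name] = child
        child.observe(metric_value)

The cached child is also what reset_metrics() and get_metrics() now iterate over, so the label list no longer needs to be stored separately.
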
diff --git a/pulsar-functions/instance/src/main/python/function_stats.py b/pulsar-functions/instance/src/main/python/function_stats.py
index 3ea0316..b9a09c4 100644
--- a/pulsar-functions/instance/src/main/python/function_stats.py
+++ b/pulsar-functions/instance/src/main/python/function_stats.py
@@ -89,67 +89,80 @@ class Stats(object):
     self.metrics_labels = metrics_labels;
     self.process_start_time = None
 
+    # as optimization
+    self._stat_total_processed_successfully = self.stat_total_processed_successfully.labels(*self.metrics_labels)
+    self._stat_total_sys_exceptions = self.stat_total_sys_exceptions.labels(*self.metrics_labels)
+    self._stat_total_user_exceptions = self.stat_total_user_exceptions.labels(*self.metrics_labels)
+    self._stat_process_latency_ms = self.stat_process_latency_ms.labels(*self.metrics_labels)
+    self._stat_last_invocation = self.stat_last_invocation.labels(*self.metrics_labels)
+    self._stat_total_received = self.stat_total_received.labels(*self.metrics_labels)
+    self._stat_total_processed_successfully_1min = self.stat_total_processed_successfully_1min.labels(*self.metrics_labels)
+    self._stat_total_sys_exceptions_1min = self.stat_total_sys_exceptions_1min.labels(*self.metrics_labels)
+    self._stat_total_user_exceptions_1min = self.stat_total_user_exceptions_1min.labels(*self.metrics_labels)
+    self._stat_process_latency_ms_1min = self.stat_process_latency_ms_1min.labels(*self.metrics_labels)
+    self._stat_total_received_1min = self.stat_total_received_1min.labels(*self.metrics_labels)
+
     # start time for windowed metrics
     util.FixedTimer(60, self.reset).start()
 
   def get_total_received(self):
-    return self.stat_total_received.labels(*self.metrics_labels)._value.get();
+    return self._stat_total_received._value.get();
 
   def get_total_processed_successfully(self):
-    return self.stat_total_processed_successfully.labels(*self.metrics_labels)._value.get();
+    return self._stat_total_processed_successfully._value.get();
 
   def get_total_sys_exceptions(self):
-    return self.stat_total_sys_exceptions.labels(*self.metrics_labels)._value.get();
+    return self._stat_total_sys_exceptions._value.get();
 
   def get_total_user_exceptions(self):
-    return self.stat_total_user_exceptions.labels(*self.metrics_labels)._value.get();
+    return self._stat_total_user_exceptions._value.get();
 
   def get_avg_process_latency(self):
-    process_latency_ms_count = self.stat_process_latency_ms.labels(*self.metrics_labels)._count.get()
-    process_latency_ms_sum = self.stat_process_latency_ms.labels(*self.metrics_labels)._sum.get()
+    process_latency_ms_count = self._stat_process_latency_ms._count.get()
+    process_latency_ms_sum = self._stat_process_latency_ms._sum.get()
     return 0.0 \
       if process_latency_ms_count <= 0.0 \
       else process_latency_ms_sum / process_latency_ms_count
 
   def get_total_processed_successfully_1min(self):
-    return self.stat_total_processed_successfully_1min.labels(*self.metrics_labels)._value.get()
+    return self._stat_total_processed_successfully_1min._value.get()
 
   def get_total_sys_exceptions_1min(self):
-    return self.stat_total_sys_exceptions_1min.labels(*self.metrics_labels)._value.get()
+    return self._stat_total_sys_exceptions_1min._value.get()
 
   def get_total_user_exceptions_1min(self):
-    return self.stat_total_user_exceptions_1min.labels(*self.metrics_labels)._value.get()
+    return self._stat_total_user_exceptions_1min._value.get()
 
   def get_total_received_1min(self):
-    return self.stat_total_received_1min.labels(*self.metrics_labels)._value.get()
+    return self._stat_total_received_1min._value.get()
 
   def get_avg_process_latency_1min(self):
-    process_latency_ms_count = self.stat_process_latency_ms_1min.labels(*self.metrics_labels)._count.get()
-    process_latency_ms_sum = self.stat_process_latency_ms_1min.labels(*self.metrics_labels)._sum.get()
+    process_latency_ms_count = self._stat_process_latency_ms_1min._count.get()
+    process_latency_ms_sum = self._stat_process_latency_ms_1min._sum.get()
     return 0.0 \
       if process_latency_ms_count <= 0.0 \
       else process_latency_ms_sum / process_latency_ms_count
 
   def get_last_invocation(self):
-    return self.stat_last_invocation.labels(*self.metrics_labels)._value.get()
+    return self._stat_last_invocation._value.get()
 
   def incr_total_processed_successfully(self):
-    self.stat_total_processed_successfully.labels(*self.metrics_labels).inc()
-    self.stat_total_processed_successfully_1min.labels(*self.metrics_labels).inc()
+    self._stat_total_processed_successfully.inc()
+    self._stat_total_processed_successfully_1min.inc()
 
   def incr_total_sys_exceptions(self, exception):
-    self.stat_total_sys_exceptions.labels(*self.metrics_labels).inc()
-    self.stat_total_sys_exceptions_1min.labels(*self.metrics_labels).inc()
+    self._stat_total_sys_exceptions.inc()
+    self._stat_total_sys_exceptions_1min.inc()
     self.add_sys_exception(exception)
 
   def incr_total_user_exceptions(self, exception):
-    self.stat_total_user_exceptions.labels(*self.metrics_labels).inc()
-    self.stat_total_user_exceptions_1min.labels(*self.metrics_labels).inc()
+    self._stat_total_user_exceptions.inc()
+    self._stat_total_user_exceptions_1min.inc()
     self.add_user_exception(exception)
 
   def incr_total_received(self):
-    self.stat_total_received.labels(*self.metrics_labels).inc()
-    self.stat_total_received_1min.labels(*self.metrics_labels).inc()
+    self._stat_total_received.inc()
+    self._stat_total_received_1min.inc()
 
   def process_time_start(self):
     self.process_start_time = time.time();
@@ -157,11 +170,11 @@ class Stats(object):
   def process_time_end(self):
     if self.process_start_time:
       duration = (time.time() - self.process_start_time) * 1000.0
-      self.stat_process_latency_ms.labels(*self.metrics_labels).observe(duration)
-      self.stat_process_latency_ms_1min.labels(*self.metrics_labels).observe(duration)
+      self._stat_process_latency_ms.observe(duration)
+      self._stat_process_latency_ms_1min.observe(duration)
 
   def set_last_invocation(self, time):
-    self.stat_last_invocation.labels(*self.metrics_labels).set(time * 1000.0)
+    self._stat_last_invocation.set(time * 1000.0)
 
   def add_user_exception(self, exception):
     error = traceback.format_exc()
@@ -178,7 +191,7 @@ class Stats(object):
 
   @limits(calls=5, period=60)
   def report_user_exception_prometheus(self, exception, ts):
-    exception_metric_labels = self.metrics_labels + [exception.message, str(ts)]
+    exception_metric_labels = self.metrics_labels + [str(exception), str(ts)]
     self.user_exceptions.labels(*exception_metric_labels).set(1.0)
 
   def add_sys_exception(self, exception):
@@ -196,15 +209,15 @@ class Stats(object):
 
   @limits(calls=5, period=60)
   def report_system_exception_prometheus(self, exception, ts):
-    exception_metric_labels = self.metrics_labels + [exception.message, str(ts)]
+    exception_metric_labels = self.metrics_labels + [str(exception), str(ts)]
     self.system_exceptions.labels(*exception_metric_labels).set(1.0)
 
   def reset(self):
     self.latest_user_exception = []
     self.latest_sys_exception = []
-    self.stat_total_processed_successfully_1min.labels(*self.metrics_labels)._value.set(0.0)
-    self.stat_total_user_exceptions_1min.labels(*self.metrics_labels)._value.set(0.0)
-    self.stat_total_sys_exceptions_1min.labels(*self.metrics_labels)._value.set(0.0)
-    self.stat_process_latency_ms_1min.labels(*self.metrics_labels)._sum.set(0.0)
-    self.stat_process_latency_ms_1min.labels(*self.metrics_labels)._count.set(0.0)
-    self.stat_total_received_1min.labels(*self.metrics_labels)._value.set(0.0)
\ No newline at end of file
+    self._stat_total_processed_successfully_1min._value.set(0.0)
+    self._stat_total_user_exceptions_1min._value.set(0.0)
+    self._stat_total_sys_exceptions_1min._value.set(0.0)
+    self._stat_process_latency_ms_1min._sum.set(0.0)
+    self._stat_process_latency_ms_1min._count.set(0.0)
+    self._stat_total_received_1min._value.set(0.0)
\ No newline at end of file
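

In function_stats.py, the Stats constructor now resolves every labeled child once (the _stat_* fields above), and the 60-second reset path zeroes those cached children's internal counters directly. A rough sketch of the windowed-reset shape, using threading.Timer in place of the Pulsar-internal util.FixedTimer, with illustrative names:

    import threading
    from prometheus_client import Counter

    RECEIVED_1MIN = Counter('illustrative_received_1min', 'Received in the last minute',
                            ['tenant', 'namespace', 'name'])
    _received_1min = RECEIVED_1MIN.labels('public', 'default', 'my-fn')

    def reset_window():
        # The commit pokes the client library's private _value/_sum/_count fields;
        # that is an internal API, shown here only to mirror what the diff does.
        _received_1min._value.set(0.0)
        threading.Timer(60, reset_window).start()

    reset_window()

Note also that both exception reporters now label with str(exception) instead of exception.message, since exceptions have no message attribute on Python 3.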