You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2018/12/20 22:54:00 UTC
[pulsar] branch master updated: optimize py function stats usage (#3229)
This is an automated email from the ASF dual-hosted git repository.
jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 6c42737 optimize py function stats usage (#3229)
6c42737 is described below
commit 6c427377a2dcc0efc2f8cb239d8b9fdd4cc14bd6
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Thu Dec 20 14:53:55 2018 -0800
optimize py function stats usage (#3229)
---
.../instance/src/main/python/contextimpl.py | 23 ++++---
.../instance/src/main/python/function_stats.py | 77 +++++++++++++---------
2 files changed, 58 insertions(+), 42 deletions(-)
diff --git a/pulsar-functions/instance/src/main/python/contextimpl.py b/pulsar-functions/instance/src/main/python/contextimpl.py
index a9b9b1a..2797658 100644
--- a/pulsar-functions/instance/src/main/python/contextimpl.py
+++ b/pulsar-functions/instance/src/main/python/contextimpl.py
@@ -58,7 +58,7 @@ class ContextImpl(pulsar.Context):
else {}
self.metrics_labels = metrics_labels
- self.user_metrics_labels = dict()
+ self.user_metrics_map = dict()
self.user_metrics_summary = Summary("pulsar_function_user_metric",
'Pulsar Function user defined metric',
ContextImpl.user_metrics_label_names)
@@ -120,9 +120,12 @@ class ContextImpl(pulsar.Context):
return self.secrets_provider.provide_secret(secret_key, self.secrets_map[secret_key])
def record_metric(self, metric_name, metric_value):
- if metric_name not in self.user_metrics_labels:
- self.user_metrics_labels[metric_name] = self.metrics_labels + [metric_name]
- self.user_metrics_summary.labels(*self.user_metrics_labels[metric_name]).observe(metric_value)
+ if metric_name not in self.user_metrics_map:
+ user_metrics_labels = self.metrics_labels + [metric_name]
+ self.user_metrics_map[metric_name] = self.user_metrics_summary.labels(*user_metrics_labels)
+
+ self.user_metrics_map[metric_name].observe(metric_value)
+
def get_output_topic(self):
return self.instance_config.function_details.output
@@ -167,14 +170,14 @@ class ContextImpl(pulsar.Context):
def reset_metrics(self):
# TODO: Make it thread safe
- for labels in self.user_metrics_labels.values():
- self.user_metrics_summary.labels(*labels)._sum.set(0.0)
- self.user_metrics_summary.labels(*labels)._count.set(0.0)
+ for user_metric in self.user_metrics_map.values():
+ user_metric._sum.set(0.0)
+ user_metric._count.set(0.0)
def get_metrics(self):
metrics_map = {}
- for metric_name, metric_labels in self.user_metrics_labels.items():
- metrics_map["%s%s_sum" % (Stats.USER_METRIC_PREFIX, metric_name)] = self.user_metrics_summary.labels(*metric_labels)._sum.get()
- metrics_map["%s%s_count" % (Stats.USER_METRIC_PREFIX, metric_name)] = self.user_metrics_summary.labels(*metric_labels)._count.get()
+ for metric_name, user_metric in self.user_metrics_map.items():
+ metrics_map["%s%s_sum" % (Stats.USER_METRIC_PREFIX, metric_name)] = user_metric._sum.get()
+ metrics_map["%s%s_count" % (Stats.USER_METRIC_PREFIX, metric_name)] = user_metric._count.get()
return metrics_map
diff --git a/pulsar-functions/instance/src/main/python/function_stats.py b/pulsar-functions/instance/src/main/python/function_stats.py
index 3ea0316..b9a09c4 100644
--- a/pulsar-functions/instance/src/main/python/function_stats.py
+++ b/pulsar-functions/instance/src/main/python/function_stats.py
@@ -89,67 +89,80 @@ class Stats(object):
self.metrics_labels = metrics_labels;
self.process_start_time = None
+ # as optimization
+ self._stat_total_processed_successfully = self.stat_total_processed_successfully.labels(*self.metrics_labels)
+ self._stat_total_sys_exceptions = self.stat_total_sys_exceptions.labels(*self.metrics_labels)
+ self._stat_total_user_exceptions = self.stat_total_user_exceptions.labels(*self.metrics_labels)
+ self._stat_process_latency_ms = self.stat_process_latency_ms.labels(*self.metrics_labels)
+ self._stat_last_invocation = self.stat_last_invocation.labels(*self.metrics_labels)
+ self._stat_total_received = self.stat_total_received.labels(*self.metrics_labels)
+ self._stat_total_processed_successfully_1min = self.stat_total_processed_successfully_1min.labels(*self.metrics_labels)
+ self._stat_total_sys_exceptions_1min = self.stat_total_sys_exceptions_1min.labels(*self.metrics_labels)
+ self._stat_total_user_exceptions_1min = self.stat_total_user_exceptions_1min.labels(*self.metrics_labels)
+ self._stat_process_latency_ms_1min = self.stat_process_latency_ms_1min.labels(*self.metrics_labels)
+ self._stat_total_received_1min = self.stat_total_received_1min.labels(*self.metrics_labels)
+
# start time for windowed metrics
util.FixedTimer(60, self.reset).start()
def get_total_received(self):
- return self.stat_total_received.labels(*self.metrics_labels)._value.get();
+ return self._stat_total_received._value.get();
def get_total_processed_successfully(self):
- return self.stat_total_processed_successfully.labels(*self.metrics_labels)._value.get();
+ return self._stat_total_processed_successfully._value.get();
def get_total_sys_exceptions(self):
- return self.stat_total_sys_exceptions.labels(*self.metrics_labels)._value.get();
+ return self._stat_total_sys_exceptions._value.get();
def get_total_user_exceptions(self):
- return self.stat_total_user_exceptions.labels(*self.metrics_labels)._value.get();
+ return self._stat_total_user_exceptions._value.get();
def get_avg_process_latency(self):
- process_latency_ms_count = self.stat_process_latency_ms.labels(*self.metrics_labels)._count.get()
- process_latency_ms_sum = self.stat_process_latency_ms.labels(*self.metrics_labels)._sum.get()
+ process_latency_ms_count = self._stat_process_latency_ms._count.get()
+ process_latency_ms_sum = self._stat_process_latency_ms._sum.get()
return 0.0 \
if process_latency_ms_count <= 0.0 \
else process_latency_ms_sum / process_latency_ms_count
def get_total_processed_successfully_1min(self):
- return self.stat_total_processed_successfully_1min.labels(*self.metrics_labels)._value.get()
+ return self._stat_total_processed_successfully_1min._value.get()
def get_total_sys_exceptions_1min(self):
- return self.stat_total_sys_exceptions_1min.labels(*self.metrics_labels)._value.get()
+ return self._stat_total_sys_exceptions_1min._value.get()
def get_total_user_exceptions_1min(self):
- return self.stat_total_user_exceptions_1min.labels(*self.metrics_labels)._value.get()
+ return self._stat_total_user_exceptions_1min._value.get()
def get_total_received_1min(self):
- return self.stat_total_received_1min.labels(*self.metrics_labels)._value.get()
+ return self._stat_total_received_1min._value.get()
def get_avg_process_latency_1min(self):
- process_latency_ms_count = self.stat_process_latency_ms_1min.labels(*self.metrics_labels)._count.get()
- process_latency_ms_sum = self.stat_process_latency_ms_1min.labels(*self.metrics_labels)._sum.get()
+ process_latency_ms_count = self._stat_process_latency_ms_1min._count.get()
+ process_latency_ms_sum = self._stat_process_latency_ms_1min._sum.get()
return 0.0 \
if process_latency_ms_count <= 0.0 \
else process_latency_ms_sum / process_latency_ms_count
def get_last_invocation(self):
- return self.stat_last_invocation.labels(*self.metrics_labels)._value.get()
+ return self._stat_last_invocation._value.get()
def incr_total_processed_successfully(self):
- self.stat_total_processed_successfully.labels(*self.metrics_labels).inc()
- self.stat_total_processed_successfully_1min.labels(*self.metrics_labels).inc()
+ self._stat_total_processed_successfully.inc()
+ self._stat_total_processed_successfully_1min.inc()
def incr_total_sys_exceptions(self, exception):
- self.stat_total_sys_exceptions.labels(*self.metrics_labels).inc()
- self.stat_total_sys_exceptions_1min.labels(*self.metrics_labels).inc()
+ self._stat_total_sys_exceptions.inc()
+ self._stat_total_sys_exceptions_1min.inc()
self.add_sys_exception(exception)
def incr_total_user_exceptions(self, exception):
- self.stat_total_user_exceptions.labels(*self.metrics_labels).inc()
- self.stat_total_user_exceptions_1min.labels(*self.metrics_labels).inc()
+ self._stat_total_user_exceptions.inc()
+ self._stat_total_user_exceptions_1min.inc()
self.add_user_exception(exception)
def incr_total_received(self):
- self.stat_total_received.labels(*self.metrics_labels).inc()
- self.stat_total_received_1min.labels(*self.metrics_labels).inc()
+ self._stat_total_received.inc()
+ self._stat_total_received_1min.inc()
def process_time_start(self):
self.process_start_time = time.time();
@@ -157,11 +170,11 @@ class Stats(object):
def process_time_end(self):
if self.process_start_time:
duration = (time.time() - self.process_start_time) * 1000.0
- self.stat_process_latency_ms.labels(*self.metrics_labels).observe(duration)
- self.stat_process_latency_ms_1min.labels(*self.metrics_labels).observe(duration)
+ self._stat_process_latency_ms.observe(duration)
+ self._stat_process_latency_ms_1min.observe(duration)
def set_last_invocation(self, time):
- self.stat_last_invocation.labels(*self.metrics_labels).set(time * 1000.0)
+ self._stat_last_invocation.set(time * 1000.0)
def add_user_exception(self, exception):
error = traceback.format_exc()
@@ -178,7 +191,7 @@ class Stats(object):
@limits(calls=5, period=60)
def report_user_exception_prometheus(self, exception, ts):
- exception_metric_labels = self.metrics_labels + [exception.message, str(ts)]
+ exception_metric_labels = self.metrics_labels + [str(exception), str(ts)]
self.user_exceptions.labels(*exception_metric_labels).set(1.0)
def add_sys_exception(self, exception):
@@ -196,15 +209,15 @@ class Stats(object):
@limits(calls=5, period=60)
def report_system_exception_prometheus(self, exception, ts):
- exception_metric_labels = self.metrics_labels + [exception.message, str(ts)]
+ exception_metric_labels = self.metrics_labels + [str(exception), str(ts)]
self.system_exceptions.labels(*exception_metric_labels).set(1.0)
def reset(self):
self.latest_user_exception = []
self.latest_sys_exception = []
- self.stat_total_processed_successfully_1min.labels(*self.metrics_labels)._value.set(0.0)
- self.stat_total_user_exceptions_1min.labels(*self.metrics_labels)._value.set(0.0)
- self.stat_total_sys_exceptions_1min.labels(*self.metrics_labels)._value.set(0.0)
- self.stat_process_latency_ms_1min.labels(*self.metrics_labels)._sum.set(0.0)
- self.stat_process_latency_ms_1min.labels(*self.metrics_labels)._count.set(0.0)
- self.stat_total_received_1min.labels(*self.metrics_labels)._value.set(0.0)
\ No newline at end of file
+ self._stat_total_processed_successfully_1min._value.set(0.0)
+ self._stat_total_user_exceptions_1min._value.set(0.0)
+ self._stat_total_sys_exceptions_1min._value.set(0.0)
+ self._stat_process_latency_ms_1min._sum.set(0.0)
+ self._stat_process_latency_ms_1min._count.set(0.0)
+ self._stat_total_received_1min._value.set(0.0)
\ No newline at end of file