You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/07/29 10:41:13 UTC
[pulsar] 04/25: [Issue 7489] Remove timestamp from metrics (#7539)
This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 602735e8496ea93a68553dccdd876425fa42dd3a
Author: Sergii Zhevzhyk <vz...@users.noreply.github.com>
AuthorDate: Sat Jul 25 16:33:08 2020 +0200
[Issue 7489] Remove timestamp from metrics (#7539)
* [Issue 7489] Remove timestamp from exception metrics for functions and connectors
* [Issue 7489] Remove timestamp from exception metrics for Go functions
* [Issue 7489] Remove unused import in go stats
* [Issue 7489] Remove timestamp from metrics in python stats
* Change to v2 of go github actions
* Update go github actions
* Remove the version from the go test command
* Rename github jobs for go
Co-authored-by: Matteo Merli <mm...@splunk.com>
(cherry picked from commit 777ed1651221449da61c3e3cb8a7eb8157a8239f)
---
.github/workflows/ci-go-functions-style.yaml | 8 +++++---
.github/workflows/ci-go-functions-test.yaml | 6 +++---
pulsar-function-go/pf/stats.go | 15 +++++++--------
.../functions/instance/stats/ComponentStatsManager.java | 3 +--
.../functions/instance/stats/FunctionStatsManager.java | 11 +++++------
.../functions/instance/stats/SinkStatsManager.java | 11 +++++------
.../functions/instance/stats/SourceStatsManager.java | 11 +++++------
.../instance/src/main/python/function_stats.py | 16 ++++++++--------
8 files changed, 39 insertions(+), 42 deletions(-)
diff --git a/.github/workflows/ci-go-functions-style.yaml b/.github/workflows/ci-go-functions-style.yaml
index 628e437..bea77c7 100644
--- a/.github/workflows/ci-go-functions-style.yaml
+++ b/.github/workflows/ci-go-functions-style.yaml
@@ -27,12 +27,14 @@ on:
- 'pulsar-function-go/**'
jobs:
- build:
- name: Build
+ check-style:
+
+ name: Go ${{ matrix.go-version }} Functions style check
runs-on: ubuntu-latest
strategy:
matrix:
go-version: [1.11, 1.12, 1.13, 1.14]
+
steps:
- name: Check out code into the Go module directory
uses: actions/checkout@v2
@@ -47,7 +49,7 @@ jobs:
args: site2 .github deployment .asf.yaml .ci ct.yaml
- name: Set up Go
- uses: actions/setup-go@v1
+ uses: actions/setup-go@v2
if: steps.docs.outputs.changed_only == 'no'
with:
go-version: ${{ matrix.go-version }}
diff --git a/.github/workflows/ci-go-functions-test.yaml b/.github/workflows/ci-go-functions-test.yaml
index 5b6f942..8fb7d95 100644
--- a/.github/workflows/ci-go-functions-test.yaml
+++ b/.github/workflows/ci-go-functions-test.yaml
@@ -37,7 +37,7 @@ jobs:
timeout-minutes: 120
steps:
- - name: checkout
+ - name: Check out code into the Go module directory
uses: actions/checkout@v2
with:
fetch-depth: 0
@@ -50,13 +50,13 @@ jobs:
args: site2 .github deployment .asf.yaml .ci ct.yaml
- name: Set up Go
- uses: actions/setup-go@v1
+ uses: actions/setup-go@v2
if: steps.docs.outputs.changed_only == 'no'
with:
go-version: ${{ matrix.go-version }}
id: go
- - name: run tests
+ - name: Run tests
if: steps.docs.outputs.changed_only == 'no'
run: |
cd pulsar-function-go
diff --git a/pulsar-function-go/pf/stats.go b/pulsar-function-go/pf/stats.go
index 2d8f15b..f7952fb 100644
--- a/pulsar-function-go/pf/stats.go
+++ b/pulsar-function-go/pf/stats.go
@@ -20,7 +20,6 @@
package pf
import (
- "strconv"
"time"
"github.com/prometheus/client_golang/prometheus"
@@ -30,7 +29,7 @@ import (
var (
metricsLabelNames = []string{"tenant", "namespace", "name", "instance_id", "cluster", "fqfn"}
- exceptionLabelNames = []string{"error", "ts"}
+ exceptionLabelNames = []string{"error"}
exceptionMetricsLabelNames = append(metricsLabelNames, exceptionLabelNames...)
)
@@ -254,12 +253,12 @@ func (stat *StatWithLabelValues) addUserException(err error) {
stat.latestUserException = stat.latestUserException[1:]
}
// report exception via prometheus
- stat.reportUserExceptionPrometheus(err, ts)
+ stat.reportUserExceptionPrometheus(err)
}
//@limits(calls=5, period=60)
-func (stat *StatWithLabelValues) reportUserExceptionPrometheus(exception error, ts int64) {
- errorTs := []string{exception.Error(), strconv.FormatInt(ts, 10)}
+func (stat *StatWithLabelValues) reportUserExceptionPrometheus(exception error) {
+ errorTs := []string{exception.Error()}
exceptionMetricLabels := append(stat.metricsLabels, errorTs...)
userExceptions.WithLabelValues(exceptionMetricLabels...).Set(1.0)
}
@@ -284,12 +283,12 @@ func (stat *StatWithLabelValues) addSysException(exception error) {
stat.latestSysException = stat.latestSysException[1:]
}
// report exception via prometheus
- stat.reportSystemExceptionPrometheus(exception, ts)
+ stat.reportSystemExceptionPrometheus(exception)
}
//@limits(calls=5, period=60)
-func (stat *StatWithLabelValues) reportSystemExceptionPrometheus(exception error, ts int64) {
- errorTs := []string{exception.Error(), strconv.FormatInt(ts, 10)}
+func (stat *StatWithLabelValues) reportSystemExceptionPrometheus(exception error) {
+ errorTs := []string{exception.Error()}
exceptionMetricLabels := append(stat.metricsLabels, errorTs...)
systemExceptions.WithLabelValues(exceptionMetricLabels...).Set(1.0)
}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java
index daa51b7..cbdcc0f 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java
@@ -49,9 +49,8 @@ public abstract class ComponentStatsManager implements AutoCloseable {
protected static final String[] exceptionMetricsLabelNames;
static {
- exceptionMetricsLabelNames = Arrays.copyOf(metricsLabelNames, metricsLabelNames.length + 2);
+ exceptionMetricsLabelNames = Arrays.copyOf(metricsLabelNames, metricsLabelNames.length + 1);
exceptionMetricsLabelNames[metricsLabelNames.length] = "error";
- exceptionMetricsLabelNames[metricsLabelNames.length + 1] = "ts";
}
public static ComponentStatsManager getStatsManager(CollectorRegistry collectorRegistry,
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java
index fdedb74..f02b850 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java
@@ -243,7 +243,7 @@ public class FunctionStatsManager extends ComponentStatsManager{
// report exception throw prometheus
if (userExceptionRateLimiter.tryAcquire()) {
- String[] exceptionMetricsLabels = getExceptionMetricsLabels(ex, ts);
+ String[] exceptionMetricsLabels = getExceptionMetricsLabels(ex);
userExceptions.labels(exceptionMetricsLabels).set(1.0);
}
}
@@ -255,15 +255,14 @@ public class FunctionStatsManager extends ComponentStatsManager{
// report exception throw prometheus
if (sysExceptionRateLimiter.tryAcquire()) {
- String[] exceptionMetricsLabels = getExceptionMetricsLabels(ex, ts);
+ String[] exceptionMetricsLabels = getExceptionMetricsLabels(ex);
sysExceptions.labels(exceptionMetricsLabels).set(1.0);
}
}
- private String[] getExceptionMetricsLabels(Throwable ex, long ts) {
- String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels, metricsLabels.length + 2);
- exceptionMetricsLabels[exceptionMetricsLabels.length - 2] = ex.getMessage() != null ? ex.getMessage() : "";
- exceptionMetricsLabels[exceptionMetricsLabels.length - 1] = String.valueOf(ts);
+ private String[] getExceptionMetricsLabels(Throwable ex) {
+ String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels, metricsLabels.length + 1);
+ exceptionMetricsLabels[exceptionMetricsLabels.length - 1] = ex.getMessage() != null ? ex.getMessage() : "";
return exceptionMetricsLabels;
}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java
index c913225..401aa34 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java
@@ -213,7 +213,7 @@ public class SinkStatsManager extends ComponentStatsManager {
// report exception throw prometheus
if (sysExceptionRateLimiter.tryAcquire()) {
- String[] exceptionMetricsLabels = getExceptionMetricsLabels(ex, ts);
+ String[] exceptionMetricsLabels = getExceptionMetricsLabels(ex);
sysExceptions.labels(exceptionMetricsLabels).set(1.0);
}
}
@@ -236,15 +236,14 @@ public class SinkStatsManager extends ComponentStatsManager {
// report exception throw prometheus
if (sinkExceptionRateLimiter.tryAcquire()) {
- String[] exceptionMetricsLabels = getExceptionMetricsLabels(ex, ts);
+ String[] exceptionMetricsLabels = getExceptionMetricsLabels(ex);
sinkExceptions.labels(exceptionMetricsLabels).set(1.0);
}
}
- private String[] getExceptionMetricsLabels(Throwable ex, long ts) {
- String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels, metricsLabels.length + 2);
- exceptionMetricsLabels[exceptionMetricsLabels.length - 2] = ex.getMessage() != null ? ex.getMessage() : "";
- exceptionMetricsLabels[exceptionMetricsLabels.length - 1] = String.valueOf(ts);
+ private String[] getExceptionMetricsLabels(Throwable ex) {
+ String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels, metricsLabels.length + 1);
+ exceptionMetricsLabels[exceptionMetricsLabels.length - 1] = ex.getMessage() != null ? ex.getMessage() : "";
return exceptionMetricsLabels;
}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java
index 0ec7352..287240c 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java
@@ -212,7 +212,7 @@ public class SourceStatsManager extends ComponentStatsManager {
// report exception throw prometheus
if (sysExceptionRateLimiter.tryAcquire()) {
- String[] exceptionMetricsLabels = getExceptionMetricsLabels(ex, ts);
+ String[] exceptionMetricsLabels = getExceptionMetricsLabels(ex);
sysExceptions.labels(exceptionMetricsLabels).set(1.0);
}
}
@@ -230,15 +230,14 @@ public class SourceStatsManager extends ComponentStatsManager {
// report exception throw prometheus
if (sourceExceptionRateLimiter.tryAcquire()) {
- String[] exceptionMetricsLabels = getExceptionMetricsLabels(ex, ts);
+ String[] exceptionMetricsLabels = getExceptionMetricsLabels(ex);
sourceExceptions.labels(exceptionMetricsLabels).set(1.0);
}
}
- private String[] getExceptionMetricsLabels(Throwable ex, long ts) {
- String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels, metricsLabels.length + 2);
- exceptionMetricsLabels[exceptionMetricsLabels.length - 2] = ex.getMessage() != null ? ex.getMessage() : "";
- exceptionMetricsLabels[exceptionMetricsLabels.length - 1] = String.valueOf(ts);
+ private String[] getExceptionMetricsLabels(Throwable ex) {
+ String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels, metricsLabels.length + 1);
+ exceptionMetricsLabels[exceptionMetricsLabels.length - 1] = ex.getMessage() != null ? ex.getMessage() : "";
return exceptionMetricsLabels;
}
diff --git a/pulsar-functions/instance/src/main/python/function_stats.py b/pulsar-functions/instance/src/main/python/function_stats.py
index 63089b6..dd236c5 100644
--- a/pulsar-functions/instance/src/main/python/function_stats.py
+++ b/pulsar-functions/instance/src/main/python/function_stats.py
@@ -29,7 +29,7 @@ from ratelimit import limits, RateLimitException
class Stats(object):
metrics_label_names = ['tenant', 'namespace', 'name', 'instance_id', 'cluster', 'fqfn']
- exception_metrics_label_names = metrics_label_names + ['error', 'ts']
+ exception_metrics_label_names = metrics_label_names + ['error']
PULSAR_FUNCTION_METRICS_PREFIX = "pulsar_function_"
USER_METRIC_PREFIX = "user_metric_";
@@ -185,13 +185,13 @@ class Stats(object):
# report exception via prometheus
try:
- self.report_user_exception_prometheus(exception, ts)
+ self.report_user_exception_prometheus(exception)
except RateLimitException:
pass
@limits(calls=5, period=60)
- def report_user_exception_prometheus(self, exception, ts):
- exception_metric_labels = self.metrics_labels + [str(exception), str(ts)]
+ def report_user_exception_prometheus(self, exception):
+ exception_metric_labels = self.metrics_labels + [str(exception)]
self.user_exceptions.labels(*exception_metric_labels).set(1.0)
def add_sys_exception(self, exception):
@@ -203,13 +203,13 @@ class Stats(object):
# report exception via prometheus
try:
- self.report_system_exception_prometheus(exception, ts)
+ self.report_system_exception_prometheus(exception)
except RateLimitException:
pass
@limits(calls=5, period=60)
- def report_system_exception_prometheus(self, exception, ts):
- exception_metric_labels = self.metrics_labels + [str(exception), str(ts)]
+ def report_system_exception_prometheus(self, exception):
+ exception_metric_labels = self.metrics_labels + [str(exception)]
self.system_exceptions.labels(*exception_metric_labels).set(1.0)
def reset(self):
@@ -218,4 +218,4 @@ class Stats(object):
self._stat_total_sys_exceptions_1min._value.set(0.0)
self._stat_process_latency_ms_1min._sum.set(0.0)
self._stat_process_latency_ms_1min._count.set(0.0)
- self._stat_total_received_1min._value.set(0.0)
\ No newline at end of file
+ self._stat_total_received_1min._value.set(0.0)