You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/07/29 10:41:13 UTC

[pulsar] 04/25: [Issue 7489] Remove timestamp from metrics (#7539)

This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 602735e8496ea93a68553dccdd876425fa42dd3a
Author: Sergii Zhevzhyk <vz...@users.noreply.github.com>
AuthorDate: Sat Jul 25 16:33:08 2020 +0200

    [Issue 7489] Remove timestamp from metrics (#7539)
    
    * [Issue 7489] Remove timestamp from exception metrics for functions and connectors
    
    * [Issue 7489] Remove timestamp from exception metrics for Go functions
    
    * [Issue 7489] Remove unused import in go stats
    
    * [Issue 7489] Remove timestamp from metrics in python stats
    
    * Change to v2 of go github actions
    
    * Update go github actions
    
    * Remove the version from the go test command
    
    * Rename github jobs for go
    
    Co-authored-by: Matteo Merli <mm...@splunk.com>
    (cherry picked from commit 777ed1651221449da61c3e3cb8a7eb8157a8239f)
---
 .github/workflows/ci-go-functions-style.yaml             |  8 +++++---
 .github/workflows/ci-go-functions-test.yaml              |  6 +++---
 pulsar-function-go/pf/stats.go                           | 15 +++++++--------
 .../functions/instance/stats/ComponentStatsManager.java  |  3 +--
 .../functions/instance/stats/FunctionStatsManager.java   | 11 +++++------
 .../functions/instance/stats/SinkStatsManager.java       | 11 +++++------
 .../functions/instance/stats/SourceStatsManager.java     | 11 +++++------
 .../instance/src/main/python/function_stats.py           | 16 ++++++++--------
 8 files changed, 39 insertions(+), 42 deletions(-)

diff --git a/.github/workflows/ci-go-functions-style.yaml b/.github/workflows/ci-go-functions-style.yaml
index 628e437..bea77c7 100644
--- a/.github/workflows/ci-go-functions-style.yaml
+++ b/.github/workflows/ci-go-functions-style.yaml
@@ -27,12 +27,14 @@ on:
       - 'pulsar-function-go/**'
 
 jobs:
-  build:
-    name: Build
+  check-style:
+
+    name: Go ${{ matrix.go-version }} Functions style check
     runs-on: ubuntu-latest
     strategy:
       matrix:
         go-version: [1.11, 1.12, 1.13, 1.14]
+
     steps:
       - name: Check out code into the Go module directory
         uses: actions/checkout@v2
@@ -47,7 +49,7 @@ jobs:
           args: site2 .github deployment .asf.yaml .ci ct.yaml
 
       - name: Set up Go
-        uses: actions/setup-go@v1
+        uses: actions/setup-go@v2
         if: steps.docs.outputs.changed_only == 'no'
         with:
           go-version: ${{ matrix.go-version }}
diff --git a/.github/workflows/ci-go-functions-test.yaml b/.github/workflows/ci-go-functions-test.yaml
index 5b6f942..8fb7d95 100644
--- a/.github/workflows/ci-go-functions-test.yaml
+++ b/.github/workflows/ci-go-functions-test.yaml
@@ -37,7 +37,7 @@ jobs:
     timeout-minutes: 120
 
     steps:
-      - name: checkout
+      - name: Check out code into the Go module directory
         uses: actions/checkout@v2
         with:
           fetch-depth: 0
@@ -50,13 +50,13 @@ jobs:
           args: site2 .github deployment .asf.yaml .ci ct.yaml
 
       - name: Set up Go
-        uses: actions/setup-go@v1
+        uses: actions/setup-go@v2
         if: steps.docs.outputs.changed_only == 'no'
         with:
           go-version: ${{ matrix.go-version }}
         id: go
 
-      - name: run tests
+      - name: Run tests
         if: steps.docs.outputs.changed_only == 'no'
         run: |
           cd pulsar-function-go
diff --git a/pulsar-function-go/pf/stats.go b/pulsar-function-go/pf/stats.go
index 2d8f15b..f7952fb 100644
--- a/pulsar-function-go/pf/stats.go
+++ b/pulsar-function-go/pf/stats.go
@@ -20,7 +20,6 @@
 package pf
 
 import (
-	"strconv"
 	"time"
 
 	"github.com/prometheus/client_golang/prometheus"
@@ -30,7 +29,7 @@ import (
 
 var (
 	metricsLabelNames          = []string{"tenant", "namespace", "name", "instance_id", "cluster", "fqfn"}
-	exceptionLabelNames        = []string{"error", "ts"}
+	exceptionLabelNames        = []string{"error"}
 	exceptionMetricsLabelNames = append(metricsLabelNames, exceptionLabelNames...)
 )
 
@@ -254,12 +253,12 @@ func (stat *StatWithLabelValues) addUserException(err error) {
 		stat.latestUserException = stat.latestUserException[1:]
 	}
 	// report exception via prometheus
-	stat.reportUserExceptionPrometheus(err, ts)
+	stat.reportUserExceptionPrometheus(err)
 }
 
 //@limits(calls=5, period=60)
-func (stat *StatWithLabelValues) reportUserExceptionPrometheus(exception error, ts int64) {
-	errorTs := []string{exception.Error(), strconv.FormatInt(ts, 10)}
+func (stat *StatWithLabelValues) reportUserExceptionPrometheus(exception error) {
+	errorTs := []string{exception.Error()}
 	exceptionMetricLabels := append(stat.metricsLabels, errorTs...)
 	userExceptions.WithLabelValues(exceptionMetricLabels...).Set(1.0)
 }
@@ -284,12 +283,12 @@ func (stat *StatWithLabelValues) addSysException(exception error) {
 		stat.latestSysException = stat.latestSysException[1:]
 	}
 	// report exception via prometheus
-	stat.reportSystemExceptionPrometheus(exception, ts)
+	stat.reportSystemExceptionPrometheus(exception)
 }
 
 //@limits(calls=5, period=60)
-func (stat *StatWithLabelValues) reportSystemExceptionPrometheus(exception error, ts int64) {
-	errorTs := []string{exception.Error(), strconv.FormatInt(ts, 10)}
+func (stat *StatWithLabelValues) reportSystemExceptionPrometheus(exception error) {
+	errorTs := []string{exception.Error()}
 	exceptionMetricLabels := append(stat.metricsLabels, errorTs...)
 	systemExceptions.WithLabelValues(exceptionMetricLabels...).Set(1.0)
 }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java
index daa51b7..cbdcc0f 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java
@@ -49,9 +49,8 @@ public abstract class ComponentStatsManager implements AutoCloseable {
     protected static final String[] exceptionMetricsLabelNames;
 
     static {
-        exceptionMetricsLabelNames = Arrays.copyOf(metricsLabelNames, metricsLabelNames.length + 2);
+        exceptionMetricsLabelNames = Arrays.copyOf(metricsLabelNames, metricsLabelNames.length + 1);
         exceptionMetricsLabelNames[metricsLabelNames.length] = "error";
-        exceptionMetricsLabelNames[metricsLabelNames.length + 1] = "ts";
     }
 
     public static ComponentStatsManager getStatsManager(CollectorRegistry collectorRegistry,
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java
index fdedb74..f02b850 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java
@@ -243,7 +243,7 @@ public class FunctionStatsManager extends ComponentStatsManager{
 
         // report exception throw prometheus
         if (userExceptionRateLimiter.tryAcquire()) {
-            String[] exceptionMetricsLabels = getExceptionMetricsLabels(ex, ts);
+            String[] exceptionMetricsLabels = getExceptionMetricsLabels(ex);
             userExceptions.labels(exceptionMetricsLabels).set(1.0);
         }
     }
@@ -255,15 +255,14 @@ public class FunctionStatsManager extends ComponentStatsManager{
 
         // report exception throw prometheus
         if (sysExceptionRateLimiter.tryAcquire()) {
-            String[] exceptionMetricsLabels = getExceptionMetricsLabels(ex, ts);
+            String[] exceptionMetricsLabels = getExceptionMetricsLabels(ex);
             sysExceptions.labels(exceptionMetricsLabels).set(1.0);
         }
     }
 
-    private String[] getExceptionMetricsLabels(Throwable ex, long ts) {
-        String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels, metricsLabels.length + 2);
-        exceptionMetricsLabels[exceptionMetricsLabels.length - 2] = ex.getMessage() != null ? ex.getMessage() : "";
-        exceptionMetricsLabels[exceptionMetricsLabels.length - 1] = String.valueOf(ts);
+    private String[] getExceptionMetricsLabels(Throwable ex) {
+        String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels, metricsLabels.length + 1);
+        exceptionMetricsLabels[exceptionMetricsLabels.length - 1] = ex.getMessage() != null ? ex.getMessage() : "";
         return exceptionMetricsLabels;
     }
 
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java
index c913225..401aa34 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java
@@ -213,7 +213,7 @@ public class SinkStatsManager extends ComponentStatsManager {
 
         // report exception throw prometheus
         if (sysExceptionRateLimiter.tryAcquire()) {
-            String[] exceptionMetricsLabels = getExceptionMetricsLabels(ex, ts);
+            String[] exceptionMetricsLabels = getExceptionMetricsLabels(ex);
             sysExceptions.labels(exceptionMetricsLabels).set(1.0);
         }
     }
@@ -236,15 +236,14 @@ public class SinkStatsManager extends ComponentStatsManager {
 
         // report exception throw prometheus
         if (sinkExceptionRateLimiter.tryAcquire()) {
-            String[] exceptionMetricsLabels = getExceptionMetricsLabels(ex, ts);
+            String[] exceptionMetricsLabels = getExceptionMetricsLabels(ex);
             sinkExceptions.labels(exceptionMetricsLabels).set(1.0);
         }
     }
 
-    private String[] getExceptionMetricsLabels(Throwable ex, long ts) {
-        String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels, metricsLabels.length + 2);
-        exceptionMetricsLabels[exceptionMetricsLabels.length - 2] = ex.getMessage() != null ? ex.getMessage() : "";
-        exceptionMetricsLabels[exceptionMetricsLabels.length - 1] = String.valueOf(ts);
+    private String[] getExceptionMetricsLabels(Throwable ex) {
+        String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels, metricsLabels.length + 1);
+        exceptionMetricsLabels[exceptionMetricsLabels.length - 1] = ex.getMessage() != null ? ex.getMessage() : "";
         return exceptionMetricsLabels;
     }
 
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java
index 0ec7352..287240c 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java
@@ -212,7 +212,7 @@ public class SourceStatsManager extends ComponentStatsManager {
 
         // report exception throw prometheus
         if (sysExceptionRateLimiter.tryAcquire()) {
-            String[] exceptionMetricsLabels = getExceptionMetricsLabels(ex, ts);
+            String[] exceptionMetricsLabels = getExceptionMetricsLabels(ex);
             sysExceptions.labels(exceptionMetricsLabels).set(1.0);
         }
     }
@@ -230,15 +230,14 @@ public class SourceStatsManager extends ComponentStatsManager {
 
         // report exception throw prometheus
         if (sourceExceptionRateLimiter.tryAcquire()) {
-            String[] exceptionMetricsLabels = getExceptionMetricsLabels(ex, ts);
+            String[] exceptionMetricsLabels = getExceptionMetricsLabels(ex);
             sourceExceptions.labels(exceptionMetricsLabels).set(1.0);
         }
     }
 
-    private String[] getExceptionMetricsLabels(Throwable ex, long ts) {
-        String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels, metricsLabels.length + 2);
-        exceptionMetricsLabels[exceptionMetricsLabels.length - 2] = ex.getMessage() != null ? ex.getMessage() : "";
-        exceptionMetricsLabels[exceptionMetricsLabels.length - 1] = String.valueOf(ts);
+    private String[] getExceptionMetricsLabels(Throwable ex) {
+        String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels, metricsLabels.length + 1);
+        exceptionMetricsLabels[exceptionMetricsLabels.length - 1] = ex.getMessage() != null ? ex.getMessage() : "";
         return exceptionMetricsLabels;
     }
 
diff --git a/pulsar-functions/instance/src/main/python/function_stats.py b/pulsar-functions/instance/src/main/python/function_stats.py
index 63089b6..dd236c5 100644
--- a/pulsar-functions/instance/src/main/python/function_stats.py
+++ b/pulsar-functions/instance/src/main/python/function_stats.py
@@ -29,7 +29,7 @@ from ratelimit import limits, RateLimitException
 class Stats(object):
   metrics_label_names = ['tenant', 'namespace', 'name', 'instance_id', 'cluster', 'fqfn']
 
-  exception_metrics_label_names = metrics_label_names + ['error', 'ts']
+  exception_metrics_label_names = metrics_label_names + ['error']
 
   PULSAR_FUNCTION_METRICS_PREFIX = "pulsar_function_"
   USER_METRIC_PREFIX = "user_metric_";
@@ -185,13 +185,13 @@ class Stats(object):
 
     # report exception via prometheus
     try:
-      self.report_user_exception_prometheus(exception, ts)
+      self.report_user_exception_prometheus(exception)
     except RateLimitException:
       pass
 
   @limits(calls=5, period=60)
-  def report_user_exception_prometheus(self, exception, ts):
-    exception_metric_labels = self.metrics_labels + [str(exception), str(ts)]
+  def report_user_exception_prometheus(self, exception):
+    exception_metric_labels = self.metrics_labels + [str(exception)]
     self.user_exceptions.labels(*exception_metric_labels).set(1.0)
 
   def add_sys_exception(self, exception):
@@ -203,13 +203,13 @@ class Stats(object):
 
     # report exception via prometheus
     try:
-      self.report_system_exception_prometheus(exception, ts)
+      self.report_system_exception_prometheus(exception)
     except RateLimitException:
       pass
 
   @limits(calls=5, period=60)
-  def report_system_exception_prometheus(self, exception, ts):
-    exception_metric_labels = self.metrics_labels + [str(exception), str(ts)]
+  def report_system_exception_prometheus(self, exception):
+    exception_metric_labels = self.metrics_labels + [str(exception)]
     self.system_exceptions.labels(*exception_metric_labels).set(1.0)
 
   def reset(self):
@@ -218,4 +218,4 @@ class Stats(object):
     self._stat_total_sys_exceptions_1min._value.set(0.0)
     self._stat_process_latency_ms_1min._sum.set(0.0)
     self._stat_process_latency_ms_1min._count.set(0.0)
-    self._stat_total_received_1min._value.set(0.0)
\ No newline at end of file
+    self._stat_total_received_1min._value.set(0.0)