You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2021/10/17 16:07:57 UTC
[skywalking-satellite] branch main updated: fix: errors when
converting meter data from histogram and summary (#75)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-satellite.git
The following commit(s) were added to refs/heads/main by this push:
new 4e833b3 fix: errors when converting meter data from histogram and summary (#75)
4e833b3 is described below
commit 4e833b30ae0cb43db61007a651afa2c9efd44b54
Author: kv <gx...@163.com>
AuthorDate: Mon Oct 18 00:07:51 2021 +0800
fix: errors when converting meter data from histogram and summary (#75)
---
CHANGES.md | 1 +
go.mod | 2 +-
go.sum | 4 +-
plugins/fetcher/prometheus/fetcher_test.go | 101 +++++++++++++++++++++-------
plugins/fetcher/prometheus/metric_family.go | 76 ++++++++++++---------
5 files changed, 127 insertions(+), 57 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index d97c8ac..0907a31 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -7,6 +7,7 @@ Release Notes.
#### Features
#### Bug Fixes
+* Fix errors when converting meter data from histogram and summary.[#75](https://github.com/apache/skywalking-satellite/pull/75)
#### Issues and PR
- All issues are [here](https://github.com/apache/skywalking/milestone/93?closed=1)
diff --git a/go.mod b/go.mod
index 0b2fce7..d46bd89 100644
--- a/go.mod
+++ b/go.mod
@@ -23,5 +23,5 @@ require (
google.golang.org/protobuf v1.27.1
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b
gotest.tools v2.2.0+incompatible
- skywalking.apache.org/repo/goapi v0.0.0-20211013044926-15f732fe3718
+ skywalking.apache.org/repo/goapi v0.0.0-20211014145040-b215a7f7b270
)
diff --git a/go.sum b/go.sum
index 9b7306a..4a15fcc 100644
--- a/go.sum
+++ b/go.sum
@@ -1330,6 +1330,6 @@ sigs.k8s.io/structured-merge-diff/v4 v4.0.1/go.mod h1:bJZC9H9iH24zzfZ/41RGcq60oK
sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o=
sigs.k8s.io/yaml v1.2.0 h1:kr/MCeFWJWTwyaHoR9c8EjH9OumOmoF9YGiZd7lFm/Q=
sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc=
-skywalking.apache.org/repo/goapi v0.0.0-20211013044926-15f732fe3718 h1:uVTsGx5szaNwEDzJKwhOZ0hCnBjuxN26wLgj2NDrklQ=
-skywalking.apache.org/repo/goapi v0.0.0-20211013044926-15f732fe3718/go.mod h1:4KrWd+Oi4lkB+PtxZgIlf+3T6EECPru4fOWNMEHjxRk=
+skywalking.apache.org/repo/goapi v0.0.0-20211014145040-b215a7f7b270 h1:W6bwwu3ejWM/qupBF820tbxrk8s41daqJ1wSd4W3epE=
+skywalking.apache.org/repo/goapi v0.0.0-20211014145040-b215a7f7b270/go.mod h1:4KrWd+Oi4lkB+PtxZgIlf+3T6EECPru4fOWNMEHjxRk=
sourcegraph.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0/go.mod h1:hI742Nqp5OhwiqlzhgfbWU4mW4yO10fP+LoT9WOswdU=
diff --git a/plugins/fetcher/prometheus/fetcher_test.go b/plugins/fetcher/prometheus/fetcher_test.go
index cd6d6df..6b76e21 100644
--- a/plugins/fetcher/prometheus/fetcher_test.go
+++ b/plugins/fetcher/prometheus/fetcher_test.go
@@ -21,6 +21,7 @@ package prometheus
import (
"context"
"fmt"
+ "math"
"net/http"
"net/http/httptest"
"net/url"
@@ -31,6 +32,8 @@ import (
"testing"
"time"
+ v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+
"github.com/apache/skywalking-satellite/internal/pkg/log"
"github.com/apache/skywalking-satellite/internal/pkg/plugin"
_ "github.com/apache/skywalking-satellite/internal/satellite/test"
@@ -119,7 +122,7 @@ type testData struct {
name string
pages []mockPrometheusResponse
ScrapeConfig *scrapeConfig
- validateFunc func(t *testing.T, em *v1.SniffData)
+ validateFunc func(t *testing.T, em *v1.SniffData, svc map[string][]float64, bvc map[string][][]*v3.MeterBucketValue)
}
func setupMockPrometheus(tds ...*testData) (*mockPrometheus, *promcfg.Config, error) {
@@ -211,6 +214,47 @@ rpc_duration_seconds_sum 5002
rpc_duration_seconds_count 1001
`
+var (
+ singleElems = []string{
+ "go_threads",
+ "http_requests_total",
+ "rpc_duration_seconds",
+ "rpc_duration_seconds_count",
+ "rpc_duration_seconds_sum",
+ "http_request_duration_seconds_sum",
+ "http_request_duration_seconds_count",
+ }
+
+ singleValues = map[string][]float64{
+ "go_threads": {19, 18},
+ "http_requests_total": {100, 5, 199, 12},
+ "http_request_duration_seconds_sum": {5000, 5050},
+ "http_request_duration_seconds_count": {2500, 2600},
+ "rpc_duration_seconds": {1, 5, 8, 1, 6, 8},
+ "rpc_duration_seconds_sum": {5000, 5002},
+ "rpc_duration_seconds_count": {1000, 1001},
+ }
+
+ histogramElems = []string{"http_request_duration_seconds"}
+
+ bucketValues = map[string][][]*v3.MeterBucketValue{
+ "http_request_duration_seconds": {
+ {
+ &v3.MeterBucketValue{Bucket: math.Inf(-1), Count: int64(1000)},
+ &v3.MeterBucketValue{Bucket: float64(0.05), Count: int64(1500)},
+ &v3.MeterBucketValue{Bucket: float64(0.5), Count: int64(2000)},
+ &v3.MeterBucketValue{Bucket: float64(1), Count: int64(2500)},
+ },
+ {
+ &v3.MeterBucketValue{Bucket: math.Inf(-1), Count: int64(1100)},
+ &v3.MeterBucketValue{Bucket: float64(0.05), Count: int64(1600)},
+ &v3.MeterBucketValue{Bucket: float64(0.5), Count: int64(2100)},
+ &v3.MeterBucketValue{Bucket: float64(1), Count: int64(2600)},
+ },
+ },
+ }
+)
+
func TestEndToEnd(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
@@ -239,46 +283,55 @@ func testEndToEnd(ctx context.Context, t *testing.T, targets []*testData) {
defer mp.Close()
t.Log(cfg)
fetch(ctx, cfg.ScrapeConfigs, outputChannel)
+
+ singleValueCollection := map[string][]float64{}
+ bucketValueCollection := map[string][][]*v3.MeterBucketValue{}
+
+OuterLoop:
for {
select {
case e := <-outputChannel:
- targets[0].validateFunc(t, e)
+ targets[0].validateFunc(t, e, singleValueCollection, bucketValueCollection)
case <-ctx.Done():
- return
+ break OuterLoop
}
}
+ verifyCollection(t, singleValueCollection, bucketValueCollection)
}
-func verifyTarget1(t *testing.T, em *v1.SniffData) {
+func verifyTarget1(t *testing.T, em *v1.SniffData, svc map[string][]float64, bvc map[string][][]*v3.MeterBucketValue) {
assert.Equal(t, em.GetMeter().Service, "target1", "Get meter service error")
- singleElems := []string{
- "go_threads",
- "http_requests_total",
- "rpc_duration_seconds",
- "rpc_duration_seconds_count",
- "rpc_duration_seconds_sum",
- "http_request_duration_seconds_sum",
- "http_request_duration_seconds_count",
- }
- singleValues := map[string][]float64{
- "go_threads": {19, 18},
- "http_requests_total": {100, 5, 199, 12},
- "http_request_duration_seconds_sum": {5000, 5050},
- "http_request_duration_seconds_count": {2500, 2600},
- "rpc_duration_seconds": {1, 5, 6, 8},
- "rpc_duration_seconds_sum": {5000, 5002},
- "rpc_duration_seconds_count": {1000, 1001},
- }
- histogramElems := []string{"http_request_duration_seconds"}
if em.GetMeter().GetSingleValue() != nil {
single := em.GetMeter().GetSingleValue()
- t.Log(single.GetName())
+ t.Log(single.GetName(), single.GetLabels(), single.GetValue())
assert.Assert(t, is.Contains(singleElems, single.GetName()), "Mismatch single meter name")
assert.Assert(t, is.Contains(singleValues[single.GetName()], single.GetValue()), "Mismatch single meter value")
+ svc[single.GetName()] = append(svc[single.GetName()], single.GetValue())
} else {
histogram := em.GetMeter().GetHistogram()
+ t.Log(histogram.GetName(), histogram.GetLabels(), histogram.GetValues())
assert.Assert(t, is.Contains(histogramElems, histogram.GetName()), "Mismatch histogram meter")
+ bvc[histogram.GetName()] = append(bvc[histogram.GetName()], histogram.GetValues())
+ }
+}
+
+func verifyCollection(t *testing.T, svc map[string][]float64, bvc map[string][][]*v3.MeterBucketValue) {
+ for k, v := range singleValues {
+ for i, e := range v {
+ assert.Equal(t, svc[k][i], e, fmt.Sprintf("%s collection has errors", k))
+ }
+ t.Log(fmt.Sprintf("%s collection is OK", k))
+ }
+
+ for k, v := range bucketValues {
+ for i, e := range v {
+ for j, f := range e {
+ assert.Equal(t, bvc[k][i][j].Bucket, f.Bucket, fmt.Sprintf("%s collection has errors", k))
+ assert.Equal(t, bvc[k][i][j].Count, f.Count, fmt.Sprintf("%s collection has errors", k))
+ }
+ }
+ t.Log(fmt.Sprintf("%s collection is OK", k))
}
}
diff --git a/plugins/fetcher/prometheus/metric_family.go b/plugins/fetcher/prometheus/metric_family.go
index eaf0c49..da7adb1 100644
--- a/plugins/fetcher/prometheus/metric_family.go
+++ b/plugins/fetcher/prometheus/metric_family.go
@@ -118,11 +118,11 @@ func (mf *metricFamily) Add(metricName string, ls labels.Labels, t int64, v floa
if strings.HasSuffix(metricName, metricsSuffixCount) {
mg.hasCount = true
mg.count = v
- mg.name = metricName
+ mg.name = strings.ReplaceAll(metricName, metricsSuffixCount, "")
} else if strings.HasSuffix(metricName, metricsSuffixSum) {
mg.hasSum = true
mg.sum = v
- mg.name = metricName
+ mg.name = strings.ReplaceAll(metricName, metricsSuffixSum, "")
} else if strings.HasSuffix(metricName, metricsSuffixBucket) {
boundary, err := getBoundary(mf.mtype, ls)
if err != nil {
@@ -135,11 +135,11 @@ func (mf *metricFamily) Add(metricName string, ls labels.Labels, t int64, v floa
if strings.HasSuffix(metricName, metricsSuffixCount) {
mg.hasCount = true
mg.count = v
- mg.name = metricName
+ mg.name = strings.ReplaceAll(metricName, metricsSuffixCount, "")
} else if strings.HasSuffix(metricName, metricsSuffixSum) {
mg.hasSum = true
mg.sum = v
- mg.name = metricName
+ mg.name = strings.ReplaceAll(metricName, metricsSuffixSum, "")
} else {
mg.value = v
mg.name = metricName
@@ -171,26 +171,43 @@ func getBoundary(metricType textparse.MetricType, lbs labels.Labels) (float64, e
return strconv.ParseFloat(v, 64)
}
-func (mf *metricFamily) toMeterSingleValue(mg *metricGroup) *v3.MeterSingleValue {
- if mg.hasCount {
- return &v3.MeterSingleValue{
- Name: mg.name,
- Labels: mf.convertLabels(mg),
- Value: mg.count,
+func (mf *metricFamily) convertSummaryToSingleValue(mg *metricGroup) []*v3.MeterData {
+ result := make([]*v3.MeterData, 0)
+ if mg.hasCount || mg.hasSum {
+ if mg.hasCount {
+ msv := &v3.MeterSingleValue{
+ Name: mg.name + metricsSuffixCount,
+ Labels: mf.convertLabels(mg),
+ Value: mg.count,
+ }
+ result = append(result, &v3.MeterData{
+ Metric: &v3.MeterData_SingleValue{SingleValue: msv},
+ Timestamp: mg.ts,
+ })
}
- }
- if mg.hasSum {
- return &v3.MeterSingleValue{
+ if mg.hasSum {
+ msv := &v3.MeterSingleValue{
+ Name: mg.name + metricsSuffixSum,
+ Labels: mf.convertLabels(mg),
+ Value: mg.sum,
+ }
+ result = append(result, &v3.MeterData{
+ Metric: &v3.MeterData_SingleValue{SingleValue: msv},
+ Timestamp: mg.ts,
+ })
+ }
+ } else {
+ msv := &v3.MeterSingleValue{
Name: mg.name,
Labels: mf.convertLabels(mg),
- Value: mg.sum,
+ Value: mg.value,
}
+ result = append(result, &v3.MeterData{
+ Metric: &v3.MeterData_SingleValue{SingleValue: msv},
+ Timestamp: mg.ts,
+ })
}
- return &v3.MeterSingleValue{
- Name: mg.name,
- Labels: mf.convertLabels(mg),
- Value: mg.value,
- }
+ return result
}
func (mf *metricFamily) ToMetric() []*v3.MeterData {
@@ -198,17 +215,13 @@ func (mf *metricFamily) ToMetric() []*v3.MeterData {
switch mf.mtype {
case textparse.MetricTypeSummary:
for _, mg := range mf.getGroups() {
- msv := mf.toMeterSingleValue(mg)
- result = append(result, &v3.MeterData{
- Metric: &v3.MeterData_SingleValue{SingleValue: msv},
- Timestamp: mg.ts,
- })
+ result = append(result, mf.convertSummaryToSingleValue(mg)...)
}
case textparse.MetricTypeHistogram:
for _, mg := range mf.getGroups() {
if mg.hasCount {
msv := &v3.MeterSingleValue{
- Name: mg.name,
+ Name: mg.name + metricsSuffixCount,
Labels: mf.convertLabels(mg),
Value: mg.count,
}
@@ -216,11 +229,10 @@ func (mf *metricFamily) ToMetric() []*v3.MeterData {
Metric: &v3.MeterData_SingleValue{SingleValue: msv},
Timestamp: mg.ts,
})
- continue
}
if mg.hasSum {
msv := &v3.MeterSingleValue{
- Name: mg.name,
+ Name: mg.name + metricsSuffixSum,
Labels: mf.convertLabels(mg),
Value: mg.sum,
}
@@ -228,7 +240,6 @@ func (mf *metricFamily) ToMetric() []*v3.MeterData {
Metric: &v3.MeterData_SingleValue{SingleValue: msv},
Timestamp: mg.ts,
})
- continue
}
bucketMap := make(map[float64]float64)
@@ -238,12 +249,14 @@ func (mf *metricFamily) ToMetric() []*v3.MeterData {
sort.Slice(mg.complexValue, func(i, j int) bool {
return mg.complexValue[i].boundary < mg.complexValue[j].boundary
})
+
mbs := make([]*v3.MeterBucketValue, 0)
for index, m := range mg.complexValue {
if index == 0 {
mbv := &v3.MeterBucketValue{
- Bucket: float64(math.MinInt64),
- Count: int64(m.value),
+ Bucket: math.Inf(-1),
+ Count: int64(m.value),
+ IsNegativeInfinity: true,
}
mbs = append(mbs, mbv)
} else {
@@ -286,6 +299,9 @@ func (mf *metricFamily) ToMetric() []*v3.MeterData {
func (mf *metricFamily) convertLabels(mg *metricGroup) []*v3.Label {
result := make([]*v3.Label, 0)
for k, v := range mg.ls.Map() {
+ if !isUsefulLabel(mf.mtype, k) {
+ continue
+ }
label := &v3.Label{
Name: k,
Value: v,