You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2021/10/17 16:07:57 UTC

[skywalking-satellite] branch main updated: fix: errors when converting meter data from histogram and summary (#75)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-satellite.git


The following commit(s) were added to refs/heads/main by this push:
     new 4e833b3  fix: errors when converting meter data from histogram and summary (#75)
4e833b3 is described below

commit 4e833b30ae0cb43db61007a651afa2c9efd44b54
Author: kv <gx...@163.com>
AuthorDate: Mon Oct 18 00:07:51 2021 +0800

    fix: errors when converting meter data from histogram and summary (#75)
---
 CHANGES.md                                  |   1 +
 go.mod                                      |   2 +-
 go.sum                                      |   4 +-
 plugins/fetcher/prometheus/fetcher_test.go  | 101 +++++++++++++++++++++-------
 plugins/fetcher/prometheus/metric_family.go |  76 ++++++++++++---------
 5 files changed, 127 insertions(+), 57 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index d97c8ac..0907a31 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -7,6 +7,7 @@ Release Notes.
 #### Features
 
 #### Bug Fixes
+* Fix errors when converting meter data from histogram and summary.[#75](https://github.com/apache/skywalking-satellite/pull/75)
 
 #### Issues and PR
 - All issues are [here](https://github.com/apache/skywalking/milestone/93?closed=1)
diff --git a/go.mod b/go.mod
index 0b2fce7..d46bd89 100644
--- a/go.mod
+++ b/go.mod
@@ -23,5 +23,5 @@ require (
 	google.golang.org/protobuf v1.27.1
 	gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b
 	gotest.tools v2.2.0+incompatible
-	skywalking.apache.org/repo/goapi v0.0.0-20211013044926-15f732fe3718
+	skywalking.apache.org/repo/goapi v0.0.0-20211014145040-b215a7f7b270
 )
diff --git a/go.sum b/go.sum
index 9b7306a..4a15fcc 100644
--- a/go.sum
+++ b/go.sum
@@ -1330,6 +1330,6 @@ sigs.k8s.io/structured-merge-diff/v4 v4.0.1/go.mod h1:bJZC9H9iH24zzfZ/41RGcq60oK
 sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o=
 sigs.k8s.io/yaml v1.2.0 h1:kr/MCeFWJWTwyaHoR9c8EjH9OumOmoF9YGiZd7lFm/Q=
 sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc=
-skywalking.apache.org/repo/goapi v0.0.0-20211013044926-15f732fe3718 h1:uVTsGx5szaNwEDzJKwhOZ0hCnBjuxN26wLgj2NDrklQ=
-skywalking.apache.org/repo/goapi v0.0.0-20211013044926-15f732fe3718/go.mod h1:4KrWd+Oi4lkB+PtxZgIlf+3T6EECPru4fOWNMEHjxRk=
+skywalking.apache.org/repo/goapi v0.0.0-20211014145040-b215a7f7b270 h1:W6bwwu3ejWM/qupBF820tbxrk8s41daqJ1wSd4W3epE=
+skywalking.apache.org/repo/goapi v0.0.0-20211014145040-b215a7f7b270/go.mod h1:4KrWd+Oi4lkB+PtxZgIlf+3T6EECPru4fOWNMEHjxRk=
 sourcegraph.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0/go.mod h1:hI742Nqp5OhwiqlzhgfbWU4mW4yO10fP+LoT9WOswdU=
diff --git a/plugins/fetcher/prometheus/fetcher_test.go b/plugins/fetcher/prometheus/fetcher_test.go
index cd6d6df..6b76e21 100644
--- a/plugins/fetcher/prometheus/fetcher_test.go
+++ b/plugins/fetcher/prometheus/fetcher_test.go
@@ -21,6 +21,7 @@ package prometheus
 import (
 	"context"
 	"fmt"
+	"math"
 	"net/http"
 	"net/http/httptest"
 	"net/url"
@@ -31,6 +32,8 @@ import (
 	"testing"
 	"time"
 
+	v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+
 	"github.com/apache/skywalking-satellite/internal/pkg/log"
 	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
 	_ "github.com/apache/skywalking-satellite/internal/satellite/test"
@@ -119,7 +122,7 @@ type testData struct {
 	name         string
 	pages        []mockPrometheusResponse
 	ScrapeConfig *scrapeConfig
-	validateFunc func(t *testing.T, em *v1.SniffData)
+	validateFunc func(t *testing.T, em *v1.SniffData, svc map[string][]float64, bvc map[string][][]*v3.MeterBucketValue)
 }
 
 func setupMockPrometheus(tds ...*testData) (*mockPrometheus, *promcfg.Config, error) {
@@ -211,6 +214,47 @@ rpc_duration_seconds_sum 5002
 rpc_duration_seconds_count 1001
 `
 
+var (
+	singleElems = []string{
+		"go_threads",
+		"http_requests_total",
+		"rpc_duration_seconds",
+		"rpc_duration_seconds_count",
+		"rpc_duration_seconds_sum",
+		"http_request_duration_seconds_sum",
+		"http_request_duration_seconds_count",
+	}
+
+	singleValues = map[string][]float64{
+		"go_threads":                          {19, 18},
+		"http_requests_total":                 {100, 5, 199, 12},
+		"http_request_duration_seconds_sum":   {5000, 5050},
+		"http_request_duration_seconds_count": {2500, 2600},
+		"rpc_duration_seconds":                {1, 5, 8, 1, 6, 8},
+		"rpc_duration_seconds_sum":            {5000, 5002},
+		"rpc_duration_seconds_count":          {1000, 1001},
+	}
+
+	histogramElems = []string{"http_request_duration_seconds"}
+
+	bucketValues = map[string][][]*v3.MeterBucketValue{
+		"http_request_duration_seconds": {
+			{
+				&v3.MeterBucketValue{Bucket: math.Inf(-1), Count: int64(1000)},
+				&v3.MeterBucketValue{Bucket: float64(0.05), Count: int64(1500)},
+				&v3.MeterBucketValue{Bucket: float64(0.5), Count: int64(2000)},
+				&v3.MeterBucketValue{Bucket: float64(1), Count: int64(2500)},
+			},
+			{
+				&v3.MeterBucketValue{Bucket: math.Inf(-1), Count: int64(1100)},
+				&v3.MeterBucketValue{Bucket: float64(0.05), Count: int64(1600)},
+				&v3.MeterBucketValue{Bucket: float64(0.5), Count: int64(2100)},
+				&v3.MeterBucketValue{Bucket: float64(1), Count: int64(2600)},
+			},
+		},
+	}
+)
+
 func TestEndToEnd(t *testing.T) {
 	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
 	defer cancel()
@@ -239,46 +283,55 @@ func testEndToEnd(ctx context.Context, t *testing.T, targets []*testData) {
 	defer mp.Close()
 	t.Log(cfg)
 	fetch(ctx, cfg.ScrapeConfigs, outputChannel)
+
+	singleValueCollection := map[string][]float64{}
+	bucketValueCollection := map[string][][]*v3.MeterBucketValue{}
+
+OuterLoop:
 	for {
 		select {
 		case e := <-outputChannel:
-			targets[0].validateFunc(t, e)
+			targets[0].validateFunc(t, e, singleValueCollection, bucketValueCollection)
 		case <-ctx.Done():
-			return
+			break OuterLoop
 		}
 	}
+	verifyCollection(t, singleValueCollection, bucketValueCollection)
 }
 
-func verifyTarget1(t *testing.T, em *v1.SniffData) {
+func verifyTarget1(t *testing.T, em *v1.SniffData, svc map[string][]float64, bvc map[string][][]*v3.MeterBucketValue) {
 	assert.Equal(t, em.GetMeter().Service, "target1", "Get meter service error")
 
-	singleElems := []string{
-		"go_threads",
-		"http_requests_total",
-		"rpc_duration_seconds",
-		"rpc_duration_seconds_count",
-		"rpc_duration_seconds_sum",
-		"http_request_duration_seconds_sum",
-		"http_request_duration_seconds_count",
-	}
-	singleValues := map[string][]float64{
-		"go_threads":                          {19, 18},
-		"http_requests_total":                 {100, 5, 199, 12},
-		"http_request_duration_seconds_sum":   {5000, 5050},
-		"http_request_duration_seconds_count": {2500, 2600},
-		"rpc_duration_seconds":                {1, 5, 6, 8},
-		"rpc_duration_seconds_sum":            {5000, 5002},
-		"rpc_duration_seconds_count":          {1000, 1001},
-	}
-	histogramElems := []string{"http_request_duration_seconds"}
 	if em.GetMeter().GetSingleValue() != nil {
 		single := em.GetMeter().GetSingleValue()
-		t.Log(single.GetName())
+		t.Log(single.GetName(), single.GetLabels(), single.GetValue())
 		assert.Assert(t, is.Contains(singleElems, single.GetName()), "Mismatch single meter name")
 		assert.Assert(t, is.Contains(singleValues[single.GetName()], single.GetValue()), "Mismatch single meter value")
+		svc[single.GetName()] = append(svc[single.GetName()], single.GetValue())
 	} else {
 		histogram := em.GetMeter().GetHistogram()
+		t.Log(histogram.GetName(), histogram.GetLabels(), histogram.GetValues())
 		assert.Assert(t, is.Contains(histogramElems, histogram.GetName()), "Mismatch histogram meter")
+		bvc[histogram.GetName()] = append(bvc[histogram.GetName()], histogram.GetValues())
+	}
+}
+
+func verifyCollection(t *testing.T, svc map[string][]float64, bvc map[string][][]*v3.MeterBucketValue) {
+	for k, v := range singleValues {
+		for i, e := range v {
+			assert.Equal(t, svc[k][i], e, fmt.Sprintf("%s collection has errors", k))
+		}
+		t.Log(fmt.Sprintf("%s  collection is OK", k))
+	}
+
+	for k, v := range bucketValues {
+		for i, e := range v {
+			for j, f := range e {
+				assert.Equal(t, bvc[k][i][j].Bucket, f.Bucket, fmt.Sprintf("%s collection has errors", k))
+				assert.Equal(t, bvc[k][i][j].Count, f.Count, fmt.Sprintf("%s collection has errors", k))
+			}
+		}
+		t.Log(fmt.Sprintf("%s  collection is OK", k))
 	}
 }
 
diff --git a/plugins/fetcher/prometheus/metric_family.go b/plugins/fetcher/prometheus/metric_family.go
index eaf0c49..da7adb1 100644
--- a/plugins/fetcher/prometheus/metric_family.go
+++ b/plugins/fetcher/prometheus/metric_family.go
@@ -118,11 +118,11 @@ func (mf *metricFamily) Add(metricName string, ls labels.Labels, t int64, v floa
 		if strings.HasSuffix(metricName, metricsSuffixCount) {
 			mg.hasCount = true
 			mg.count = v
-			mg.name = metricName
+			mg.name = strings.ReplaceAll(metricName, metricsSuffixCount, "")
 		} else if strings.HasSuffix(metricName, metricsSuffixSum) {
 			mg.hasSum = true
 			mg.sum = v
-			mg.name = metricName
+			mg.name = strings.ReplaceAll(metricName, metricsSuffixSum, "")
 		} else if strings.HasSuffix(metricName, metricsSuffixBucket) {
 			boundary, err := getBoundary(mf.mtype, ls)
 			if err != nil {
@@ -135,11 +135,11 @@ func (mf *metricFamily) Add(metricName string, ls labels.Labels, t int64, v floa
 		if strings.HasSuffix(metricName, metricsSuffixCount) {
 			mg.hasCount = true
 			mg.count = v
-			mg.name = metricName
+			mg.name = strings.ReplaceAll(metricName, metricsSuffixCount, "")
 		} else if strings.HasSuffix(metricName, metricsSuffixSum) {
 			mg.hasSum = true
 			mg.sum = v
-			mg.name = metricName
+			mg.name = strings.ReplaceAll(metricName, metricsSuffixSum, "")
 		} else {
 			mg.value = v
 			mg.name = metricName
@@ -171,26 +171,43 @@ func getBoundary(metricType textparse.MetricType, lbs labels.Labels) (float64, e
 	return strconv.ParseFloat(v, 64)
 }
 
-func (mf *metricFamily) toMeterSingleValue(mg *metricGroup) *v3.MeterSingleValue {
-	if mg.hasCount {
-		return &v3.MeterSingleValue{
-			Name:   mg.name,
-			Labels: mf.convertLabels(mg),
-			Value:  mg.count,
+func (mf *metricFamily) convertSummaryToSingleValue(mg *metricGroup) []*v3.MeterData {
+	result := make([]*v3.MeterData, 0)
+	if mg.hasCount || mg.hasSum {
+		if mg.hasCount {
+			msv := &v3.MeterSingleValue{
+				Name:   mg.name + metricsSuffixCount,
+				Labels: mf.convertLabels(mg),
+				Value:  mg.count,
+			}
+			result = append(result, &v3.MeterData{
+				Metric:    &v3.MeterData_SingleValue{SingleValue: msv},
+				Timestamp: mg.ts,
+			})
 		}
-	}
-	if mg.hasSum {
-		return &v3.MeterSingleValue{
+		if mg.hasSum {
+			msv := &v3.MeterSingleValue{
+				Name:   mg.name + metricsSuffixSum,
+				Labels: mf.convertLabels(mg),
+				Value:  mg.sum,
+			}
+			result = append(result, &v3.MeterData{
+				Metric:    &v3.MeterData_SingleValue{SingleValue: msv},
+				Timestamp: mg.ts,
+			})
+		}
+	} else {
+		msv := &v3.MeterSingleValue{
 			Name:   mg.name,
 			Labels: mf.convertLabels(mg),
-			Value:  mg.sum,
+			Value:  mg.value,
 		}
+		result = append(result, &v3.MeterData{
+			Metric:    &v3.MeterData_SingleValue{SingleValue: msv},
+			Timestamp: mg.ts,
+		})
 	}
-	return &v3.MeterSingleValue{
-		Name:   mg.name,
-		Labels: mf.convertLabels(mg),
-		Value:  mg.value,
-	}
+	return result
 }
 
 func (mf *metricFamily) ToMetric() []*v3.MeterData {
@@ -198,17 +215,13 @@ func (mf *metricFamily) ToMetric() []*v3.MeterData {
 	switch mf.mtype {
 	case textparse.MetricTypeSummary:
 		for _, mg := range mf.getGroups() {
-			msv := mf.toMeterSingleValue(mg)
-			result = append(result, &v3.MeterData{
-				Metric:    &v3.MeterData_SingleValue{SingleValue: msv},
-				Timestamp: mg.ts,
-			})
+			result = append(result, mf.convertSummaryToSingleValue(mg)...)
 		}
 	case textparse.MetricTypeHistogram:
 		for _, mg := range mf.getGroups() {
 			if mg.hasCount {
 				msv := &v3.MeterSingleValue{
-					Name:   mg.name,
+					Name:   mg.name + metricsSuffixCount,
 					Labels: mf.convertLabels(mg),
 					Value:  mg.count,
 				}
@@ -216,11 +229,10 @@ func (mf *metricFamily) ToMetric() []*v3.MeterData {
 					Metric:    &v3.MeterData_SingleValue{SingleValue: msv},
 					Timestamp: mg.ts,
 				})
-				continue
 			}
 			if mg.hasSum {
 				msv := &v3.MeterSingleValue{
-					Name:   mg.name,
+					Name:   mg.name + metricsSuffixSum,
 					Labels: mf.convertLabels(mg),
 					Value:  mg.sum,
 				}
@@ -228,7 +240,6 @@ func (mf *metricFamily) ToMetric() []*v3.MeterData {
 					Metric:    &v3.MeterData_SingleValue{SingleValue: msv},
 					Timestamp: mg.ts,
 				})
-				continue
 			}
 
 			bucketMap := make(map[float64]float64)
@@ -238,12 +249,14 @@ func (mf *metricFamily) ToMetric() []*v3.MeterData {
 			sort.Slice(mg.complexValue, func(i, j int) bool {
 				return mg.complexValue[i].boundary < mg.complexValue[j].boundary
 			})
+
 			mbs := make([]*v3.MeterBucketValue, 0)
 			for index, m := range mg.complexValue {
 				if index == 0 {
 					mbv := &v3.MeterBucketValue{
-						Bucket: float64(math.MinInt64),
-						Count:  int64(m.value),
+						Bucket:             math.Inf(-1),
+						Count:              int64(m.value),
+						IsNegativeInfinity: true,
 					}
 					mbs = append(mbs, mbv)
 				} else {
@@ -286,6 +299,9 @@ func (mf *metricFamily) ToMetric() []*v3.MeterData {
 func (mf *metricFamily) convertLabels(mg *metricGroup) []*v3.Label {
 	result := make([]*v3.Label, 0)
 	for k, v := range mg.ls.Map() {
+		if !isUsefulLabel(mf.mtype, k) {
+			continue
+		}
 		label := &v3.Label{
 			Name:  k,
 			Value: v,