You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2022/03/03 01:55:01 UTC

[beam] branch master updated: [BEAM-13903] Improve coverage of metricsx package (#16994)

This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new e234543  [BEAM-13903] Improve coverage of metricsx package (#16994)
e234543 is described below

commit e2345430de5910c63b009b851aa2b3d6a29789ae
Author: Danny McCormick <da...@google.com>
AuthorDate: Wed Mar 2 20:52:33 2022 -0500

    [BEAM-13903] Improve coverage of metricsx package (#16994)
---
 .../beam/core/runtime/metricsx/metricsx_test.go    | 184 ++++++++++++++++++++-
 1 file changed, 181 insertions(+), 3 deletions(-)

diff --git a/sdks/go/pkg/beam/core/runtime/metricsx/metricsx_test.go b/sdks/go/pkg/beam/core/runtime/metricsx/metricsx_test.go
index 20492ab..95b92f9 100644
--- a/sdks/go/pkg/beam/core/runtime/metricsx/metricsx_test.go
+++ b/sdks/go/pkg/beam/core/runtime/metricsx/metricsx_test.go
@@ -59,7 +59,7 @@ func TestFromMonitoringInfos_Counters(t *testing.T) {
 
 	got := FromMonitoringInfos(p, attempted, committed).AllMetrics().Counters()
 	size := len(got)
-	if size < 1 {
+	if size != 1 {
 		t.Fatalf("Invalid array's size: got: %v, want: %v", size, 1)
 	}
 	if d := cmp.Diff(want, got[0]); d != "" {
@@ -68,6 +68,184 @@ func TestFromMonitoringInfos_Counters(t *testing.T) {
 	}
 }
 
+func TestFromMonitoringInfos_Msec(t *testing.T) {
+	want := metrics.MsecResult{
+		Attempted: metrics.MsecValue{
+			Start:   15 * time.Millisecond,
+			Process: 20 * time.Millisecond,
+			Finish:  40 * time.Millisecond,
+			Total:   25 * time.Millisecond,
+		},
+		Committed: metrics.MsecValue{
+			Start:   0 * time.Millisecond,
+			Process: 0 * time.Millisecond,
+			Finish:  0 * time.Millisecond,
+			Total:   0 * time.Millisecond,
+		},
+		Key: metrics.StepKey{
+			Step:      "main.customDoFn",
+			Name:      "customCounter",
+			Namespace: "customDoFn",
+		},
+	}
+
+	labels := map[string]string{
+		"PTRANSFORM": "main.customDoFn",
+		"NAMESPACE":  "customDoFn",
+		"NAME":       "customCounter",
+	}
+
+	startValue, err := Int64Counter(int64(15))
+	if err != nil {
+		t.Fatalf("Failed to encode Int64Counter: %v", err)
+	}
+	processValue, err := Int64Counter(int64(20))
+	if err != nil {
+		t.Fatalf("Failed to encode Int64Counter: %v", err)
+	}
+	finishValue, err := Int64Counter(int64(40))
+	if err != nil {
+		t.Fatalf("Failed to encode Int64Counter: %v", err)
+	}
+	totalValue, err := Int64Counter(int64(25))
+	if err != nil {
+		t.Fatalf("Failed to encode Int64Counter: %v", err)
+	}
+	mStartBundleInfo := &pipepb.MonitoringInfo{
+		Urn:     UrnToString(ExecutionMsecUrn(0)),
+		Type:    UrnToType(ExecutionMsecUrn(0)),
+		Labels:  labels,
+		Payload: startValue,
+	}
+	mProcessBundleInfo := &pipepb.MonitoringInfo{
+		Urn:     UrnToString(ExecutionMsecUrn(1)),
+		Type:    UrnToType(ExecutionMsecUrn(1)),
+		Labels:  labels,
+		Payload: processValue,
+	}
+	mFinishBundleInfo := &pipepb.MonitoringInfo{
+		Urn:     UrnToString(ExecutionMsecUrn(2)),
+		Type:    UrnToType(ExecutionMsecUrn(2)),
+		Labels:  labels,
+		Payload: finishValue,
+	}
+	mTotalTimeInfo := &pipepb.MonitoringInfo{
+		Urn:     UrnToString(ExecutionMsecUrn(3)),
+		Type:    UrnToType(ExecutionMsecUrn(3)),
+		Labels:  labels,
+		Payload: totalValue,
+	}
+
+	attempted := []*pipepb.MonitoringInfo{mStartBundleInfo, mProcessBundleInfo, mFinishBundleInfo, mTotalTimeInfo}
+	committed := []*pipepb.MonitoringInfo{}
+	p := &pipepb.Pipeline{}
+
+	got := FromMonitoringInfos(p, attempted, committed).AllMetrics().Msecs()
+	size := len(got)
+	if size != 1 {
+		t.Fatalf("Invalid array's size: got: %v, want: %v", size, 1)
+	}
+	if d := cmp.Diff(want, got[0]); d != "" {
+		t.Fatalf("Invalid MsecResult: got: %v, want: %v, diff(-want,+got):\n %v",
+			got[0], want, d)
+	}
+}
+
+func TestFromMonitoringInfos_PColCounters(t *testing.T) {
+	var value int64 = 15
+	want := metrics.PColResult{
+		Attempted: metrics.PColValue{
+			ElementCount: 15,
+		},
+		Key: metrics.StepKey{
+			Step:      "main.customDoFn",
+			Name:      "customCounter",
+			Namespace: "customDoFn",
+		}}
+
+	payload, err := Int64Counter(value)
+	if err != nil {
+		t.Fatalf("Failed to encode Int64Counter: %v", err)
+	}
+
+	labels := map[string]string{
+		"PTRANSFORM": "main.customDoFn",
+		"NAMESPACE":  "customDoFn",
+		"NAME":       "customCounter",
+	}
+
+	mInfo := &pipepb.MonitoringInfo{
+		Urn:     UrnToString(UrnElementCount),
+		Type:    UrnToType(UrnElementCount),
+		Labels:  labels,
+		Payload: payload,
+	}
+
+	attempted := []*pipepb.MonitoringInfo{mInfo}
+	committed := []*pipepb.MonitoringInfo{}
+	p := &pipepb.Pipeline{}
+
+	got := FromMonitoringInfos(p, attempted, committed).AllMetrics().PCols()
+	size := len(got)
+	if size != 1 {
+		t.Fatalf("Invalid array's size: got: %v, want: %v", size, 1)
+	}
+	if d := cmp.Diff(want, got[0]); d != "" {
+		t.Fatalf("Invalid counter: got: %v, want: %v, diff(-want,+got):\n %v",
+			got[0], want, d)
+	}
+}
+
+func TestFromMonitoringInfos_SampledByteSize(t *testing.T) {
+	want := metrics.PColResult{
+		Attempted: metrics.PColValue{
+			SampledByteSize: metrics.DistributionValue{
+				Count: 100,
+				Sum:   5,
+				Min:   -12,
+				Max:   30,
+			},
+		},
+		Key: metrics.StepKey{
+			Step:      "main.customDoFn",
+			Name:      "customCounter",
+			Namespace: "customDoFn",
+		}}
+
+	var count, sum, min, max int64 = 100, 5, -12, 30
+	payload, err := Int64Distribution(count, sum, min, max)
+	if err != nil {
+		t.Fatalf("Failed to encode Int64Distribution: %v", err)
+	}
+
+	labels := map[string]string{
+		"PTRANSFORM": "main.customDoFn",
+		"NAMESPACE":  "customDoFn",
+		"NAME":       "customCounter",
+	}
+
+	mInfo := &pipepb.MonitoringInfo{
+		Urn:     UrnToString(UrnSampledByteSize),
+		Type:    UrnToType(UrnSampledByteSize),
+		Labels:  labels,
+		Payload: payload,
+	}
+
+	attempted := []*pipepb.MonitoringInfo{mInfo}
+	committed := []*pipepb.MonitoringInfo{}
+	p := &pipepb.Pipeline{}
+
+	got := FromMonitoringInfos(p, attempted, committed).AllMetrics().PCols()
+	size := len(got)
+	if size != 1 {
+		t.Fatalf("Invalid array's size: got: %v, want: %v", size, FromMonitoringInfos(p, attempted, committed).AllMetrics())
+	}
+	if d := cmp.Diff(want, got[0]); d != "" {
+		t.Fatalf("Invalid counter: got: %v, want: %v, diff(-want,+got):\n %v",
+			got[0], want, d)
+	}
+}
+
 func TestFromMonitoringInfos_Distributions(t *testing.T) {
 	var count, sum, min, max int64 = 100, 5, -12, 30
 
@@ -109,7 +287,7 @@ func TestFromMonitoringInfos_Distributions(t *testing.T) {
 
 	got := FromMonitoringInfos(p, attempted, committed).AllMetrics().Distributions()
 	size := len(got)
-	if size < 1 {
+	if size != 1 {
 		t.Fatalf("Invalid array's size: got: %v, want: %v", size, 1)
 	}
 	if d := cmp.Diff(want, got[0]); d != "" {
@@ -159,7 +337,7 @@ func TestFromMonitoringInfos_Gauges(t *testing.T) {
 
 	got := FromMonitoringInfos(p, attempted, committed).AllMetrics().Gauges()
 	size := len(got)
-	if size < 1 {
+	if size != 1 {
 		t.Fatalf("Invalid array's size: got: %v, want: %v", size, 1)
 	}
 	if d := cmp.Diff(want, got[0]); d != "" {