You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2022/03/03 01:55:01 UTC
[beam] branch master updated: [BEAM-13903] Improve coverage of metricsx package (#16994)
This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new e234543 [BEAM-13903] Improve coverage of metricsx package (#16994)
e234543 is described below
commit e2345430de5910c63b009b851aa2b3d6a29789ae
Author: Danny McCormick <da...@google.com>
AuthorDate: Wed Mar 2 20:52:33 2022 -0500
[BEAM-13903] Improve coverage of metricsx package (#16994)
---
.../beam/core/runtime/metricsx/metricsx_test.go | 184 ++++++++++++++++++++-
1 file changed, 181 insertions(+), 3 deletions(-)
diff --git a/sdks/go/pkg/beam/core/runtime/metricsx/metricsx_test.go b/sdks/go/pkg/beam/core/runtime/metricsx/metricsx_test.go
index 20492ab..95b92f9 100644
--- a/sdks/go/pkg/beam/core/runtime/metricsx/metricsx_test.go
+++ b/sdks/go/pkg/beam/core/runtime/metricsx/metricsx_test.go
@@ -59,7 +59,7 @@ func TestFromMonitoringInfos_Counters(t *testing.T) {
got := FromMonitoringInfos(p, attempted, committed).AllMetrics().Counters()
size := len(got)
- if size < 1 {
+ if size != 1 {
t.Fatalf("Invalid array's size: got: %v, want: %v", size, 1)
}
if d := cmp.Diff(want, got[0]); d != "" {
@@ -68,6 +68,184 @@ func TestFromMonitoringInfos_Counters(t *testing.T) {
}
}
+func TestFromMonitoringInfos_Msec(t *testing.T) {
+ want := metrics.MsecResult{
+ Attempted: metrics.MsecValue{
+ Start: 15 * time.Millisecond,
+ Process: 20 * time.Millisecond,
+ Finish: 40 * time.Millisecond,
+ Total: 25 * time.Millisecond,
+ },
+ Committed: metrics.MsecValue{
+ Start: 0 * time.Millisecond,
+ Process: 0 * time.Millisecond,
+ Finish: 0 * time.Millisecond,
+ Total: 0 * time.Millisecond,
+ },
+ Key: metrics.StepKey{
+ Step: "main.customDoFn",
+ Name: "customCounter",
+ Namespace: "customDoFn",
+ },
+ }
+
+ labels := map[string]string{
+ "PTRANSFORM": "main.customDoFn",
+ "NAMESPACE": "customDoFn",
+ "NAME": "customCounter",
+ }
+
+ startValue, err := Int64Counter(int64(15))
+ if err != nil {
+ t.Fatalf("Failed to encode Int64Counter: %v", err)
+ }
+ processValue, err := Int64Counter(int64(20))
+ if err != nil {
+ t.Fatalf("Failed to encode Int64Counter: %v", err)
+ }
+ finishValue, err := Int64Counter(int64(40))
+ if err != nil {
+ t.Fatalf("Failed to encode Int64Counter: %v", err)
+ }
+ totalValue, err := Int64Counter(int64(25))
+ if err != nil {
+ t.Fatalf("Failed to encode Int64Counter: %v", err)
+ }
+ mStartBundleInfo := &pipepb.MonitoringInfo{
+ Urn: UrnToString(ExecutionMsecUrn(0)),
+ Type: UrnToType(ExecutionMsecUrn(0)),
+ Labels: labels,
+ Payload: startValue,
+ }
+ mProcessBundleInfo := &pipepb.MonitoringInfo{
+ Urn: UrnToString(ExecutionMsecUrn(1)),
+ Type: UrnToType(ExecutionMsecUrn(1)),
+ Labels: labels,
+ Payload: processValue,
+ }
+ mFinishBundleInfo := &pipepb.MonitoringInfo{
+ Urn: UrnToString(ExecutionMsecUrn(2)),
+ Type: UrnToType(ExecutionMsecUrn(2)),
+ Labels: labels,
+ Payload: finishValue,
+ }
+ mTotalTimeInfo := &pipepb.MonitoringInfo{
+ Urn: UrnToString(ExecutionMsecUrn(3)),
+ Type: UrnToType(ExecutionMsecUrn(3)),
+ Labels: labels,
+ Payload: totalValue,
+ }
+
+ attempted := []*pipepb.MonitoringInfo{mStartBundleInfo, mProcessBundleInfo, mFinishBundleInfo, mTotalTimeInfo}
+ committed := []*pipepb.MonitoringInfo{}
+ p := &pipepb.Pipeline{}
+
+ got := FromMonitoringInfos(p, attempted, committed).AllMetrics().Msecs()
+ size := len(got)
+ if size != 1 {
+ t.Fatalf("Invalid array's size: got: %v, want: %v", size, 1)
+ }
+ if d := cmp.Diff(want, got[0]); d != "" {
+ t.Fatalf("Invalid MsecResult: got: %v, want: %v, diff(-want,+got):\n %v",
+ got[0], want, d)
+ }
+}
+
+func TestFromMonitoringInfos_PColCounters(t *testing.T) {
+ var value int64 = 15
+ want := metrics.PColResult{
+ Attempted: metrics.PColValue{
+ ElementCount: 15,
+ },
+ Key: metrics.StepKey{
+ Step: "main.customDoFn",
+ Name: "customCounter",
+ Namespace: "customDoFn",
+ }}
+
+ payload, err := Int64Counter(value)
+ if err != nil {
+ t.Fatalf("Failed to encode Int64Counter: %v", err)
+ }
+
+ labels := map[string]string{
+ "PTRANSFORM": "main.customDoFn",
+ "NAMESPACE": "customDoFn",
+ "NAME": "customCounter",
+ }
+
+ mInfo := &pipepb.MonitoringInfo{
+ Urn: UrnToString(UrnElementCount),
+ Type: UrnToType(UrnElementCount),
+ Labels: labels,
+ Payload: payload,
+ }
+
+ attempted := []*pipepb.MonitoringInfo{mInfo}
+ committed := []*pipepb.MonitoringInfo{}
+ p := &pipepb.Pipeline{}
+
+ got := FromMonitoringInfos(p, attempted, committed).AllMetrics().PCols()
+ size := len(got)
+ if size != 1 {
+ t.Fatalf("Invalid array's size: got: %v, want: %v", size, 1)
+ }
+ if d := cmp.Diff(want, got[0]); d != "" {
+ t.Fatalf("Invalid counter: got: %v, want: %v, diff(-want,+got):\n %v",
+ got[0], want, d)
+ }
+}
+
+func TestFromMonitoringInfos_SampledByteSize(t *testing.T) {
+ want := metrics.PColResult{
+ Attempted: metrics.PColValue{
+ SampledByteSize: metrics.DistributionValue{
+ Count: 100,
+ Sum: 5,
+ Min: -12,
+ Max: 30,
+ },
+ },
+ Key: metrics.StepKey{
+ Step: "main.customDoFn",
+ Name: "customCounter",
+ Namespace: "customDoFn",
+ }}
+
+ var count, sum, min, max int64 = 100, 5, -12, 30
+ payload, err := Int64Distribution(count, sum, min, max)
+ if err != nil {
+ t.Fatalf("Failed to encode Int64Distribution: %v", err)
+ }
+
+ labels := map[string]string{
+ "PTRANSFORM": "main.customDoFn",
+ "NAMESPACE": "customDoFn",
+ "NAME": "customCounter",
+ }
+
+ mInfo := &pipepb.MonitoringInfo{
+ Urn: UrnToString(UrnSampledByteSize),
+ Type: UrnToType(UrnSampledByteSize),
+ Labels: labels,
+ Payload: payload,
+ }
+
+ attempted := []*pipepb.MonitoringInfo{mInfo}
+ committed := []*pipepb.MonitoringInfo{}
+ p := &pipepb.Pipeline{}
+
+ got := FromMonitoringInfos(p, attempted, committed).AllMetrics().PCols()
+ size := len(got)
+ if size != 1 {
+ t.Fatalf("Invalid array's size: got: %v, want: %v", size, FromMonitoringInfos(p, attempted, committed).AllMetrics())
+ }
+ if d := cmp.Diff(want, got[0]); d != "" {
+ t.Fatalf("Invalid counter: got: %v, want: %v, diff(-want,+got):\n %v",
+ got[0], want, d)
+ }
+}
+
func TestFromMonitoringInfos_Distributions(t *testing.T) {
var count, sum, min, max int64 = 100, 5, -12, 30
@@ -109,7 +287,7 @@ func TestFromMonitoringInfos_Distributions(t *testing.T) {
got := FromMonitoringInfos(p, attempted, committed).AllMetrics().Distributions()
size := len(got)
- if size < 1 {
+ if size != 1 {
t.Fatalf("Invalid array's size: got: %v, want: %v", size, 1)
}
if d := cmp.Diff(want, got[0]); d != "" {
@@ -159,7 +337,7 @@ func TestFromMonitoringInfos_Gauges(t *testing.T) {
got := FromMonitoringInfos(p, attempted, committed).AllMetrics().Gauges()
size := len(got)
- if size < 1 {
+ if size != 1 {
t.Fatalf("Invalid array's size: got: %v, want: %v", size, 1)
}
if d := cmp.Diff(want, got[0]); d != "" {