You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2022/10/26 18:19:24 UTC

[beam] branch master updated: [Go SDK] Add tests to the metrics package (#23769)

This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 8fb29f02efb [Go SDK] Add tests to the metrics package (#23769)
8fb29f02efb is described below

commit 8fb29f02efb3f8107255beadf88b5a98551ca62b
Author: andremissaglia <an...@gmail.com>
AuthorDate: Wed Oct 26 15:18:39 2022 -0300

    [Go SDK] Add tests to the metrics package (#23769)
---
 sdks/go/pkg/beam/core/metrics/dumper_test.go  | 49 ++++++++++++++
 sdks/go/pkg/beam/core/metrics/metrics_test.go | 94 +++++++++++++++++++++++++++
 sdks/go/pkg/beam/core/metrics/store_test.go   | 62 ++++++++++++++++++
 3 files changed, 205 insertions(+)

diff --git a/sdks/go/pkg/beam/core/metrics/dumper_test.go b/sdks/go/pkg/beam/core/metrics/dumper_test.go
new file mode 100644
index 00000000000..e618354eda8
--- /dev/null
+++ b/sdks/go/pkg/beam/core/metrics/dumper_test.go
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package metrics
+
+import (
+	"fmt"
+	"testing"
+	"time"
+
+	"github.com/google/go-cmp/cmp"
+)
+
+func TestDumperExtractor(t *testing.T) {
+	var got []string
+	printer := func(format string, args ...interface{}) {
+		got = append(got, fmt.Sprintf(format, args...))
+	}
+
+	store := newStore()
+	now := time.Date(2019, 1, 1, 0, 0, 0, 0, time.UTC)
+	store.storeMetric("pid", newName("ns", "counter"), &counter{value: 1})
+	store.storeMetric("pid", newName("ns", "distribution"), &distribution{count: 1, sum: 2, min: 3, max: 4})
+	store.storeMetric("pid", newName("ns", "gauge"), &gauge{v: 1, t: now})
+
+	expected := []string{
+		"PTransformID: \"pid\"",
+		"	ns.counter - value: 1",
+		"	ns.distribution - count: 1 sum: 2 min: 3 max: 4",
+		"	ns.gauge - Gauge time: 2019-01-01 00:00:00 +0000 UTC value: 1",
+	}
+
+	dumperExtractor(store, printer)
+	if diff := cmp.Diff(expected, got); diff != "" {
+		t.Errorf("dumperExtractor() got diff (-want +got): %v", diff)
+	}
+}
diff --git a/sdks/go/pkg/beam/core/metrics/metrics_test.go b/sdks/go/pkg/beam/core/metrics/metrics_test.go
index ff3141d748a..75b483184ab 100644
--- a/sdks/go/pkg/beam/core/metrics/metrics_test.go
+++ b/sdks/go/pkg/beam/core/metrics/metrics_test.go
@@ -400,6 +400,53 @@ func TestMergeDistributions(t *testing.T) {
 	}
 }
 
+func TestMergePCols(t *testing.T) {
+	realKey := StepKey{Name: "real"}
+	pColA := PColValue{ElementCount: 1, SampledByteSize: DistributionValue{Count: 2, Sum: 3, Min: 4, Max: 5}}
+	pColB := PColValue{ElementCount: 5, SampledByteSize: DistributionValue{Count: 4, Sum: 3, Min: 2, Max: 1}}
+	tests := []struct {
+		name                 string
+		attempted, committed map[StepKey]PColValue
+		want                 []PColResult
+	}{
+		{
+			name: "merge",
+			attempted: map[StepKey]PColValue{
+				realKey: pColA,
+			},
+			committed: map[StepKey]PColValue{
+				realKey: pColB,
+			},
+			want: []PColResult{{Attempted: pColA, Committed: pColB, Key: realKey}},
+		}, {
+			name: "attempted only",
+			attempted: map[StepKey]PColValue{
+				realKey: pColA,
+			},
+			committed: map[StepKey]PColValue{},
+			want:      []PColResult{{Attempted: pColA, Key: realKey}},
+		}, {
+			name:      "committed only",
+			attempted: map[StepKey]PColValue{},
+			committed: map[StepKey]PColValue{
+				realKey: pColB,
+			},
+			want: []PColResult{{Committed: pColB, Key: realKey}},
+		},
+	}
+	less := func(a, b DistributionResult) bool {
+		return a.Key.Name < b.Key.Name
+	}
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			got := MergePCols(test.attempted, test.committed)
+			if d := cmp.Diff(test.want, got, cmpopts.SortSlices(less)); d != "" {
+				t.Errorf("MergePCols(%+v, %+v) = %+v, want %+v\ndiff:\n%v", test.attempted, test.committed, got, test.want, d)
+			}
+		})
+	}
+}
+
 func TestMergeGauges(t *testing.T) {
 	realKey := StepKey{Name: "real"}
 	now := time.Now()
@@ -449,6 +496,53 @@ func TestMergeGauges(t *testing.T) {
 	}
 }
 
+func TestMergeMsecs(t *testing.T) {
+	realKey := StepKey{Name: "real"}
+	msecA := MsecValue{Start: time.Second, Process: 2 * time.Second, Finish: time.Second, Total: 4 * time.Second}
+	msecB := MsecValue{Start: 2 * time.Second, Process: time.Second, Finish: 2 * time.Second, Total: 5 * time.Second}
+	tests := []struct {
+		name                 string
+		attempted, committed map[StepKey]MsecValue
+		want                 []MsecResult
+	}{
+		{
+			name: "merge",
+			attempted: map[StepKey]MsecValue{
+				realKey: msecA,
+			},
+			committed: map[StepKey]MsecValue{
+				realKey: msecB,
+			},
+			want: []MsecResult{{Attempted: msecA, Committed: msecB, Key: realKey}},
+		}, {
+			name: "attempted only",
+			attempted: map[StepKey]MsecValue{
+				realKey: msecA,
+			},
+			committed: map[StepKey]MsecValue{},
+			want:      []MsecResult{{Attempted: msecA, Key: realKey}},
+		}, {
+			name:      "committed only",
+			attempted: map[StepKey]MsecValue{},
+			committed: map[StepKey]MsecValue{
+				realKey: msecB,
+			},
+			want: []MsecResult{{Committed: msecB, Key: realKey}},
+		},
+	}
+	less := func(a, b DistributionResult) bool {
+		return a.Key.Name < b.Key.Name
+	}
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			got := MergeMsecs(test.attempted, test.committed)
+			if d := cmp.Diff(test.want, got, cmpopts.SortSlices(less)); d != "" {
+				t.Errorf("MergeMsecs(%+v, %+v) = %+v, want %+v\ndiff:\n%v", test.attempted, test.committed, got, test.want, d)
+			}
+		})
+	}
+}
+
 func TestMsecQueryResult(t *testing.T) {
 	realKey := StepKey{Step: "sumFn"}
 	msecA := MsecValue{Start: 0, Process: 0, Finish: 0, Total: 0}
diff --git a/sdks/go/pkg/beam/core/metrics/store_test.go b/sdks/go/pkg/beam/core/metrics/store_test.go
new file mode 100644
index 00000000000..dcdadad7429
--- /dev/null
+++ b/sdks/go/pkg/beam/core/metrics/store_test.go
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//	http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package metrics
+
+import (
+	"reflect"
+	"testing"
+	"time"
+)
+
+func TestStore(t *testing.T) {
+	store := newStore()
+
+	m := make(map[Labels]interface{})
+	e := &Extractor{
+		SumInt64: func(l Labels, v int64) {
+			m[l] = &counter{value: v}
+		},
+		DistributionInt64: func(l Labels, count, sum, min, max int64) {
+			m[l] = &distribution{count: count, sum: sum, min: min, max: max}
+		},
+		GaugeInt64: func(l Labels, v int64, t time.Time) {
+			m[l] = &gauge{v: v, t: t}
+		},
+		MsecsInt64: func(labels string, e *[4]ExecutionState) {},
+	}
+
+	now := time.Now()
+
+	store.storeMetric("pid", newName("ns", "counter"), &counter{value: 1})
+	store.storeMetric("pid", newName("ns", "distribution"), &distribution{count: 1, sum: 2, min: 3, max: 4})
+	store.storeMetric("pid", newName("ns", "gauge"), &gauge{v: 1, t: now})
+
+	// storing the same metric twice doesn't change anything
+	store.storeMetric("pid", newName("ns", "counter"), &counter{value: 2})
+
+	err := e.ExtractFrom(store)
+	if err != nil {
+		t.Fatalf("e.ExtractFrom(store) = %q, want nil", err)
+	}
+
+	expected := map[Labels]interface{}{
+		{transform: "pid", namespace: "ns", name: "counter"}:      &counter{value: 1},
+		{transform: "pid", namespace: "ns", name: "distribution"}: &distribution{count: 1, sum: 2, min: 3, max: 4},
+		{transform: "pid", namespace: "ns", name: "gauge"}:        &gauge{v: 1, t: now},
+	}
+	if !reflect.DeepEqual(m, expected) {
+		t.Errorf("e.ExtractFrom(store) = %v, want %v", m, expected)
+	}
+}