You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by yu...@apache.org on 2023/05/07 22:42:06 UTC
[pulsar] branch branch-3.0 updated: [fix][fn] check user metric len before iterating (#20021)
This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 9a271ae4454 [fix][fn] check user metric len before iterating (#20021)
9a271ae4454 is described below
commit 9a271ae445471d8ba516abe53747eca0ae617b69
Author: Andy Walker <wa...@gmail.com>
AuthorDate: Wed Apr 12 20:28:42 2023 -0400
[fix][fn] check user metric len before iterating (#20021)
Co-authored-by: Andy Walker <an...@andy.dev>
(cherry picked from commit 52e8144587548a692e550a8538f4d2667b5499d6)
---
pulsar-function-go/pf/instance.go | 3 ++
.../pf/instanceControlServicer_test.go | 53 ++++++++++++++++++++++
pulsar-function-go/pf/stats_test.go | 27 +++++++++++
3 files changed, 83 insertions(+)
diff --git a/pulsar-function-go/pf/instance.go b/pulsar-function-go/pf/instance.go
index 5d17cfe0c33..a82273031ec 100644
--- a/pulsar-function-go/pf/instance.go
+++ b/pulsar-function-go/pf/instance.go
@@ -669,6 +669,9 @@ func (gi *goInstance) getTotalReceived1min() float32 {
func (gi *goInstance) getUserMetricsMap() map[string]float64 {
userMetricMap := map[string]float64{}
filteredMetricFamilies := gi.getFilteredMetricFamilies(PulsarFunctionMetricsPrefix + UserMetric)
+ if len(filteredMetricFamilies) == 0 {
+ return userMetricMap
+ }
for _, m := range filteredMetricFamilies[0].GetMetric() {
var isFuncMetric bool
var userLabelName string
diff --git a/pulsar-function-go/pf/instanceControlServicer_test.go b/pulsar-function-go/pf/instanceControlServicer_test.go
index 836ec6e5c79..9344d0a5915 100644
--- a/pulsar-function-go/pf/instanceControlServicer_test.go
+++ b/pulsar-function-go/pf/instanceControlServicer_test.go
@@ -21,6 +21,7 @@ package pf
import (
"context"
+ "fmt"
"log"
"net"
"testing"
@@ -76,3 +77,55 @@ func TestInstanceControlServicer_serve_creates_valid_instance(t *testing.T) {
log.Printf("Response: %+v", resp.Success)
assert.Equal(t, resp.Success, true)
}
+
+// instanceCommunicationClient serves an InstanceControlServicer wrapping
+// instance on a bufconn listener and returns a gRPC client connected to it.
+// The listener, the server and the client connection are all released
+// through t.Cleanup. The test fails immediately when instance is nil or the
+// dial returns an error.
+func instanceCommunicationClient(t *testing.T, instance *goInstance) pb.InstanceControlClient {
+	t.Helper()
+
+	if instance == nil {
+		t.Fatalf("cannot create communication client for nil instance")
+	}
+
+	// Bound the dial by the test deadline when the test has one.
+	dialCtx := context.Background()
+	if deadline, hasDeadline := t.Deadline(); hasDeadline {
+		var cancel context.CancelFunc
+		dialCtx, cancel = context.WithDeadline(context.Background(), deadline)
+		t.Cleanup(cancel)
+	}
+
+	// lis is declared outside this function; it is reassigned here.
+	lis = bufconn.Listen(bufSize)
+	t.Cleanup(func() { lis.Close() })
+
+	srv := grpc.NewServer()
+	t.Cleanup(srv.Stop)
+
+	// The servicer has to be registered before Serve is called.
+	pb.RegisterInstanceControlServer(srv, &InstanceControlServicer{instance})
+
+	t.Logf("Serving InstanceCommunication on port %d", instance.context.GetPort())
+
+	go func() {
+		serveErr := srv.Serve(lis)
+		if serveErr != nil {
+			panic(fmt.Sprintf("grpc server exited with error: %v", serveErr))
+		}
+	}()
+
+	// With the server running, connect a client through the same listener.
+	conn, err := grpc.DialContext(dialCtx, "bufnet", grpc.WithContextDialer(getBufDialer(lis)), grpc.WithInsecure())
+	if err != nil {
+		t.Fatalf("Failed to dial bufnet: %v", err)
+	}
+	t.Cleanup(func() { conn.Close() })
+
+	return pb.NewInstanceControlClient(conn)
+}
diff --git a/pulsar-function-go/pf/stats_test.go b/pulsar-function-go/pf/stats_test.go
index d52b08b7173..7b415ef5eff 100644
--- a/pulsar-function-go/pf/stats_test.go
+++ b/pulsar-function-go/pf/stats_test.go
@@ -20,6 +20,7 @@
package pf
import (
+ "context"
"fmt"
"io/ioutil"
"math"
@@ -28,6 +29,7 @@ import (
"time"
"github.com/golang/protobuf/proto"
+ "github.com/golang/protobuf/ptypes/empty"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
@@ -257,3 +259,28 @@ func TestUserMetrics(t *testing.T) {
gi.close()
metricsServicer.close()
}
+
+// TestInstanceControlMetrics calls GetMetrics on the instance control service
+// before and after recording user metrics. The first response must not
+// contain the test labels; the second must contain each label with the value
+// that was recorded for it.
+func TestInstanceControlMetrics(t *testing.T) {
+	instance := newGoInstance()
+	t.Cleanup(instance.close)
+	instanceClient := instanceCommunicationClient(t, instance)
+
+	metrics, err := instanceClient.GetMetrics(context.Background(), &empty.Empty{})
+	if err != nil {
+		// Abort: the response is not usable when the call failed.
+		t.Fatalf("err communicating with instance control: %v", err)
+	}
+
+	testLabels := []string{"userMetricControlTest1", "userMetricControlTest2"}
+	for _, label := range testLabels {
+		// Check the returned user metrics themselves, not the label string.
+		assert.NotContainsf(t, metrics.UserMetrics, label, "user metrics should not yet contain %s", label)
+	}
+
+	for value, label := range testLabels {
+		instance.context.RecordMetric(label, float64(value+1))
+	}
+	// NOTE(review): presumably gives the recorded values time to become
+	// visible to GetMetrics — confirm whether this wait is required.
+	time.Sleep(time.Second)
+
+	metrics, err = instanceClient.GetMetrics(context.Background(), &empty.Empty{})
+	if err != nil {
+		t.Fatalf("err communicating with instance control: %v", err)
+	}
+	for value, label := range testLabels {
+		assert.Containsf(t, metrics.UserMetrics, label, "user metrics should contain metric %s", label)
+		assert.EqualValuesf(t, value+1, metrics.UserMetrics[label], "user metric %s != %d", label, value+1)
+	}
+}