You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by yu...@apache.org on 2023/05/07 22:42:06 UTC

[pulsar] branch branch-3.0 updated: [fix][fn] check user metric len before iterating (#20021)

This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 9a271ae4454 [fix][fn] check user metric len before iterating (#20021)
9a271ae4454 is described below

commit 9a271ae445471d8ba516abe53747eca0ae617b69
Author: Andy Walker <wa...@gmail.com>
AuthorDate: Wed Apr 12 20:28:42 2023 -0400

    [fix][fn] check user metric len before iterating (#20021)
    
    Co-authored-by: Andy Walker <an...@andy.dev>
    (cherry picked from commit 52e8144587548a692e550a8538f4d2667b5499d6)
---
 pulsar-function-go/pf/instance.go                  |  3 ++
 .../pf/instanceControlServicer_test.go             | 53 ++++++++++++++++++++++
 pulsar-function-go/pf/stats_test.go                | 27 +++++++++++
 3 files changed, 83 insertions(+)

diff --git a/pulsar-function-go/pf/instance.go b/pulsar-function-go/pf/instance.go
index 5d17cfe0c33..a82273031ec 100644
--- a/pulsar-function-go/pf/instance.go
+++ b/pulsar-function-go/pf/instance.go
@@ -669,6 +669,9 @@ func (gi *goInstance) getTotalReceived1min() float32 {
 func (gi *goInstance) getUserMetricsMap() map[string]float64 {
 	userMetricMap := map[string]float64{}
 	filteredMetricFamilies := gi.getFilteredMetricFamilies(PulsarFunctionMetricsPrefix + UserMetric)
+	if len(filteredMetricFamilies) == 0 {
+		return userMetricMap
+	}
 	for _, m := range filteredMetricFamilies[0].GetMetric() {
 		var isFuncMetric bool
 		var userLabelName string
diff --git a/pulsar-function-go/pf/instanceControlServicer_test.go b/pulsar-function-go/pf/instanceControlServicer_test.go
index 836ec6e5c79..9344d0a5915 100644
--- a/pulsar-function-go/pf/instanceControlServicer_test.go
+++ b/pulsar-function-go/pf/instanceControlServicer_test.go
@@ -21,6 +21,7 @@ package pf
 
 import (
 	"context"
+	"fmt"
 	"log"
 	"net"
 	"testing"
@@ -76,3 +77,55 @@ func TestInstanceControlServicer_serve_creates_valid_instance(t *testing.T) {
 	log.Printf("Response: %+v", resp.Success)
 	assert.Equal(t, resp.Success, true)
 }
+
+// instanceCommunicationClient registers an InstanceControlServicer for instance on a gRPC server
+// served from a bufconn listener and returns a client dialed to it; teardown goes through t.Cleanup.
+func instanceCommunicationClient(t *testing.T, instance *goInstance) pb.InstanceControlClient {
+	t.Helper()
+	if instance == nil {
+		t.Fatalf("cannot create communication client for nil instance")
+	}
+	var (
+		ctx context.Context = context.Background()
+		cf  context.CancelFunc
+	)
+	// Bound the dial below by the test's deadline when one is set; otherwise ctx stays Background.
+	if testDeadline, ok := t.Deadline(); ok {
+		ctx, cf = context.WithDeadline(context.Background(), testDeadline)
+		t.Cleanup(cf)
+	}
+	// NOTE(review): lis is assigned, not declared, here — presumably a package-level listener; confirm.
+	lis = bufconn.Listen(bufSize)
+	t.Cleanup(func() {
+		lis.Close()
+	})
+	// create a gRPC server object
+	grpcServer := grpc.NewServer()
+	t.Cleanup(func() {
+		grpcServer.Stop()
+	})
+
+	servicer := InstanceControlServicer{instance}
+	// must register before we start the service.
+	pb.RegisterInstanceControlServer(grpcServer, &servicer)
+
+	// start the server
+	t.Logf("Serving InstanceCommunication on port %d", instance.context.GetPort())
+	// Serve runs on a separate goroutine; a non-nil error from it panics instead of failing t.
+	go func() {
+		if err := grpcServer.Serve(lis); err != nil {
+			panic(fmt.Sprintf("grpc server exited with error: %v", err))
+		}
+	}()
+
+	// Now we can setup the client:
+	conn, err := grpc.DialContext(ctx, "bufnet", grpc.WithContextDialer(getBufDialer(lis)), grpc.WithInsecure())
+	if err != nil {
+		t.Fatalf("Failed to dial bufnet: %v", err)
+	}
+	t.Cleanup(func() {
+		conn.Close()
+	})
+	client := pb.NewInstanceControlClient(conn)
+	return client
+}
diff --git a/pulsar-function-go/pf/stats_test.go b/pulsar-function-go/pf/stats_test.go
index d52b08b7173..7b415ef5eff 100644
--- a/pulsar-function-go/pf/stats_test.go
+++ b/pulsar-function-go/pf/stats_test.go
@@ -20,6 +20,7 @@
 package pf
 
 import (
+	"context"
 	"fmt"
 	"io/ioutil"
 	"math"
@@ -28,6 +29,7 @@ import (
 	"time"
 
 	"github.com/golang/protobuf/proto"
+	"github.com/golang/protobuf/ptypes/empty"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/stretchr/testify/assert"
 
@@ -257,3 +259,28 @@ func TestUserMetrics(t *testing.T) {
 	gi.close()
 	metricsServicer.close()
 }
+
+// TestInstanceControlMetrics calls GetMetrics before and after recording user metrics and checks the reported map.
+func TestInstanceControlMetrics(t *testing.T) {
+	instance := newGoInstance()
+	t.Cleanup(instance.close)
+	instanceClient := instanceCommunicationClient(t, instance)
+	metrics, err := instanceClient.GetMetrics(context.Background(), &empty.Empty{})
+	assert.NoError(t, err, "err communicating with instance control: %v", err)
+	testLabels := []string{"userMetricControlTest1", "userMetricControlTest2"}
+	// The map is read through GetUserMetrics so a nil response fails the assertions instead of panicking.
+	for _, label := range testLabels {
+		assert.NotContainsf(t, metrics.GetUserMetrics(), label, "user metrics should not yet contain %s", label)
+	}
+
+	for value, label := range testLabels {
+		instance.context.RecordMetric(label, float64(value+1))
+	}
+	// NOTE(review): presumably lets the recorded values become visible to GetMetrics — confirm the sleep is needed.
+	time.Sleep(time.Second)
+	metrics, err = instanceClient.GetMetrics(context.Background(), &empty.Empty{})
+	assert.NoError(t, err, "err communicating with instance control: %v", err)
+	for value, label := range testLabels {
+		assert.Containsf(t, metrics.GetUserMetrics(), label, "user metrics should contain metric %s", label)
+		assert.EqualValuesf(t, value+1, metrics.GetUserMetrics()[label], "user metric %s != %d", label, value+1)
+	}
+}