You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2022/04/08 05:02:56 UTC

[skywalking-satellite] branch main updated: Fix Metadata lost in the Native Meter protocol (#108)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-satellite.git


The following commit(s) were added to refs/heads/main by this push:
     new 0c52a26  Fix Metadata lost in the Native Meter protocol (#108)
0c52a26 is described below

commit 0c52a26e37d778c0734ebcf9cd8a0e313c8ba682
Author: mrproliu <74...@qq.com>
AuthorDate: Fri Apr 8 13:02:50 2022 +0800

    Fix Metadata lost in the Native Meter protocol (#108)
    
    * Fix metadata missing
    
    * Fix CVE
    
    * remove log
---
 CHANGES.md                                         |  1 +
 plugins/forwarder/grpc/nativemeter/forwarder.go    | 36 +++---------------
 plugins/receiver/grpc/nativemeter/meter_service.go | 43 +++++++++++-----------
 plugins/receiver/grpc/nativemeter/receiver_test.go |  2 +-
 4 files changed, 28 insertions(+), 54 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index e70f2db..5c81c26 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -8,6 +8,7 @@ Release Notes.
 * Add the compat protocol receiver for the old version of agents.
 
 #### Bug Fixes
+* Fix Metadata lost in the Native Meter protocol.
 
 #### Issues and PR
 - All issues are [here](https://github.com/apache/skywalking/milestone/115?closed=1)
diff --git a/plugins/forwarder/grpc/nativemeter/forwarder.go b/plugins/forwarder/grpc/nativemeter/forwarder.go
index 1fc3226..1857df1 100644
--- a/plugins/forwarder/grpc/nativemeter/forwarder.go
+++ b/plugins/forwarder/grpc/nativemeter/forwarder.go
@@ -99,11 +99,8 @@ func (f *Forwarder) Forward(batch event.BatchEvents) error {
 		}
 	}()
 	for _, e := range batch {
-		if data, ok := e.GetData().(*v1.SniffData_Meter); ok {
-			if err := f.handleMeter(data, streamMap); err != nil {
-				return err
-			}
-		}
+		// Only handle the meter collection data from the queue.
+		// Errors could occur when using the previous meter data type (SniffData_Meter).
 		if data, ok := e.GetData().(*v1.SniffData_MeterCollection); ok {
 			if err := f.handleMeterCollection(data, streamMap); err != nil {
 				return err
@@ -114,6 +111,9 @@ func (f *Forwarder) Forward(batch event.BatchEvents) error {
 }
 
 func (f *Forwarder) handleMeterCollection(data *v1.SniffData_MeterCollection, streamMap map[string]grpc.ClientStream) error {
+	if len(data.MeterCollection.MeterData) == 0 {
+		return nil
+	}
 	firstMeter := data.MeterCollection.MeterData[0]
 	streamName := fmt.Sprintf("batch-stream-%s-%s", firstMeter.Service, firstMeter.ServiceInstance)
 	stream := streamMap[streamName]
@@ -140,32 +140,6 @@ func (f *Forwarder) handleMeterCollection(data *v1.SniffData_MeterCollection, st
 	return nil
 }
 
-func (f *Forwarder) handleMeter(data *v1.SniffData_Meter, streamMap map[string]grpc.ClientStream) error {
-	streamName := fmt.Sprintf("%s_%s", data.Meter.Service, data.Meter.ServiceInstance)
-	stream := streamMap[streamName]
-	if stream == nil {
-		ctx := lb.WithLoadBalanceConfig(
-			context.Background(),
-			data.Meter.ServiceInstance,
-			f.loadCachedPeer(data.Meter.ServiceInstance))
-
-		curStream, err := f.meterClient.Collect(ctx)
-		if err != nil {
-			log.Logger.Errorf("open grpc stream error %v", err)
-			return err
-		}
-		streamMap[streamName] = curStream
-		stream = curStream
-		f.savePeerInstanceFromStream(curStream, data.Meter.ServiceInstance)
-	}
-
-	if err := stream.SendMsg(data.Meter); err != nil {
-		log.Logger.Errorf("%s send meter data error: %v", f.Name(), err)
-		return err
-	}
-	return nil
-}
-
 func (f *Forwarder) savePeerInstanceFromStream(stream grpc.ClientStream, instance string) {
 	upstream := server_grpc.GetPeerAddressFromStreamContext(stream.Context())
 	if upstream == "" {
diff --git a/plugins/receiver/grpc/nativemeter/meter_service.go b/plugins/receiver/grpc/nativemeter/meter_service.go
index fd3c2f5..8d70c6d 100644
--- a/plugins/receiver/grpc/nativemeter/meter_service.go
+++ b/plugins/receiver/grpc/nativemeter/meter_service.go
@@ -34,37 +34,36 @@ type MeterService struct {
 }
 
 func (m *MeterService) Collect(stream meter.MeterReportService_CollectServer) error {
-	var service, instance string
+	dataList := make([]*meter.MeterData, 0)
 	for {
 		item, err := stream.Recv()
 		if err == io.EOF {
+			m.flushMeter(dataList)
 			return stream.SendAndClose(&common.Commands{})
 		}
 		if err != nil {
+			m.flushMeter(dataList)
 			return err
 		}
-		// only first item has service and service instance property
-		// need correlate information to each item
-		if item.Service != "" {
-			service = item.Service
-		}
-		if item.ServiceInstance != "" {
-			instance = item.ServiceInstance
-		}
-		item.Service = service
-		item.ServiceInstance = instance
-		d := &v1.SniffData{
-			Name:      eventName,
-			Timestamp: time.Now().UnixNano() / 1e6,
-			Meta:      nil,
-			Type:      v1.SniffType_MeterType,
-			Remote:    true,
-			Data: &v1.SniffData_Meter{
-				Meter: item,
-			},
-		}
-		m.receiveChannel <- d
+		dataList = append(dataList, item)
+	}
+}
+
+func (m *MeterService) flushMeter(dataList []*meter.MeterData) {
+	if len(dataList) == 0 {
+		return
+	}
+	d := &v1.SniffData{
+		Name:      eventName,
+		Timestamp: time.Now().UnixNano() / 1e6,
+		Meta:      nil,
+		Type:      v1.SniffType_MeterType,
+		Remote:    true,
+		Data: &v1.SniffData_MeterCollection{
+			MeterCollection: &meter.MeterDataCollection{MeterData: dataList},
+		},
 	}
+	m.receiveChannel <- d
 }
 
 func (m *MeterService) CollectBatch(batch meter.MeterReportService_CollectBatchServer) error {
diff --git a/plugins/receiver/grpc/nativemeter/receiver_test.go b/plugins/receiver/grpc/nativemeter/receiver_test.go
index ea846ba..f836a34 100644
--- a/plugins/receiver/grpc/nativemeter/receiver_test.go
+++ b/plugins/receiver/grpc/nativemeter/receiver_test.go
@@ -48,7 +48,7 @@ func TestReceiver_RegisterHandler(t *testing.T) {
 		}
 		return data.String()
 	}, func(data *v1.SniffData) string {
-		return data.GetMeter().String()
+		return data.GetMeterCollection().MeterData[0].String()
 	}, t)
 }