You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2022/04/08 05:02:56 UTC
[skywalking-satellite] branch main updated: Fix Metadata lost in the Native Meter protocol (#108)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-satellite.git
The following commit(s) were added to refs/heads/main by this push:
new 0c52a26 Fix Metadata lost in the Native Meter protocol (#108)
0c52a26 is described below
commit 0c52a26e37d778c0734ebcf9cd8a0e313c8ba682
Author: mrproliu <74...@qq.com>
AuthorDate: Fri Apr 8 13:02:50 2022 +0800
Fix Metadata lost in the Native Meter protocol (#108)
* Fix metadata missing
* Fix CVE
* remove log
---
CHANGES.md | 1 +
plugins/forwarder/grpc/nativemeter/forwarder.go | 36 +++---------------
plugins/receiver/grpc/nativemeter/meter_service.go | 43 +++++++++++-----------
plugins/receiver/grpc/nativemeter/receiver_test.go | 2 +-
4 files changed, 28 insertions(+), 54 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index e70f2db..5c81c26 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -8,6 +8,7 @@ Release Notes.
* Add the compat protocol receiver for the old version of agents.
#### Bug Fixes
+* Fix Metadata lost in the Native Meter protocol.
#### Issues and PR
- All issues are [here](https://github.com/apache/skywalking/milestone/115?closed=1)
diff --git a/plugins/forwarder/grpc/nativemeter/forwarder.go b/plugins/forwarder/grpc/nativemeter/forwarder.go
index 1fc3226..1857df1 100644
--- a/plugins/forwarder/grpc/nativemeter/forwarder.go
+++ b/plugins/forwarder/grpc/nativemeter/forwarder.go
@@ -99,11 +99,8 @@ func (f *Forwarder) Forward(batch event.BatchEvents) error {
}
}()
for _, e := range batch {
- if data, ok := e.GetData().(*v1.SniffData_Meter); ok {
- if err := f.handleMeter(data, streamMap); err != nil {
- return err
- }
- }
+ // Only handle the meter collection data from the queue.
+ // Errors could occur when using the previous single-meter data (SniffData_Meter).
if data, ok := e.GetData().(*v1.SniffData_MeterCollection); ok {
if err := f.handleMeterCollection(data, streamMap); err != nil {
return err
@@ -114,6 +111,9 @@ func (f *Forwarder) Forward(batch event.BatchEvents) error {
}
func (f *Forwarder) handleMeterCollection(data *v1.SniffData_MeterCollection, streamMap map[string]grpc.ClientStream) error {
+ if len(data.MeterCollection.MeterData) == 0 {
+ return nil
+ }
firstMeter := data.MeterCollection.MeterData[0]
streamName := fmt.Sprintf("batch-stream-%s-%s", firstMeter.Service, firstMeter.ServiceInstance)
stream := streamMap[streamName]
@@ -140,32 +140,6 @@ func (f *Forwarder) handleMeterCollection(data *v1.SniffData_MeterCollection, st
return nil
}
-func (f *Forwarder) handleMeter(data *v1.SniffData_Meter, streamMap map[string]grpc.ClientStream) error {
- streamName := fmt.Sprintf("%s_%s", data.Meter.Service, data.Meter.ServiceInstance)
- stream := streamMap[streamName]
- if stream == nil {
- ctx := lb.WithLoadBalanceConfig(
- context.Background(),
- data.Meter.ServiceInstance,
- f.loadCachedPeer(data.Meter.ServiceInstance))
-
- curStream, err := f.meterClient.Collect(ctx)
- if err != nil {
- log.Logger.Errorf("open grpc stream error %v", err)
- return err
- }
- streamMap[streamName] = curStream
- stream = curStream
- f.savePeerInstanceFromStream(curStream, data.Meter.ServiceInstance)
- }
-
- if err := stream.SendMsg(data.Meter); err != nil {
- log.Logger.Errorf("%s send meter data error: %v", f.Name(), err)
- return err
- }
- return nil
-}
-
func (f *Forwarder) savePeerInstanceFromStream(stream grpc.ClientStream, instance string) {
upstream := server_grpc.GetPeerAddressFromStreamContext(stream.Context())
if upstream == "" {
diff --git a/plugins/receiver/grpc/nativemeter/meter_service.go b/plugins/receiver/grpc/nativemeter/meter_service.go
index fd3c2f5..8d70c6d 100644
--- a/plugins/receiver/grpc/nativemeter/meter_service.go
+++ b/plugins/receiver/grpc/nativemeter/meter_service.go
@@ -34,37 +34,36 @@ type MeterService struct {
}
func (m *MeterService) Collect(stream meter.MeterReportService_CollectServer) error {
- var service, instance string
+ dataList := make([]*meter.MeterData, 0)
for {
item, err := stream.Recv()
if err == io.EOF {
+ m.flushMeter(dataList)
return stream.SendAndClose(&common.Commands{})
}
if err != nil {
+ m.flushMeter(dataList)
return err
}
- // only first item has service and service instance property
- // need correlate information to each item
- if item.Service != "" {
- service = item.Service
- }
- if item.ServiceInstance != "" {
- instance = item.ServiceInstance
- }
- item.Service = service
- item.ServiceInstance = instance
- d := &v1.SniffData{
- Name: eventName,
- Timestamp: time.Now().UnixNano() / 1e6,
- Meta: nil,
- Type: v1.SniffType_MeterType,
- Remote: true,
- Data: &v1.SniffData_Meter{
- Meter: item,
- },
- }
- m.receiveChannel <- d
+ dataList = append(dataList, item)
+ }
+}
+
+func (m *MeterService) flushMeter(dataList []*meter.MeterData) {
+ if len(dataList) == 0 {
+ return
+ }
+ d := &v1.SniffData{
+ Name: eventName,
+ Timestamp: time.Now().UnixNano() / 1e6,
+ Meta: nil,
+ Type: v1.SniffType_MeterType,
+ Remote: true,
+ Data: &v1.SniffData_MeterCollection{
+ MeterCollection: &meter.MeterDataCollection{MeterData: dataList},
+ },
}
+ m.receiveChannel <- d
}
func (m *MeterService) CollectBatch(batch meter.MeterReportService_CollectBatchServer) error {
diff --git a/plugins/receiver/grpc/nativemeter/receiver_test.go b/plugins/receiver/grpc/nativemeter/receiver_test.go
index ea846ba..f836a34 100644
--- a/plugins/receiver/grpc/nativemeter/receiver_test.go
+++ b/plugins/receiver/grpc/nativemeter/receiver_test.go
@@ -48,7 +48,7 @@ func TestReceiver_RegisterHandler(t *testing.T) {
}
return data.String()
}, func(data *v1.SniffData) string {
- return data.GetMeter().String()
+ return data.GetMeterCollection().MeterData[0].String()
}, t)
}