You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2021/11/30 07:51:53 UTC
[skywalking-satellite] branch main updated: Support the native meter batch protocol (#89)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-satellite.git
The following commit(s) were added to refs/heads/main by this push:
new c89b750 Support the native meter batch protocol (#89)
c89b750 is described below
commit c89b7500453559c1002c494304ee53cbfe2d8eea
Author: mrproliu <74...@qq.com>
AuthorDate: Tue Nov 30 15:51:48 2021 +0800
Support the native meter batch protocol (#89)
---
CHANGES.md | 2 +
go.mod | 2 +-
go.sum | 4 +-
plugins/forwarder/grpc/nativemeter/forwarder.go | 66 ++++++++++++++++------
plugins/receiver/grpc/nativemeter/meter_service.go | 24 ++++++++
5 files changed, 78 insertions(+), 20 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index e5a8b4a..53f6787 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -7,6 +7,8 @@ Release Notes.
#### Features
* Support partition queue.
* Using byte array to transmit the ALS streaming, Native tracing segment and log, reducing en/decoding cpu usage.
+* Support using the new ALS protocol to transmit the Envoy accesslog.
+* Support transmitting the Native Meter Batch protocol.
#### Bug Fixes
diff --git a/go.mod b/go.mod
index 92baf98..7e48650 100644
--- a/go.mod
+++ b/go.mod
@@ -23,5 +23,5 @@ require (
google.golang.org/protobuf v1.27.1
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b
gotest.tools v2.2.0+incompatible
- skywalking.apache.org/repo/goapi v0.0.0-20211129152714-f5760201da68
+ skywalking.apache.org/repo/goapi v0.0.0-20211130045256-aee6db90e633
)
diff --git a/go.sum b/go.sum
index 28f5755..fdb77ac 100644
--- a/go.sum
+++ b/go.sum
@@ -1330,6 +1330,6 @@ sigs.k8s.io/structured-merge-diff/v4 v4.0.1/go.mod h1:bJZC9H9iH24zzfZ/41RGcq60oK
sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o=
sigs.k8s.io/yaml v1.2.0 h1:kr/MCeFWJWTwyaHoR9c8EjH9OumOmoF9YGiZd7lFm/Q=
sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc=
-skywalking.apache.org/repo/goapi v0.0.0-20211129152714-f5760201da68 h1:DE5enrtUAh/PViRIJCYFLMBjmvPLJVmXEyHJZjBtK90=
-skywalking.apache.org/repo/goapi v0.0.0-20211129152714-f5760201da68/go.mod h1:4KrWd+Oi4lkB+PtxZgIlf+3T6EECPru4fOWNMEHjxRk=
+skywalking.apache.org/repo/goapi v0.0.0-20211130045256-aee6db90e633 h1:sneWCKi5BckD4crMDSU4IjFymZjYGsRdHE1O/B9wugA=
+skywalking.apache.org/repo/goapi v0.0.0-20211130045256-aee6db90e633/go.mod h1:4KrWd+Oi4lkB+PtxZgIlf+3T6EECPru4fOWNMEHjxRk=
sourcegraph.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0/go.mod h1:hI742Nqp5OhwiqlzhgfbWU4mW4yO10fP+LoT9WOswdU=
diff --git a/plugins/forwarder/grpc/nativemeter/forwarder.go b/plugins/forwarder/grpc/nativemeter/forwarder.go
index f3c172d..5f1f04b 100644
--- a/plugins/forwarder/grpc/nativemeter/forwarder.go
+++ b/plugins/forwarder/grpc/nativemeter/forwarder.go
@@ -31,6 +31,7 @@ import (
"github.com/apache/skywalking-satellite/internal/pkg/config"
"github.com/apache/skywalking-satellite/internal/pkg/log"
"github.com/apache/skywalking-satellite/internal/satellite/event"
+ server_grpc "github.com/apache/skywalking-satellite/plugins/server/grpc"
)
const (
@@ -70,7 +71,7 @@ func (f *Forwarder) Prepare(connection interface{}) error {
}
func (f *Forwarder) Forward(batch event.BatchEvents) error {
- streamMap := make(map[string]v3.MeterReportService_CollectClient)
+ streamMap := make(map[string]grpc.ClientStream)
defer func() {
for _, stream := range streamMap {
err := closeStream(stream)
@@ -80,34 +81,65 @@ func (f *Forwarder) Forward(batch event.BatchEvents) error {
}
}()
for _, e := range batch {
- data, ok := e.GetData().(*v1.SniffData_Meter)
- if !ok {
- continue
+ if data, ok := e.GetData().(*v1.SniffData_Meter); ok {
+ if err := f.handleMeter(data, streamMap); err != nil {
+ return err
+ }
}
- streamName := fmt.Sprintf("%s_%s", data.Meter.Service, data.Meter.ServiceInstance)
- stream := streamMap[streamName]
- if stream == nil {
- curStream, err := f.meterClient.Collect(context.Background())
- if err != nil {
- log.Logger.Errorf("open grpc stream error %v", err)
+ if data, ok := e.GetData().(*v1.SniffData_MeterCollection); ok {
+ if err := f.handleMeterCollection(data, streamMap); err != nil {
return err
}
- streamMap[streamName] = curStream
- stream = curStream
}
+ }
+ return nil
+}
- err := stream.Send(data.Meter)
+func (f *Forwarder) handleMeterCollection(data *v1.SniffData_MeterCollection, streamMap map[string]grpc.ClientStream) error {
+ streamName := "batch-stream"
+ stream := streamMap[streamName]
+ if stream == nil {
+ curStream, err := f.meterClient.CollectBatch(context.Background())
if err != nil {
- log.Logger.Errorf("%s send meter data error: %v", f.Name(), err)
+ log.Logger.Errorf("open grpc stream error %v", err)
return err
}
+ streamMap[streamName] = curStream
+ stream = curStream
+ }
+
+ if err := stream.SendMsg(data.MeterCollection); err != nil {
+ log.Logger.Errorf("%s send meter data error: %v", f.Name(), err)
+ return err
}
return nil
}
-func closeStream(stream v3.MeterReportService_CollectClient) error {
- _, err := stream.CloseAndRecv()
- if err != nil && err != io.EOF {
+func (f *Forwarder) handleMeter(data *v1.SniffData_Meter, streamMap map[string]grpc.ClientStream) error {
+ streamName := fmt.Sprintf("%s_%s", data.Meter.Service, data.Meter.ServiceInstance)
+ stream := streamMap[streamName]
+ if stream == nil {
+ curStream, err := f.meterClient.Collect(context.Background())
+ if err != nil {
+ log.Logger.Errorf("open grpc stream error %v", err)
+ return err
+ }
+ streamMap[streamName] = curStream
+ stream = curStream
+ }
+
+ if err := stream.SendMsg(data.Meter); err != nil {
+ log.Logger.Errorf("%s send meter data error: %v", f.Name(), err)
+ return err
+ }
+ return nil
+}
+
+func closeStream(stream grpc.ClientStream) error {
+ if err := stream.CloseSend(); err != nil && err != io.EOF {
+ return err
+ }
+ if err := stream.RecvMsg(server_grpc.NewOriginalData(nil)); err != nil {
return err
}
return nil
diff --git a/plugins/receiver/grpc/nativemeter/meter_service.go b/plugins/receiver/grpc/nativemeter/meter_service.go
index 6db8b91..fd3c2f5 100644
--- a/plugins/receiver/grpc/nativemeter/meter_service.go
+++ b/plugins/receiver/grpc/nativemeter/meter_service.go
@@ -66,3 +66,27 @@ func (m *MeterService) Collect(stream meter.MeterReportService_CollectServer) er
m.receiveChannel <- d
}
}
+
+func (m *MeterService) CollectBatch(batch meter.MeterReportService_CollectBatchServer) error {
+ for {
+ item, err := batch.Recv()
+ if err == io.EOF {
+ return nil
+ }
+ if err != nil {
+ return err
+ }
+
+ d := &v1.SniffData{
+ Name: eventName,
+ Timestamp: time.Now().UnixNano() / 1e6,
+ Meta: nil,
+ Type: v1.SniffType_MeterType,
+ Remote: true,
+ Data: &v1.SniffData_MeterCollection{
+ MeterCollection: item,
+ },
+ }
+ m.receiveChannel <- d
+ }
+}