You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2021/11/30 07:51:53 UTC

[skywalking-satellite] branch main updated: Support the native meter batch protocol (#89)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-satellite.git


The following commit(s) were added to refs/heads/main by this push:
     new c89b750  Support the native meter batch protocol (#89)
c89b750 is described below

commit c89b7500453559c1002c494304ee53cbfe2d8eea
Author: mrproliu <74...@qq.com>
AuthorDate: Tue Nov 30 15:51:48 2021 +0800

    Support the native meter batch protocol (#89)
---
 CHANGES.md                                         |  2 +
 go.mod                                             |  2 +-
 go.sum                                             |  4 +-
 plugins/forwarder/grpc/nativemeter/forwarder.go    | 66 ++++++++++++++++------
 plugins/receiver/grpc/nativemeter/meter_service.go | 24 ++++++++
 5 files changed, 78 insertions(+), 20 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index e5a8b4a..53f6787 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -7,6 +7,8 @@ Release Notes.
 #### Features
 * Support partition queue.
 * Using byte array to transmit the ALS streaming, Native tracing segment and log, reducing en/decoding cpu usage.
+* Support using the new ALS protocol to transmit the Envoy accesslog.
+* Support transmitting the Native Meter Batch protocol.
 
 #### Bug Fixes
 
diff --git a/go.mod b/go.mod
index 92baf98..7e48650 100644
--- a/go.mod
+++ b/go.mod
@@ -23,5 +23,5 @@ require (
 	google.golang.org/protobuf v1.27.1
 	gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b
 	gotest.tools v2.2.0+incompatible
-	skywalking.apache.org/repo/goapi v0.0.0-20211129152714-f5760201da68
+	skywalking.apache.org/repo/goapi v0.0.0-20211130045256-aee6db90e633
 )
diff --git a/go.sum b/go.sum
index 28f5755..fdb77ac 100644
--- a/go.sum
+++ b/go.sum
@@ -1330,6 +1330,6 @@ sigs.k8s.io/structured-merge-diff/v4 v4.0.1/go.mod h1:bJZC9H9iH24zzfZ/41RGcq60oK
 sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o=
 sigs.k8s.io/yaml v1.2.0 h1:kr/MCeFWJWTwyaHoR9c8EjH9OumOmoF9YGiZd7lFm/Q=
 sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc=
-skywalking.apache.org/repo/goapi v0.0.0-20211129152714-f5760201da68 h1:DE5enrtUAh/PViRIJCYFLMBjmvPLJVmXEyHJZjBtK90=
-skywalking.apache.org/repo/goapi v0.0.0-20211129152714-f5760201da68/go.mod h1:4KrWd+Oi4lkB+PtxZgIlf+3T6EECPru4fOWNMEHjxRk=
+skywalking.apache.org/repo/goapi v0.0.0-20211130045256-aee6db90e633 h1:sneWCKi5BckD4crMDSU4IjFymZjYGsRdHE1O/B9wugA=
+skywalking.apache.org/repo/goapi v0.0.0-20211130045256-aee6db90e633/go.mod h1:4KrWd+Oi4lkB+PtxZgIlf+3T6EECPru4fOWNMEHjxRk=
 sourcegraph.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0/go.mod h1:hI742Nqp5OhwiqlzhgfbWU4mW4yO10fP+LoT9WOswdU=
diff --git a/plugins/forwarder/grpc/nativemeter/forwarder.go b/plugins/forwarder/grpc/nativemeter/forwarder.go
index f3c172d..5f1f04b 100644
--- a/plugins/forwarder/grpc/nativemeter/forwarder.go
+++ b/plugins/forwarder/grpc/nativemeter/forwarder.go
@@ -31,6 +31,7 @@ import (
 	"github.com/apache/skywalking-satellite/internal/pkg/config"
 	"github.com/apache/skywalking-satellite/internal/pkg/log"
 	"github.com/apache/skywalking-satellite/internal/satellite/event"
+	server_grpc "github.com/apache/skywalking-satellite/plugins/server/grpc"
 )
 
 const (
@@ -70,7 +71,7 @@ func (f *Forwarder) Prepare(connection interface{}) error {
 }
 
 func (f *Forwarder) Forward(batch event.BatchEvents) error {
-	streamMap := make(map[string]v3.MeterReportService_CollectClient)
+	streamMap := make(map[string]grpc.ClientStream)
 	defer func() {
 		for _, stream := range streamMap {
 			err := closeStream(stream)
@@ -80,34 +81,65 @@ func (f *Forwarder) Forward(batch event.BatchEvents) error {
 		}
 	}()
 	for _, e := range batch {
-		data, ok := e.GetData().(*v1.SniffData_Meter)
-		if !ok {
-			continue
+		if data, ok := e.GetData().(*v1.SniffData_Meter); ok {
+			if err := f.handleMeter(data, streamMap); err != nil {
+				return err
+			}
 		}
-		streamName := fmt.Sprintf("%s_%s", data.Meter.Service, data.Meter.ServiceInstance)
-		stream := streamMap[streamName]
-		if stream == nil {
-			curStream, err := f.meterClient.Collect(context.Background())
-			if err != nil {
-				log.Logger.Errorf("open grpc stream error %v", err)
+		if data, ok := e.GetData().(*v1.SniffData_MeterCollection); ok {
+			if err := f.handleMeterCollection(data, streamMap); err != nil {
 				return err
 			}
-			streamMap[streamName] = curStream
-			stream = curStream
 		}
+	}
+	return nil
+}
 
-		err := stream.Send(data.Meter)
+func (f *Forwarder) handleMeterCollection(data *v1.SniffData_MeterCollection, streamMap map[string]grpc.ClientStream) error {
+	streamName := "batch-stream"
+	stream := streamMap[streamName]
+	if stream == nil {
+		curStream, err := f.meterClient.CollectBatch(context.Background())
 		if err != nil {
-			log.Logger.Errorf("%s send meter data error: %v", f.Name(), err)
+			log.Logger.Errorf("open grpc stream error %v", err)
 			return err
 		}
+		streamMap[streamName] = curStream
+		stream = curStream
+	}
+
+	if err := stream.SendMsg(data.MeterCollection); err != nil {
+		log.Logger.Errorf("%s send meter data error: %v", f.Name(), err)
+		return err
 	}
 	return nil
 }
 
-func closeStream(stream v3.MeterReportService_CollectClient) error {
-	_, err := stream.CloseAndRecv()
-	if err != nil && err != io.EOF {
+func (f *Forwarder) handleMeter(data *v1.SniffData_Meter, streamMap map[string]grpc.ClientStream) error {
+	streamName := fmt.Sprintf("%s_%s", data.Meter.Service, data.Meter.ServiceInstance)
+	stream := streamMap[streamName]
+	if stream == nil {
+		curStream, err := f.meterClient.Collect(context.Background())
+		if err != nil {
+			log.Logger.Errorf("open grpc stream error %v", err)
+			return err
+		}
+		streamMap[streamName] = curStream
+		stream = curStream
+	}
+
+	if err := stream.SendMsg(data.Meter); err != nil {
+		log.Logger.Errorf("%s send meter data error: %v", f.Name(), err)
+		return err
+	}
+	return nil
+}
+
+func closeStream(stream grpc.ClientStream) error {
+	if err := stream.CloseSend(); err != nil && err != io.EOF {
+		return err
+	}
+	if err := stream.RecvMsg(server_grpc.NewOriginalData(nil)); err != nil {
 		return err
 	}
 	return nil
diff --git a/plugins/receiver/grpc/nativemeter/meter_service.go b/plugins/receiver/grpc/nativemeter/meter_service.go
index 6db8b91..fd3c2f5 100644
--- a/plugins/receiver/grpc/nativemeter/meter_service.go
+++ b/plugins/receiver/grpc/nativemeter/meter_service.go
@@ -66,3 +66,27 @@ func (m *MeterService) Collect(stream meter.MeterReportService_CollectServer) er
 		m.receiveChannel <- d
 	}
 }
+
+func (m *MeterService) CollectBatch(batch meter.MeterReportService_CollectBatchServer) error {
+	for {
+		item, err := batch.Recv()
+		if err == io.EOF {
+			return nil
+		}
+		if err != nil {
+			return err
+		}
+
+		d := &v1.SniffData{
+			Name:      eventName,
+			Timestamp: time.Now().UnixNano() / 1e6,
+			Meta:      nil,
+			Type:      v1.SniffType_MeterType,
+			Remote:    true,
+			Data: &v1.SniffData_MeterCollection{
+				MeterCollection: item,
+			},
+		}
+		m.receiveChannel <- d
+	}
+}