You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2021/11/23 03:43:17 UTC

[skywalking-satellite] branch main updated: Use byte array to transmit the native tracing segment and log (#85)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-satellite.git


The following commit(s) were added to refs/heads/main by this push:
     new 4b97fd2  Use byte array to transmit the native tracing segment and log (#85)
4b97fd2 is described below

commit 4b97fd255b6459fe9d3fc4f88c0d8f682fc2a364
Author: mrproliu <74...@qq.com>
AuthorDate: Tue Nov 23 11:43:09 2021 +0800

    Use byte array to transmit the native tracing segment and log (#85)
---
 CHANGES.md                                         |   2 +-
 go.mod                                             |   2 +-
 go.sum                                             |   4 +-
 plugins/forwarder/grpc/nativelog/forwarder.go      |   3 +-
 plugins/forwarder/grpc/nativetracing/forwarder.go  |   3 +-
 plugins/forwarder/kafka/nativelog/forwarder.go     |  10 +--
 plugins/queue/mmap/queue_test.go                   | 100 +++++++++++----------
 .../receiver/grpc/nativelog/log_report_service.go  |   7 +-
 plugins/receiver/grpc/nativelog/receiver_test.go   |   6 +-
 .../receiver/grpc/nativetracing/receiver_test.go   |  10 ++-
 .../grpc/nativetracing/tracing_report_service.go   |  16 +++-
 plugins/receiver/http/nativcelog/receiver.go       |   2 +-
 plugins/receiver/http/nativcelog/receiver_test.go  |   4 +-
 13 files changed, 97 insertions(+), 72 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 2374a83..e5a8b4a 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -6,7 +6,7 @@ Release Notes.
 ------------------
 #### Features
 * Support partition queue.
-* Using byte array to transmit the ALS streaming, reducing en/decoding cpu usage.
+* Using byte array to transmit the ALS streaming, Native tracing segment and log, reducing en/decoding cpu usage.
 
 #### Bug Fixes
 
diff --git a/go.mod b/go.mod
index fce1276..df3d948 100644
--- a/go.mod
+++ b/go.mod
@@ -23,5 +23,5 @@ require (
 	google.golang.org/protobuf v1.27.1
 	gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b
 	gotest.tools v2.2.0+incompatible
-	skywalking.apache.org/repo/goapi v0.0.0-20211119053430-01f9654d8896
+	skywalking.apache.org/repo/goapi v0.0.0-20211122071111-ffc517fbfe21
 )
diff --git a/go.sum b/go.sum
index ee7219f..8bff2d3 100644
--- a/go.sum
+++ b/go.sum
@@ -1330,6 +1330,6 @@ sigs.k8s.io/structured-merge-diff/v4 v4.0.1/go.mod h1:bJZC9H9iH24zzfZ/41RGcq60oK
 sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o=
 sigs.k8s.io/yaml v1.2.0 h1:kr/MCeFWJWTwyaHoR9c8EjH9OumOmoF9YGiZd7lFm/Q=
 sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc=
-skywalking.apache.org/repo/goapi v0.0.0-20211119053430-01f9654d8896 h1:Om3XZ0eizMIiQI56K9ukO2T4+xgv3g4tRyFQic1gkBk=
-skywalking.apache.org/repo/goapi v0.0.0-20211119053430-01f9654d8896/go.mod h1:4KrWd+Oi4lkB+PtxZgIlf+3T6EECPru4fOWNMEHjxRk=
+skywalking.apache.org/repo/goapi v0.0.0-20211122071111-ffc517fbfe21 h1:USC28w3toXoRiNzSCN3lLgnmT8l6RokW7++GiXcNMCU=
+skywalking.apache.org/repo/goapi v0.0.0-20211122071111-ffc517fbfe21/go.mod h1:4KrWd+Oi4lkB+PtxZgIlf+3T6EECPru4fOWNMEHjxRk=
 sourcegraph.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0/go.mod h1:hI742Nqp5OhwiqlzhgfbWU4mW4yO10fP+LoT9WOswdU=
diff --git a/plugins/forwarder/grpc/nativelog/forwarder.go b/plugins/forwarder/grpc/nativelog/forwarder.go
index 8343f20..9148a6d 100644
--- a/plugins/forwarder/grpc/nativelog/forwarder.go
+++ b/plugins/forwarder/grpc/nativelog/forwarder.go
@@ -31,6 +31,7 @@ import (
 	"github.com/apache/skywalking-satellite/internal/pkg/config"
 	"github.com/apache/skywalking-satellite/internal/pkg/log"
 	"github.com/apache/skywalking-satellite/internal/satellite/event"
+	server_grpc "github.com/apache/skywalking-satellite/plugins/server/grpc"
 )
 
 const (
@@ -81,7 +82,7 @@ func (f *Forwarder) Forward(batch event.BatchEvents) error {
 		if !ok {
 			continue
 		}
-		err := stream.Send(data.Log)
+		err := stream.SendMsg(server_grpc.NewOriginalData(data.Log))
 		if err != nil {
 			log.Logger.Errorf("%s send log data error: %v", f.Name(), err)
 			err = closeStream(stream)
diff --git a/plugins/forwarder/grpc/nativetracing/forwarder.go b/plugins/forwarder/grpc/nativetracing/forwarder.go
index 9087a3c..1283b32 100644
--- a/plugins/forwarder/grpc/nativetracing/forwarder.go
+++ b/plugins/forwarder/grpc/nativetracing/forwarder.go
@@ -28,6 +28,7 @@ import (
 	"github.com/apache/skywalking-satellite/internal/pkg/config"
 	"github.com/apache/skywalking-satellite/internal/pkg/log"
 	"github.com/apache/skywalking-satellite/internal/satellite/event"
+	server_grpc "github.com/apache/skywalking-satellite/plugins/server/grpc"
 
 	agent "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
 	v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
@@ -81,7 +82,7 @@ func (f *Forwarder) Forward(batch event.BatchEvents) error {
 		if !ok {
 			continue
 		}
-		err := stream.Send(data.Segment)
+		err := stream.SendMsg(server_grpc.NewOriginalData(data.Segment))
 		if err != nil {
 			log.Logger.Errorf("%s send log data error: %v", f.Name(), err)
 			err = closeStream(stream)
diff --git a/plugins/forwarder/kafka/nativelog/forwarder.go b/plugins/forwarder/kafka/nativelog/forwarder.go
index 7e652e2..06d0046 100644
--- a/plugins/forwarder/kafka/nativelog/forwarder.go
+++ b/plugins/forwarder/kafka/nativelog/forwarder.go
@@ -21,12 +21,9 @@ import (
 	"fmt"
 	"reflect"
 
-	"google.golang.org/protobuf/proto"
-
 	"github.com/Shopify/sarama"
 
 	"github.com/apache/skywalking-satellite/internal/pkg/config"
-	"github.com/apache/skywalking-satellite/internal/pkg/log"
 	"github.com/apache/skywalking-satellite/internal/satellite/event"
 
 	v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
@@ -83,14 +80,9 @@ func (f *Forwarder) Forward(batch event.BatchEvents) error {
 		if !ok {
 			continue
 		}
-		bytes, err := proto.Marshal(data.Log)
-		if err != nil {
-			log.Logger.Errorf("%s serialize the logData fail: %v", f.Name(), err)
-			continue
-		}
 		message = append(message, &sarama.ProducerMessage{
 			Topic: f.Topic,
-			Value: sarama.ByteEncoder(bytes),
+			Value: sarama.ByteEncoder(data.Log),
 		})
 	}
 	return f.producer.SendMessages(message)
diff --git a/plugins/queue/mmap/queue_test.go b/plugins/queue/mmap/queue_test.go
index ff1d2ee..4964421 100644
--- a/plugins/queue/mmap/queue_test.go
+++ b/plugins/queue/mmap/queue_test.go
@@ -28,6 +28,8 @@ import (
 	"testing"
 	"time"
 
+	"google.golang.org/protobuf/proto"
+
 	common "skywalking.apache.org/repo/goapi/collect/common/v3"
 	logging "skywalking.apache.org/repo/goapi/collect/logging/v3"
 	v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
@@ -68,6 +70,27 @@ func cleanTestQueue(t *testing.T, q api.Queue) {
 func getBatchEvents(count int) []*v1.SniffData {
 	var slice []*v1.SniffData
 	for i := 0; i < count; i++ {
+		log := &logging.LogData{
+			Service:         "mock-service",
+			ServiceInstance: "mock-serviceInstance",
+			Timestamp:       time.Date(2020, 12, 20, 12, 12, 12, 0, time.UTC).Unix(),
+			Endpoint:        "mock-endpoint",
+			Tags:            &logging.LogTags{},
+			TraceContext: &logging.TraceContext{
+				TraceId:        "traceId",
+				TraceSegmentId: "trace-segmentId",
+				SpanId:         12,
+			},
+			Body: &logging.LogDataBody{
+				Type: "body-type",
+				Content: &logging.LogDataBody_Text{
+					Text: &logging.TextLog{
+						Text: getNKData(2) + strconv.Itoa(i),
+					},
+				},
+			},
+		}
+		logBytes, _ := proto.Marshal(log)
 		slice = append(slice, &v1.SniffData{
 			Name:      "event" + strconv.Itoa(i),
 			Timestamp: time.Now().Unix(),
@@ -77,26 +100,7 @@ func getBatchEvents(count int) []*v1.SniffData {
 			Type:   v1.SniffType_Logging,
 			Remote: true,
 			Data: &v1.SniffData_Log{
-				Log: &logging.LogData{
-					Service:         "mock-service",
-					ServiceInstance: "mock-serviceInstance",
-					Timestamp:       time.Date(2020, 12, 20, 12, 12, 12, 0, time.UTC).Unix(),
-					Endpoint:        "mock-endpoint",
-					Tags:            &logging.LogTags{},
-					TraceContext: &logging.TraceContext{
-						TraceId:        "traceId",
-						TraceSegmentId: "trace-segmentId",
-						SpanId:         12,
-					},
-					Body: &logging.LogDataBody{
-						Type: "body-type",
-						Content: &logging.LogDataBody_Text{
-							Text: &logging.TextLog{
-								Text: getNKData(2) + strconv.Itoa(i),
-							},
-						},
-					},
-				},
+				Log: logBytes,
 			},
 		},
 		)
@@ -109,6 +113,34 @@ func getNKData(n int) string {
 }
 
 func getLargeEvent(n int) *v1.SniffData {
+	log := &logging.LogData{
+		Service:         "mock-service",
+		ServiceInstance: "mock-serviceInstance",
+		Timestamp:       time.Date(2020, 12, 20, 12, 12, 12, 0, time.UTC).Unix(),
+		Endpoint:        "mock-endpoint",
+		Tags: &logging.LogTags{
+			Data: []*common.KeyStringValuePair{
+				{
+					Key:   "tags-key",
+					Value: "tags-val",
+				},
+			},
+		},
+		TraceContext: &logging.TraceContext{
+			TraceId:        "traceId",
+			TraceSegmentId: "trace-segmentId",
+			SpanId:         12,
+		},
+		Body: &logging.LogDataBody{
+			Type: "body-type",
+			Content: &logging.LogDataBody_Text{
+				Text: &logging.TextLog{
+					Text: getNKData(n),
+				},
+			},
+		},
+	}
+	logBytes, _ := proto.Marshal(log)
 	return &v1.SniffData{
 		Name:      "largeEvent",
 		Timestamp: time.Now().Unix(),
@@ -118,33 +150,7 @@ func getLargeEvent(n int) *v1.SniffData {
 		Type:   v1.SniffType_Logging,
 		Remote: true,
 		Data: &v1.SniffData_Log{
-			Log: &logging.LogData{
-				Service:         "mock-service",
-				ServiceInstance: "mock-serviceInstance",
-				Timestamp:       time.Date(2020, 12, 20, 12, 12, 12, 0, time.UTC).Unix(),
-				Endpoint:        "mock-endpoint",
-				Tags: &logging.LogTags{
-					Data: []*common.KeyStringValuePair{
-						{
-							Key:   "tags-key",
-							Value: "tags-val",
-						},
-					},
-				},
-				TraceContext: &logging.TraceContext{
-					TraceId:        "traceId",
-					TraceSegmentId: "trace-segmentId",
-					SpanId:         12,
-				},
-				Body: &logging.LogDataBody{
-					Type: "body-type",
-					Content: &logging.LogDataBody_Text{
-						Text: &logging.TextLog{
-							Text: getNKData(n),
-						},
-					},
-				},
-			},
+			Log: logBytes,
 		},
 	}
 }
diff --git a/plugins/receiver/grpc/nativelog/log_report_service.go b/plugins/receiver/grpc/nativelog/log_report_service.go
index a291956..6670f81 100644
--- a/plugins/receiver/grpc/nativelog/log_report_service.go
+++ b/plugins/receiver/grpc/nativelog/log_report_service.go
@@ -21,6 +21,8 @@ import (
 	"io"
 	"time"
 
+	"github.com/apache/skywalking-satellite/plugins/server/grpc"
+
 	common "skywalking.apache.org/repo/goapi/collect/common/v3"
 	logging "skywalking.apache.org/repo/goapi/collect/logging/v3"
 	v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
@@ -35,7 +37,8 @@ type LogReportService struct {
 
 func (s *LogReportService) Collect(stream logging.LogReportService_CollectServer) error {
 	for {
-		logData, err := stream.Recv()
+		originalData := grpc.NewOriginalData(nil)
+		err := stream.RecvMsg(originalData)
 		if err == io.EOF {
 			return stream.SendAndClose(&common.Commands{})
 		}
@@ -49,7 +52,7 @@ func (s *LogReportService) Collect(stream logging.LogReportService_CollectServer
 			Type:      v1.SniffType_Logging,
 			Remote:    true,
 			Data: &v1.SniffData_Log{
-				Log: logData,
+				Log: originalData.Content,
 			},
 		}
 		s.receiveChannel <- e
diff --git a/plugins/receiver/grpc/nativelog/receiver_test.go b/plugins/receiver/grpc/nativelog/receiver_test.go
index 0f3734b..97241e8 100644
--- a/plugins/receiver/grpc/nativelog/receiver_test.go
+++ b/plugins/receiver/grpc/nativelog/receiver_test.go
@@ -23,6 +23,8 @@ import (
 	"testing"
 	"time"
 
+	"google.golang.org/protobuf/proto"
+
 	"google.golang.org/grpc"
 
 	common "skywalking.apache.org/repo/goapi/collect/common/v3"
@@ -49,7 +51,9 @@ func TestReceiver_RegisterHandler(t *testing.T) {
 		}
 		return data.String()
 	}, func(data *v1.SniffData) string {
-		return data.GetLog().String()
+		d := new(logging.LogData)
+		_ = proto.Unmarshal(data.GetLog(), d)
+		return d.String()
 	}, t)
 }
 
diff --git a/plugins/receiver/grpc/nativetracing/receiver_test.go b/plugins/receiver/grpc/nativetracing/receiver_test.go
index 97514be..227bbcd 100644
--- a/plugins/receiver/grpc/nativetracing/receiver_test.go
+++ b/plugins/receiver/grpc/nativetracing/receiver_test.go
@@ -23,6 +23,8 @@ import (
 	"testing"
 	"time"
 
+	"google.golang.org/protobuf/proto"
+
 	"google.golang.org/grpc"
 
 	common "skywalking.apache.org/repo/goapi/collect/common/v3"
@@ -49,7 +51,9 @@ func TestReceiver_RegisterHandlerStream(t *testing.T) {
 		}
 		return data.String()
 	}, func(data *v1.SniffData) string {
-		return data.GetSegment().String()
+		d := new(agent.SegmentObject)
+		_ = proto.Unmarshal(data.GetSegment(), d)
+		return d.String()
 	}, t)
 }
 
@@ -68,7 +72,9 @@ func TestReceiver_RegisterHandlerSync(t *testing.T) {
 		}
 		return data.String()
 	}, func(data *v1.SniffData) string {
-		return data.GetSegment().String()
+		d := new(agent.SegmentObject)
+		_ = proto.Unmarshal(data.GetSegment(), d)
+		return d.String()
 	}, t)
 }
 
diff --git a/plugins/receiver/grpc/nativetracing/tracing_report_service.go b/plugins/receiver/grpc/nativetracing/tracing_report_service.go
index 2a58f5f..9929fd8 100644
--- a/plugins/receiver/grpc/nativetracing/tracing_report_service.go
+++ b/plugins/receiver/grpc/nativetracing/tracing_report_service.go
@@ -22,6 +22,11 @@ import (
 	"io"
 	"time"
 
+	"google.golang.org/protobuf/proto"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+	"github.com/apache/skywalking-satellite/plugins/server/grpc"
+
 	common "skywalking.apache.org/repo/goapi/collect/common/v3"
 	agent "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
 	v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
@@ -36,7 +41,8 @@ type TraceSegmentReportService struct {
 
 func (s *TraceSegmentReportService) Collect(stream agent.TraceSegmentReportService_CollectServer) error {
 	for {
-		segmentData, err := stream.Recv()
+		recData := grpc.NewOriginalData(nil)
+		err := stream.RecvMsg(recData)
 		if err == io.EOF {
 			return stream.SendAndClose(&common.Commands{})
 		}
@@ -50,7 +56,7 @@ func (s *TraceSegmentReportService) Collect(stream agent.TraceSegmentReportServi
 			Type:      v1.SniffType_TracingType,
 			Remote:    true,
 			Data: &v1.SniffData_Segment{
-				Segment: segmentData,
+				Segment: recData.Content,
 			},
 		}
 		s.receiveChannel <- e
@@ -59,6 +65,10 @@ func (s *TraceSegmentReportService) Collect(stream agent.TraceSegmentReportServi
 
 func (s *TraceSegmentReportService) CollectInSync(ctx context.Context, segments *agent.SegmentCollection) (*common.Commands, error) {
 	for _, segment := range segments.Segments {
+		marshaledSegment, err := proto.Marshal(segment)
+		if err != nil {
+			log.Logger.Warnf("cannot marshal segemnt from sync, %v", err)
+		}
 		e := &v1.SniffData{
 			Name:      eventName,
 			Timestamp: time.Now().UnixNano() / 1e6,
@@ -66,7 +76,7 @@ func (s *TraceSegmentReportService) CollectInSync(ctx context.Context, segments
 			Type:      v1.SniffType_TracingType,
 			Remote:    true,
 			Data: &v1.SniffData_Segment{
-				Segment: segment,
+				Segment: marshaledSegment,
 			},
 		}
 		s.receiveChannel <- e
diff --git a/plugins/receiver/http/nativcelog/receiver.go b/plugins/receiver/http/nativcelog/receiver.go
index e30949a..cf1d460 100644
--- a/plugins/receiver/http/nativcelog/receiver.go
+++ b/plugins/receiver/http/nativcelog/receiver.go
@@ -121,7 +121,7 @@ func (r *Receiver) httpHandler() http.Handler {
 			Type:      v1.SniffType_Logging,
 			Remote:    true,
 			Data: &v1.SniffData_Log{
-				Log: &data,
+				Log: b,
 			},
 		}
 		r.OutputChannel <- e
diff --git a/plugins/receiver/http/nativcelog/receiver_test.go b/plugins/receiver/http/nativcelog/receiver_test.go
index 2d308cc..5cfb1cf 100644
--- a/plugins/receiver/http/nativcelog/receiver_test.go
+++ b/plugins/receiver/http/nativcelog/receiver_test.go
@@ -83,7 +83,9 @@ func TestReceiver_http_RegisterHandler(t *testing.T) {
 		}()
 
 		newData := <-r.Channel()
-		if !cmp.Equal(newData.Data.(*v1.SniffData_Log).Log.String(), data.String()) {
+		d := new(logging.LogData)
+		_ = proto.Unmarshal(newData.Data.(*v1.SniffData_Log).Log, d)
+		if !cmp.Equal(d.String(), data.String()) {
 			t.Fatalf("the sent data is not equal to the received data\n, "+
 				"want data %s\n, but got %s\n", data.String(), newData.String())
 		}