You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2021/11/23 03:43:17 UTC
[skywalking-satellite] branch main updated: Use byte array to transmit the native tracing segment and log (#85)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-satellite.git
The following commit(s) were added to refs/heads/main by this push:
new 4b97fd2 Use byte array to transmit the native tracing segment and log (#85)
4b97fd2 is described below
commit 4b97fd255b6459fe9d3fc4f88c0d8f682fc2a364
Author: mrproliu <74...@qq.com>
AuthorDate: Tue Nov 23 11:43:09 2021 +0800
Use byte array to transmit the native tracing segment and log (#85)
---
CHANGES.md | 2 +-
go.mod | 2 +-
go.sum | 4 +-
plugins/forwarder/grpc/nativelog/forwarder.go | 3 +-
plugins/forwarder/grpc/nativetracing/forwarder.go | 3 +-
plugins/forwarder/kafka/nativelog/forwarder.go | 10 +--
plugins/queue/mmap/queue_test.go | 100 +++++++++++----------
.../receiver/grpc/nativelog/log_report_service.go | 7 +-
plugins/receiver/grpc/nativelog/receiver_test.go | 6 +-
.../receiver/grpc/nativetracing/receiver_test.go | 10 ++-
.../grpc/nativetracing/tracing_report_service.go | 16 +++-
plugins/receiver/http/nativcelog/receiver.go | 2 +-
plugins/receiver/http/nativcelog/receiver_test.go | 4 +-
13 files changed, 97 insertions(+), 72 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 2374a83..e5a8b4a 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -6,7 +6,7 @@ Release Notes.
------------------
#### Features
* Support partition queue.
-* Using byte array to transmit the ALS streaming, reducing en/decoding cpu usage.
+* Use byte arrays to transmit ALS streaming, native tracing segment, and native log data, reducing encoding/decoding CPU usage.
#### Bug Fixes
diff --git a/go.mod b/go.mod
index fce1276..df3d948 100644
--- a/go.mod
+++ b/go.mod
@@ -23,5 +23,5 @@ require (
google.golang.org/protobuf v1.27.1
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b
gotest.tools v2.2.0+incompatible
- skywalking.apache.org/repo/goapi v0.0.0-20211119053430-01f9654d8896
+ skywalking.apache.org/repo/goapi v0.0.0-20211122071111-ffc517fbfe21
)
diff --git a/go.sum b/go.sum
index ee7219f..8bff2d3 100644
--- a/go.sum
+++ b/go.sum
@@ -1330,6 +1330,6 @@ sigs.k8s.io/structured-merge-diff/v4 v4.0.1/go.mod h1:bJZC9H9iH24zzfZ/41RGcq60oK
sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o=
sigs.k8s.io/yaml v1.2.0 h1:kr/MCeFWJWTwyaHoR9c8EjH9OumOmoF9YGiZd7lFm/Q=
sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc=
-skywalking.apache.org/repo/goapi v0.0.0-20211119053430-01f9654d8896 h1:Om3XZ0eizMIiQI56K9ukO2T4+xgv3g4tRyFQic1gkBk=
-skywalking.apache.org/repo/goapi v0.0.0-20211119053430-01f9654d8896/go.mod h1:4KrWd+Oi4lkB+PtxZgIlf+3T6EECPru4fOWNMEHjxRk=
+skywalking.apache.org/repo/goapi v0.0.0-20211122071111-ffc517fbfe21 h1:USC28w3toXoRiNzSCN3lLgnmT8l6RokW7++GiXcNMCU=
+skywalking.apache.org/repo/goapi v0.0.0-20211122071111-ffc517fbfe21/go.mod h1:4KrWd+Oi4lkB+PtxZgIlf+3T6EECPru4fOWNMEHjxRk=
sourcegraph.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0/go.mod h1:hI742Nqp5OhwiqlzhgfbWU4mW4yO10fP+LoT9WOswdU=
diff --git a/plugins/forwarder/grpc/nativelog/forwarder.go b/plugins/forwarder/grpc/nativelog/forwarder.go
index 8343f20..9148a6d 100644
--- a/plugins/forwarder/grpc/nativelog/forwarder.go
+++ b/plugins/forwarder/grpc/nativelog/forwarder.go
@@ -31,6 +31,7 @@ import (
"github.com/apache/skywalking-satellite/internal/pkg/config"
"github.com/apache/skywalking-satellite/internal/pkg/log"
"github.com/apache/skywalking-satellite/internal/satellite/event"
+ server_grpc "github.com/apache/skywalking-satellite/plugins/server/grpc"
)
const (
@@ -81,7 +82,7 @@ func (f *Forwarder) Forward(batch event.BatchEvents) error {
if !ok {
continue
}
- err := stream.Send(data.Log)
+ err := stream.SendMsg(server_grpc.NewOriginalData(data.Log))
if err != nil {
log.Logger.Errorf("%s send log data error: %v", f.Name(), err)
err = closeStream(stream)
diff --git a/plugins/forwarder/grpc/nativetracing/forwarder.go b/plugins/forwarder/grpc/nativetracing/forwarder.go
index 9087a3c..1283b32 100644
--- a/plugins/forwarder/grpc/nativetracing/forwarder.go
+++ b/plugins/forwarder/grpc/nativetracing/forwarder.go
@@ -28,6 +28,7 @@ import (
"github.com/apache/skywalking-satellite/internal/pkg/config"
"github.com/apache/skywalking-satellite/internal/pkg/log"
"github.com/apache/skywalking-satellite/internal/satellite/event"
+ server_grpc "github.com/apache/skywalking-satellite/plugins/server/grpc"
agent "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
@@ -81,7 +82,7 @@ func (f *Forwarder) Forward(batch event.BatchEvents) error {
if !ok {
continue
}
- err := stream.Send(data.Segment)
+ err := stream.SendMsg(server_grpc.NewOriginalData(data.Segment))
if err != nil {
log.Logger.Errorf("%s send log data error: %v", f.Name(), err)
err = closeStream(stream)
diff --git a/plugins/forwarder/kafka/nativelog/forwarder.go b/plugins/forwarder/kafka/nativelog/forwarder.go
index 7e652e2..06d0046 100644
--- a/plugins/forwarder/kafka/nativelog/forwarder.go
+++ b/plugins/forwarder/kafka/nativelog/forwarder.go
@@ -21,12 +21,9 @@ import (
"fmt"
"reflect"
- "google.golang.org/protobuf/proto"
-
"github.com/Shopify/sarama"
"github.com/apache/skywalking-satellite/internal/pkg/config"
- "github.com/apache/skywalking-satellite/internal/pkg/log"
"github.com/apache/skywalking-satellite/internal/satellite/event"
v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
@@ -83,14 +80,9 @@ func (f *Forwarder) Forward(batch event.BatchEvents) error {
if !ok {
continue
}
- bytes, err := proto.Marshal(data.Log)
- if err != nil {
- log.Logger.Errorf("%s serialize the logData fail: %v", f.Name(), err)
- continue
- }
message = append(message, &sarama.ProducerMessage{
Topic: f.Topic,
- Value: sarama.ByteEncoder(bytes),
+ Value: sarama.ByteEncoder(data.Log),
})
}
return f.producer.SendMessages(message)
diff --git a/plugins/queue/mmap/queue_test.go b/plugins/queue/mmap/queue_test.go
index ff1d2ee..4964421 100644
--- a/plugins/queue/mmap/queue_test.go
+++ b/plugins/queue/mmap/queue_test.go
@@ -28,6 +28,8 @@ import (
"testing"
"time"
+ "google.golang.org/protobuf/proto"
+
common "skywalking.apache.org/repo/goapi/collect/common/v3"
logging "skywalking.apache.org/repo/goapi/collect/logging/v3"
v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
@@ -68,6 +70,27 @@ func cleanTestQueue(t *testing.T, q api.Queue) {
func getBatchEvents(count int) []*v1.SniffData {
var slice []*v1.SniffData
for i := 0; i < count; i++ {
+ log := &logging.LogData{
+ Service: "mock-service",
+ ServiceInstance: "mock-serviceInstance",
+ Timestamp: time.Date(2020, 12, 20, 12, 12, 12, 0, time.UTC).Unix(),
+ Endpoint: "mock-endpoint",
+ Tags: &logging.LogTags{},
+ TraceContext: &logging.TraceContext{
+ TraceId: "traceId",
+ TraceSegmentId: "trace-segmentId",
+ SpanId: 12,
+ },
+ Body: &logging.LogDataBody{
+ Type: "body-type",
+ Content: &logging.LogDataBody_Text{
+ Text: &logging.TextLog{
+ Text: getNKData(2) + strconv.Itoa(i),
+ },
+ },
+ },
+ }
+ logBytes, _ := proto.Marshal(log)
slice = append(slice, &v1.SniffData{
Name: "event" + strconv.Itoa(i),
Timestamp: time.Now().Unix(),
@@ -77,26 +100,7 @@ func getBatchEvents(count int) []*v1.SniffData {
Type: v1.SniffType_Logging,
Remote: true,
Data: &v1.SniffData_Log{
- Log: &logging.LogData{
- Service: "mock-service",
- ServiceInstance: "mock-serviceInstance",
- Timestamp: time.Date(2020, 12, 20, 12, 12, 12, 0, time.UTC).Unix(),
- Endpoint: "mock-endpoint",
- Tags: &logging.LogTags{},
- TraceContext: &logging.TraceContext{
- TraceId: "traceId",
- TraceSegmentId: "trace-segmentId",
- SpanId: 12,
- },
- Body: &logging.LogDataBody{
- Type: "body-type",
- Content: &logging.LogDataBody_Text{
- Text: &logging.TextLog{
- Text: getNKData(2) + strconv.Itoa(i),
- },
- },
- },
- },
+ Log: logBytes,
},
},
)
@@ -109,6 +113,34 @@ func getNKData(n int) string {
}
func getLargeEvent(n int) *v1.SniffData {
+ log := &logging.LogData{
+ Service: "mock-service",
+ ServiceInstance: "mock-serviceInstance",
+ Timestamp: time.Date(2020, 12, 20, 12, 12, 12, 0, time.UTC).Unix(),
+ Endpoint: "mock-endpoint",
+ Tags: &logging.LogTags{
+ Data: []*common.KeyStringValuePair{
+ {
+ Key: "tags-key",
+ Value: "tags-val",
+ },
+ },
+ },
+ TraceContext: &logging.TraceContext{
+ TraceId: "traceId",
+ TraceSegmentId: "trace-segmentId",
+ SpanId: 12,
+ },
+ Body: &logging.LogDataBody{
+ Type: "body-type",
+ Content: &logging.LogDataBody_Text{
+ Text: &logging.TextLog{
+ Text: getNKData(n),
+ },
+ },
+ },
+ }
+ logBytes, _ := proto.Marshal(log)
return &v1.SniffData{
Name: "largeEvent",
Timestamp: time.Now().Unix(),
@@ -118,33 +150,7 @@ func getLargeEvent(n int) *v1.SniffData {
Type: v1.SniffType_Logging,
Remote: true,
Data: &v1.SniffData_Log{
- Log: &logging.LogData{
- Service: "mock-service",
- ServiceInstance: "mock-serviceInstance",
- Timestamp: time.Date(2020, 12, 20, 12, 12, 12, 0, time.UTC).Unix(),
- Endpoint: "mock-endpoint",
- Tags: &logging.LogTags{
- Data: []*common.KeyStringValuePair{
- {
- Key: "tags-key",
- Value: "tags-val",
- },
- },
- },
- TraceContext: &logging.TraceContext{
- TraceId: "traceId",
- TraceSegmentId: "trace-segmentId",
- SpanId: 12,
- },
- Body: &logging.LogDataBody{
- Type: "body-type",
- Content: &logging.LogDataBody_Text{
- Text: &logging.TextLog{
- Text: getNKData(n),
- },
- },
- },
- },
+ Log: logBytes,
},
}
}
diff --git a/plugins/receiver/grpc/nativelog/log_report_service.go b/plugins/receiver/grpc/nativelog/log_report_service.go
index a291956..6670f81 100644
--- a/plugins/receiver/grpc/nativelog/log_report_service.go
+++ b/plugins/receiver/grpc/nativelog/log_report_service.go
@@ -21,6 +21,8 @@ import (
"io"
"time"
+ "github.com/apache/skywalking-satellite/plugins/server/grpc"
+
common "skywalking.apache.org/repo/goapi/collect/common/v3"
logging "skywalking.apache.org/repo/goapi/collect/logging/v3"
v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
@@ -35,7 +37,8 @@ type LogReportService struct {
func (s *LogReportService) Collect(stream logging.LogReportService_CollectServer) error {
for {
- logData, err := stream.Recv()
+ originalData := grpc.NewOriginalData(nil)
+ err := stream.RecvMsg(originalData)
if err == io.EOF {
return stream.SendAndClose(&common.Commands{})
}
@@ -49,7 +52,7 @@ func (s *LogReportService) Collect(stream logging.LogReportService_CollectServer
Type: v1.SniffType_Logging,
Remote: true,
Data: &v1.SniffData_Log{
- Log: logData,
+ Log: originalData.Content,
},
}
s.receiveChannel <- e
diff --git a/plugins/receiver/grpc/nativelog/receiver_test.go b/plugins/receiver/grpc/nativelog/receiver_test.go
index 0f3734b..97241e8 100644
--- a/plugins/receiver/grpc/nativelog/receiver_test.go
+++ b/plugins/receiver/grpc/nativelog/receiver_test.go
@@ -23,6 +23,8 @@ import (
"testing"
"time"
+ "google.golang.org/protobuf/proto"
+
"google.golang.org/grpc"
common "skywalking.apache.org/repo/goapi/collect/common/v3"
@@ -49,7 +51,9 @@ func TestReceiver_RegisterHandler(t *testing.T) {
}
return data.String()
}, func(data *v1.SniffData) string {
- return data.GetLog().String()
+ d := new(logging.LogData)
+ _ = proto.Unmarshal(data.GetLog(), d)
+ return d.String()
}, t)
}
diff --git a/plugins/receiver/grpc/nativetracing/receiver_test.go b/plugins/receiver/grpc/nativetracing/receiver_test.go
index 97514be..227bbcd 100644
--- a/plugins/receiver/grpc/nativetracing/receiver_test.go
+++ b/plugins/receiver/grpc/nativetracing/receiver_test.go
@@ -23,6 +23,8 @@ import (
"testing"
"time"
+ "google.golang.org/protobuf/proto"
+
"google.golang.org/grpc"
common "skywalking.apache.org/repo/goapi/collect/common/v3"
@@ -49,7 +51,9 @@ func TestReceiver_RegisterHandlerStream(t *testing.T) {
}
return data.String()
}, func(data *v1.SniffData) string {
- return data.GetSegment().String()
+ d := new(agent.SegmentObject)
+ _ = proto.Unmarshal(data.GetSegment(), d)
+ return d.String()
}, t)
}
@@ -68,7 +72,9 @@ func TestReceiver_RegisterHandlerSync(t *testing.T) {
}
return data.String()
}, func(data *v1.SniffData) string {
- return data.GetSegment().String()
+ d := new(agent.SegmentObject)
+ _ = proto.Unmarshal(data.GetSegment(), d)
+ return d.String()
}, t)
}
diff --git a/plugins/receiver/grpc/nativetracing/tracing_report_service.go b/plugins/receiver/grpc/nativetracing/tracing_report_service.go
index 2a58f5f..9929fd8 100644
--- a/plugins/receiver/grpc/nativetracing/tracing_report_service.go
+++ b/plugins/receiver/grpc/nativetracing/tracing_report_service.go
@@ -22,6 +22,11 @@ import (
"io"
"time"
+ "google.golang.org/protobuf/proto"
+
+ "github.com/apache/skywalking-satellite/internal/pkg/log"
+ "github.com/apache/skywalking-satellite/plugins/server/grpc"
+
common "skywalking.apache.org/repo/goapi/collect/common/v3"
agent "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
v1 "skywalking.apache.org/repo/goapi/satellite/data/v1"
@@ -36,7 +41,8 @@ type TraceSegmentReportService struct {
func (s *TraceSegmentReportService) Collect(stream agent.TraceSegmentReportService_CollectServer) error {
for {
- segmentData, err := stream.Recv()
+ recData := grpc.NewOriginalData(nil)
+ err := stream.RecvMsg(recData)
if err == io.EOF {
return stream.SendAndClose(&common.Commands{})
}
@@ -50,7 +56,7 @@ func (s *TraceSegmentReportService) Collect(stream agent.TraceSegmentReportServi
Type: v1.SniffType_TracingType,
Remote: true,
Data: &v1.SniffData_Segment{
- Segment: segmentData,
+ Segment: recData.Content,
},
}
s.receiveChannel <- e
@@ -59,6 +65,10 @@ func (s *TraceSegmentReportService) Collect(stream agent.TraceSegmentReportServi
func (s *TraceSegmentReportService) CollectInSync(ctx context.Context, segments *agent.SegmentCollection) (*common.Commands, error) {
for _, segment := range segments.Segments {
+ marshaledSegment, err := proto.Marshal(segment)
+ if err != nil {
+ log.Logger.Warnf("cannot marshal segemnt from sync, %v", err)
+ }
e := &v1.SniffData{
Name: eventName,
Timestamp: time.Now().UnixNano() / 1e6,
@@ -66,7 +76,7 @@ func (s *TraceSegmentReportService) CollectInSync(ctx context.Context, segments
Type: v1.SniffType_TracingType,
Remote: true,
Data: &v1.SniffData_Segment{
- Segment: segment,
+ Segment: marshaledSegment,
},
}
s.receiveChannel <- e
diff --git a/plugins/receiver/http/nativcelog/receiver.go b/plugins/receiver/http/nativcelog/receiver.go
index e30949a..cf1d460 100644
--- a/plugins/receiver/http/nativcelog/receiver.go
+++ b/plugins/receiver/http/nativcelog/receiver.go
@@ -121,7 +121,7 @@ func (r *Receiver) httpHandler() http.Handler {
Type: v1.SniffType_Logging,
Remote: true,
Data: &v1.SniffData_Log{
- Log: &data,
+ Log: b,
},
}
r.OutputChannel <- e
diff --git a/plugins/receiver/http/nativcelog/receiver_test.go b/plugins/receiver/http/nativcelog/receiver_test.go
index 2d308cc..5cfb1cf 100644
--- a/plugins/receiver/http/nativcelog/receiver_test.go
+++ b/plugins/receiver/http/nativcelog/receiver_test.go
@@ -83,7 +83,9 @@ func TestReceiver_http_RegisterHandler(t *testing.T) {
}()
newData := <-r.Channel()
- if !cmp.Equal(newData.Data.(*v1.SniffData_Log).Log.String(), data.String()) {
+ d := new(logging.LogData)
+ _ = proto.Unmarshal(newData.Data.(*v1.SniffData_Log).Log, d)
+ if !cmp.Equal(d.String(), data.String()) {
t.Fatalf("the sent data is not equal to the received data\n, "+
"want data %s\n, but got %s\n", data.String(), newData.String())
}