You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@skywalking.apache.org by wu...@apache.org on 2022/11/24 04:49:50 UTC
[skywalking-rover] branch main updated: Support configuable sampling config (#59)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git
The following commit(s) were added to refs/heads/main by this push:
new 98901e8 Support configuable sampling config (#59)
98901e8 is described below
commit 98901e8fbcab4478dc79c1eb4c32cdbfec36f630
Author: mrproliu <74...@qq.com>
AuthorDate: Thu Nov 24 12:49:46 2022 +0800
Support configuable sampling config (#59)
---
docs/en/setup/configuration/profiling.md | 16 +-
pkg/profiling/task/base/task.go | 47 ++++-
pkg/profiling/task/network/analyze/base/context.go | 6 +
.../task/network/analyze/base/listener.go | 3 +
.../task/network/analyze/layer4/listener.go | 3 +
.../task/network/analyze/layer7/events.go | 11 ++
.../task/network/analyze/layer7/listener.go | 4 +
.../analyze/layer7/protocols/base/protocol.go | 6 +-
.../analyze/layer7/protocols/http1/analyzer.go | 17 +-
.../analyze/layer7/protocols/http1/metrics.go | 150 +++++++++-------
.../analyze/layer7/protocols/http1/sampling.go | 199 +++++++++++++++++++++
.../network/analyze/layer7/protocols/protocols.go | 7 +
pkg/profiling/task/network/delegate.go | 4 +-
pkg/profiling/task/network/runner.go | 9 +-
test/e2e/base/env | 4 +-
test/e2e/cases/profiling/network/base-cases.yaml | 2 +-
test/e2e/cases/profiling/network/envoy/e2e.yaml | 1 +
.../network/expected/status-4xx-traces.yml} | 11 +-
.../network/expected/status-5xx-traces.yml} | 11 +-
test/e2e/cases/profiling/network/golang/e2e.yaml | 2 +-
test/e2e/cases/profiling/network/golang/service.go | 13 +-
...-cases.yaml => http1-sampled-traces-cases.yaml} | 22 ++-
test/e2e/cases/profiling/network/sampling.yaml | 28 +++
23 files changed, 471 insertions(+), 105 deletions(-)
diff --git a/docs/en/setup/configuration/profiling.md b/docs/en/setup/configuration/profiling.md
index 7f2c35d..95bae91 100644
--- a/docs/en/setup/configuration/profiling.md
+++ b/docs/en/setup/configuration/profiling.md
@@ -104,12 +104,14 @@ Based on the above two data types, the following metrics are provided.
##### Logs
-| Name | Type | Unit | Description |
-|--------------|-------|-------------|----------------------------|
-| slow_traces | TopN | millisecond | The Top N slow trace(id)s |
+| Name | Type | Unit | Description |
+|-------------|-------|-------------|------------------------------------------------------|
+| slow_traces | TopN | millisecond | The Top N slow trace(id)s |
+| status_4xx | TopN | millisecond | The Top N trace(id)s with response status in 400-499 |
+| status_5xx | TopN | millisecond | The Top N trace(id)s with response status in 500-599 |
##### Span Attached Event
-| Name | Description |
-|--------------------|-----------------------------------------------------------------------------------------------|
-| http-full-request | Complete information about the HTTP request, it's only reported when it matches slow traces. |
-| http-full-response | Complete information about the HTTP response, it's only reported when it matches slow traces. |
+| Name | Description |
+|------------------------|-----------------------------------------------------------------------------------------------|
+| HTTP Request Sampling | Complete information about the HTTP request, it's only reported when it matches slow traces. |
+| HTTP Response Sampling | Complete information about the HTTP response, it's only reported when it matches slow traces. |
diff --git a/pkg/profiling/task/base/task.go b/pkg/profiling/task/base/task.go
index 663cb1c..725c77d 100644
--- a/pkg/profiling/task/base/task.go
+++ b/pkg/profiling/task/base/task.go
@@ -18,6 +18,7 @@
package base
import (
+ "encoding/json"
"fmt"
"strconv"
"strings"
@@ -41,6 +42,7 @@ type ProfilingTask struct {
TargetType TargetType
// MaxRunningDuration of task
MaxRunningDuration time.Duration
+ ExtensionConfig *ExtensionConfig
}
func ProfilingTaskFromCommand(command *v3.Command) (*ProfilingTask, error) {
@@ -57,6 +59,7 @@ func ProfilingTaskFromCommand(command *v3.Command) (*ProfilingTask, error) {
targetTypeStr, err := getCommandStringValue(err, command, "TargetType")
targetType, err := ParseTargetType(err, targetTypeStr)
taskStartTime, err := getCommandIntValue(err, command, "TaskStartTime")
+ extensionConfig, err := getCommandExtensionConfig(err, command, "ExtensionConfigJSON")
if err != nil {
return nil, err
}
@@ -64,12 +67,13 @@ func ProfilingTaskFromCommand(command *v3.Command) (*ProfilingTask, error) {
processes := strings.Split(processIDList, ",")
task := &ProfilingTask{
- TaskID: taskID,
- ProcessIDList: processes,
- UpdateTime: taskUpdateTime,
- StartTime: taskStartTime,
- TargetType: targetType,
- TriggerType: triggerType,
+ TaskID: taskID,
+ ProcessIDList: processes,
+ UpdateTime: taskUpdateTime,
+ StartTime: taskStartTime,
+ TargetType: targetType,
+ TriggerType: triggerType,
+ ExtensionConfig: extensionConfig,
}
if err := task.TriggerType.InitTask(task, command); err != nil {
@@ -82,6 +86,25 @@ func ProfilingTaskFromCommand(command *v3.Command) (*ProfilingTask, error) {
return task, nil
}
+type ExtensionConfig struct {
+ NetworkSamplings []*NetworkSamplingRule `json:"NetworkSamplings"`
+}
+
+type NetworkSamplingRule struct {
+ URIRegex *string `json:"URIRegex"`
+ MinDuration int32 `json:"MinDuration"`
+ When4XX bool `json:"When4xx"`
+ When5XX bool `json:"When5xx"`
+ Settings *NetworkDataCollectingSettings `json:"Settings"`
+}
+
+type NetworkDataCollectingSettings struct {
+ RequireCompleteRequest bool `json:"RequireCompleteRequest"`
+ MaxRequestSize int32 `json:"MaxRequestSize"`
+ RequireCompleteResponse bool `json:"RequireCompleteResponse"`
+ MaxResponseSize int32 `json:"MaxResponseSize"`
+}
+
func getCommandStringValue(err error, command *v3.Command, key string) (string, error) {
if err != nil {
return "", err
@@ -101,3 +124,15 @@ func getCommandIntValue(err error, command *v3.Command, key string) (int64, erro
}
return strconv.ParseInt(val, 10, 64)
}
+
+func getCommandExtensionConfig(err error, command *v3.Command, key string) (*ExtensionConfig, error) {
+ val, err := getCommandStringValue(err, command, key)
+ if err != nil {
+ return nil, err
+ }
+ config := &ExtensionConfig{}
+ if e := json.Unmarshal([]byte(val), config); e != nil {
+ return nil, e
+ }
+ return config, nil
+}
diff --git a/pkg/profiling/task/network/analyze/base/context.go b/pkg/profiling/task/network/analyze/base/context.go
index 3c4ad56..7a0aecf 100644
--- a/pkg/profiling/task/network/analyze/base/context.go
+++ b/pkg/profiling/task/network/analyze/base/context.go
@@ -118,6 +118,12 @@ func (c *AnalyzerContext) GetActiveConnection(conID, randomID uint64) *Connectio
return data.(*ConnectionContext)
}
+func (c *AnalyzerContext) UpdateExtensionConfig(config *base.ExtensionConfig) {
+ for _, l := range c.listeners {
+ l.UpdateExtensionConfig(config)
+ }
+}
+
func (c *AnalyzerContext) handleSocketParseQueue(ctx context.Context) {
for {
select {
diff --git a/pkg/profiling/task/network/analyze/base/listener.go b/pkg/profiling/task/network/analyze/base/listener.go
index cd0b7da..8d669a3 100644
--- a/pkg/profiling/task/network/analyze/base/listener.go
+++ b/pkg/profiling/task/network/analyze/base/listener.go
@@ -43,6 +43,9 @@ type AnalyzeListener interface {
// ReceiveCloseConnection call this method when receive the connection close event
ReceiveCloseConnection(ctx *ConnectionContext, event *SocketCloseEvent)
+ // UpdateExtensionConfig for sampling
+ UpdateExtensionConfig(config *base.ExtensionConfig)
+
// PreFlushConnectionMetrics prepare to flush the connection metrics
PreFlushConnectionMetrics(ccs []*ConnectionWithBPF, bpfLoader *bpf.Loader) error
// FlushMetrics flush all metrics from connections
diff --git a/pkg/profiling/task/network/analyze/layer4/listener.go b/pkg/profiling/task/network/analyze/layer4/listener.go
index 2f32c25..162e35c 100644
--- a/pkg/profiling/task/network/analyze/layer4/listener.go
+++ b/pkg/profiling/task/network/analyze/layer4/listener.go
@@ -85,6 +85,9 @@ func (l *Listener) ReceiveCloseConnection(ctx *base.ConnectionContext, event *ba
layer4.CloseExecuteTime = event.ExeTime
}
+func (l *Listener) UpdateExtensionConfig(config *profiling.ExtensionConfig) {
+}
+
func (l *Listener) PreFlushConnectionMetrics(ccs []*base.ConnectionWithBPF, bpfLoader *bpf.Loader) error {
// rebuild to the map for helping quick search correlate ConnectionContext
keyWithContext := make(map[string]*base.ConnectionContext)
diff --git a/pkg/profiling/task/network/analyze/layer7/events.go b/pkg/profiling/task/network/analyze/layer7/events.go
index 1b03572..e932f4d 100644
--- a/pkg/profiling/task/network/analyze/layer7/events.go
+++ b/pkg/profiling/task/network/analyze/layer7/events.go
@@ -20,6 +20,7 @@ package layer7
import (
"context"
+ profiling "github.com/apache/skywalking-rover/pkg/profiling/task/base"
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols"
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/base"
"github.com/apache/skywalking-rover/pkg/profiling/task/network/bpf"
@@ -40,6 +41,16 @@ func (l *Listener) startSocketData(ctx context.Context, bpfLoader *bpf.Loader) {
})
}
+func (l *Listener) handleProfilingExtensionConfig(config *profiling.ExtensionConfig) {
+ if l.socketDataQueue == nil {
+ return
+ }
+ for _, p := range l.socketDataQueue.partitions {
+ ctx := p.ctx.(*SocketDataPartitionContext)
+ ctx.analyzer.UpdateExtensionConfig(config)
+ }
+}
+
type SocketDataPartitionContext struct {
analyzer *protocols.Analyzer
}
diff --git a/pkg/profiling/task/network/analyze/layer7/listener.go b/pkg/profiling/task/network/analyze/layer7/listener.go
index aa05f35..8d49b3e 100644
--- a/pkg/profiling/task/network/analyze/layer7/listener.go
+++ b/pkg/profiling/task/network/analyze/layer7/listener.go
@@ -99,6 +99,10 @@ func (l *Listener) ReceiveCloseConnection(ctx *base.ConnectionContext, event *ba
l.cachedConnections.Set(l.generateCachedConnectionKey(ctx.ConnectionID, ctx.RandomID), ctx, ConnectionCachedTTL)
}
+func (l *Listener) UpdateExtensionConfig(config *profiling.ExtensionConfig) {
+ l.handleProfilingExtensionConfig(config)
+}
+
func (l *Listener) PreFlushConnectionMetrics(ccs []*base.ConnectionWithBPF, bpfLoader *bpf.Loader) error {
return nil
}
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/base/protocol.go b/pkg/profiling/task/network/analyze/layer7/protocols/base/protocol.go
index 53494d1..6d54b3b 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/base/protocol.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/base/protocol.go
@@ -17,13 +17,17 @@
package base
-import "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
+import (
+ profiling "github.com/apache/skywalking-rover/pkg/profiling/task/base"
+ "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
+)
type Protocol interface {
Name() string
GenerateMetrics() Metrics
ReceiveData(context Context, event *SocketDataUploadEvent) bool
+ UpdateExtensionConfig(config *profiling.ExtensionConfig)
}
type Context interface {
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/http1/analyzer.go b/pkg/profiling/task/network/analyze/layer7/protocols/http1/analyzer.go
index 740a24a..0df4242 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/http1/analyzer.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/http1/analyzer.go
@@ -31,6 +31,7 @@ import (
"sync"
"github.com/apache/skywalking-rover/pkg/logger"
+ profiling "github.com/apache/skywalking-rover/pkg/profiling/task/base"
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
protocol "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/base"
@@ -54,11 +55,11 @@ var DurationHistogramBuckets = []float64{
330, 380, 430, 480, 500, 600, 700, 800, 900, 1000, 1100, 1300, 1500, 1800, 2000, 5000, 10000, 15000, 20000, 30000,
}
-var SlowTraceTopNSize = 10
-
type Analyzer struct {
// cache connection metrics if the connect event not receive or process
cache map[string]*ConnectionMetrics
+
+ sampleConfig *SamplingConfig
}
type ConnectionMetrics struct {
@@ -133,6 +134,16 @@ func (h *Analyzer) ReceiveData(context protocol.Context, event *protocol.SocketD
return true
}
+func (h *Analyzer) UpdateExtensionConfig(config *profiling.ExtensionConfig) {
+ if config == nil {
+ return
+ }
+ c := NewSamplingConfig(config.NetworkSamplings)
+ if c != nil {
+ h.sampleConfig = c
+ }
+}
+
func (h *Analyzer) combineAndRemoveEvent(halfConnections *list.List, firstElement *list.Element,
lastAppender protocol.SocketDataBuffer) protocol.SocketDataBuffer {
firstEvent := firstElement.Value.(*protocol.SocketDataUploadEvent)
@@ -266,7 +277,7 @@ func (h *Analyzer) analyze(_ protocol.Context, connectionID string, connectionMe
data = connectionMetrics.serverMetrics
side = base.ConnectionRoleServer
}
- data.Append(request, requestBuffer, response, responseBuffer)
+ data.Append(h.sampleConfig, request, requestBuffer, response, responseBuffer)
if log.Enable(logrus.DebugLevel) {
metricsJSON, _ := json.Marshal(data)
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/http1/metrics.go b/pkg/profiling/task/network/analyze/layer7/protocols/http1/metrics.go
index cc3014f..06b1a56 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/http1/metrics.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/http1/metrics.go
@@ -32,6 +32,7 @@ import (
"golang.org/x/net/html/charset"
"github.com/apache/skywalking-rover/pkg/process/api"
+ task "github.com/apache/skywalking-rover/pkg/profiling/task/base"
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
protocol "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/base"
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/metrics"
@@ -62,7 +63,7 @@ type URIMetrics struct {
avgDuration *metrics.AvgCounter
durationHistogram *metrics.Histogram
- slowTraces *metrics.TopN
+ sampler *Sampler
}
func NewHTTP1URIMetrics() *URIMetrics {
@@ -75,11 +76,12 @@ func NewHTTP1URIMetrics() *URIMetrics {
RespPackageSizeHistogram: metrics.NewHistogram(PackageSizeHistogramBuckets),
avgDuration: metrics.NewAvgCounter(),
durationHistogram: metrics.NewHistogram(DurationHistogramBuckets),
- slowTraces: metrics.NewTopN(SlowTraceTopNSize),
+ sampler: NewSampler(),
}
}
-func (u *URIMetrics) Append(req *http.Request, reqBuffer protocol.SocketDataBuffer, resp *http.Response, respBuffer protocol.SocketDataBuffer) {
+func (u *URIMetrics) Append(sampleConfig *SamplingConfig,
+ req *http.Request, reqBuffer protocol.SocketDataBuffer, resp *http.Response, respBuffer protocol.SocketDataBuffer) {
u.RequestCounter.Increase()
statusCounter := u.StatusCounter[resp.StatusCode]
if statusCounter == nil {
@@ -98,7 +100,7 @@ func (u *URIMetrics) Append(req *http.Request, reqBuffer protocol.SocketDataBuff
u.avgDuration.Increase(durationInMS)
u.durationHistogram.Increase(durationInMS)
- u.increaseSlowTraceTopN(u.slowTraces, duration, req, resp, reqBuffer, respBuffer)
+ u.sampler.AppendMetrics(sampleConfig, duration, req, resp, reqBuffer, respBuffer)
}
func (u *URIMetrics) appendMetrics(traffic *base.ProcessTraffic,
@@ -127,7 +129,8 @@ func (u *URIMetrics) appendMetrics(traffic *base.ProcessTraffic,
collections = u.buildMetrics(collections, prefix, "response_package_size_histogram", labels, url, traffic, u.RespPackageSizeHistogram)
metricsBuilder.AppendMetrics(local.Entity().ServiceName, local.Entity().InstanceName, collections)
- logsCount := u.slowTraces.AppendData(local, traffic, metricsBuilder)
+
+ logsCount := u.sampler.BuildMetrics(local, traffic, metricsBuilder)
return len(collections) + logsCount
}
@@ -162,35 +165,13 @@ func (u *URIMetrics) MergeAndClean(other *URIMetrics) {
u.RespPackageSizeHistogram.MergeAndClean(other.RespPackageSizeHistogram)
u.avgDuration.MergeAndClean(other.avgDuration)
u.durationHistogram.MergeAndClean(other.durationHistogram)
- u.slowTraces.MergeAndClean(other.slowTraces)
+ u.sampler.MergeAndClean(other.sampler)
}
func (u *URIMetrics) String() string {
- return fmt.Sprintf("request count: %d, avg request size: %f, avg response size: %f, avg duration: %f, slow trace count: %d, response counters: %v",
+ return fmt.Sprintf("request count: %d, avg request size: %f, avg response size: %f, avg duration: %f, response counters: %v, sampler: %s",
u.RequestCounter.Get(), u.AvgRequestPackageSize.Calculate(), u.AvgResponsePackageSize.Calculate(),
- u.avgDuration.Calculate(), u.slowTraces.List.Len(), u.StatusCounter)
-}
-
-func (u *URIMetrics) increaseSlowTraceTopN(slowTraceTopN *metrics.TopN, duration time.Duration,
- request *http.Request, response *http.Response, reqBuffer, respBuffer protocol.SocketDataBuffer) {
- tracingContext, err := protocol.AnalyzeTracingContext(func(key string) string {
- return request.Header.Get(key)
- })
- if err != nil {
- log.Warnf("analyze tracing context error: %v", err)
- return
- }
- if tracingContext == nil {
- return
- }
-
- // remove the query parameters
- uri := request.RequestURI
- if i := strings.Index(uri, "?"); i > 0 {
- uri = uri[0:i]
- }
- trace := &Trace{Trace: tracingContext, RequestURI: uri, RequestBuffer: reqBuffer, ResponseBuffer: respBuffer, Request: request, Response: response}
- slowTraceTopN.AddRecord(trace, duration.Milliseconds())
+ u.avgDuration.Calculate(), u.StatusCounter, u.sampler.String())
}
type Trace struct {
@@ -200,6 +181,8 @@ type Trace struct {
Request *http.Request
ResponseBuffer protocol.SocketDataBuffer
Response *http.Response
+ Type string
+ Settings *task.NetworkDataCollectingSettings
}
func (h *Trace) Flush(duration int64, process api.ProcessInterface, traffic *base.ProcessTraffic, metricsBuilder *base.MetricsBuilder) {
@@ -218,21 +201,22 @@ func (h *Trace) Flush(duration int64, process api.ProcessInterface, traffic *bas
// body
logBody := &logv3.LogDataBody{Type: "json"}
- body := &SlowTraceLogBody{
+ body := &SamplingTraceLogBody{
Latency: duration,
TraceProvider: h.Trace.Provider().Name,
DetectPoint: traffic.Role.String(),
Component: traffic.Protocol.String(),
SSL: traffic.IsSSL,
URI: h.RequestURI,
- Reason: "slow",
+ Reason: h.Type,
+ Status: h.Response.StatusCode,
}
if traffic.Role == base.ConnectionRoleClient {
- body.ClientProcess = &SlowTraceLogProcess{ProcessID: process.ID()}
- body.ServerProcess = NewHTTP1SlowTRaceLogRemoteProcess(traffic, process)
+ body.ClientProcess = &SamplingTraceLogProcess{ProcessID: process.ID()}
+ body.ServerProcess = NewHTTP1SampledTraceLogRemoteProcess(traffic, process)
} else {
- body.ServerProcess = &SlowTraceLogProcess{ProcessID: process.ID()}
- body.ClientProcess = NewHTTP1SlowTRaceLogRemoteProcess(traffic, process)
+ body.ServerProcess = &SamplingTraceLogProcess{ProcessID: process.ID()}
+ body.ClientProcess = NewHTTP1SampledTraceLogRemoteProcess(traffic, process)
}
bodyJSON, err := json.Marshal(body)
if err != nil {
@@ -250,15 +234,21 @@ func (h *Trace) Flush(duration int64, process api.ProcessInterface, traffic *bas
func (h *Trace) AppendHTTPEvents(process api.ProcessInterface, traffic *base.ProcessTraffic, metricsBuilder *base.MetricsBuilder) {
events := make([]*v3.SpanAttachedEvent, 0)
- events = h.appendHTTPEvent(events, process, traffic, transportRequest, h.Request.Header, h.Request.Body, h.RequestBuffer)
- events = h.appendHTTPEvent(events, process, traffic, transportResponse, h.Response.Header, h.Response.Body, h.ResponseBuffer)
+ if h.Settings != nil && h.Settings.RequireCompleteRequest {
+ events = h.appendHTTPEvent(events, process, traffic, transportRequest, h.Request.Header,
+ h.Request.Body, h.RequestBuffer, h.Settings.MaxRequestSize)
+ }
+ if h.Settings != nil && h.Settings.RequireCompleteResponse {
+ events = h.appendHTTPEvent(events, process, traffic, transportResponse, h.Response.Header,
+ h.Response.Body, h.ResponseBuffer, h.Settings.MaxResponseSize)
+ }
metricsBuilder.AppendSpanAttachedEvents(events)
}
func (h *Trace) appendHTTPEvent(events []*v3.SpanAttachedEvent, process api.ProcessInterface, traffic *base.ProcessTraffic,
- tp string, header http.Header, body io.Reader, buffer protocol.SocketDataBuffer) []*v3.SpanAttachedEvent {
- content, err := h.transformHTTPRequest(header, body, buffer)
+ tp string, header http.Header, body io.Reader, buffer protocol.SocketDataBuffer, maxSize int32) []*v3.SpanAttachedEvent {
+ content, err := h.transformHTTPRequest(header, body, buffer, maxSize)
if err != nil {
log.Warnf("transform http %s erorr: %v", tp, err)
return events
@@ -299,7 +289,7 @@ func (h *Trace) appendHTTPEvent(events []*v3.SpanAttachedEvent, process api.Proc
}
// nolint
-func (h *Trace) transformHTTPRequest(header http.Header, body io.Reader, buffer protocol.SocketDataBuffer) (string, error) {
+func (h *Trace) transformHTTPRequest(header http.Header, body io.Reader, buffer protocol.SocketDataBuffer, maxSize int32) (string, error) {
var needGzip, isPlain, isUtf8 = header.Get("Content-Encoding") == "gzip", true, true
contentType := header.Get("Content-Type")
if contentType != "" {
@@ -312,7 +302,11 @@ func (h *Trace) transformHTTPRequest(header http.Header, body io.Reader, buffer
}
if !needGzip && isPlain && isUtf8 {
- return string(buffer.BufferData()), nil
+ resultSize := len(buffer.BufferData())
+ if maxSize > 0 && resultSize > int(maxSize) {
+ resultSize = int(maxSize)
+ }
+ return string(buffer.BufferData()[0:resultSize]), nil
}
// re-read the buffer and skip to the body position
@@ -324,12 +318,16 @@ func (h *Trace) transformHTTPRequest(header http.Header, body io.Reader, buffer
defer response.Body.Close()
// no text plain, no need to print the data
- headerString := string(buffer.BufferData()[:len(buffer.BufferData())-buf.Buffered()])
+ headerLen := len(buffer.BufferData()) - buf.Buffered()
+ if maxSize > 0 && int(maxSize) < headerLen {
+ return string(buffer.BufferData()[:maxSize]), nil
+ }
+ headerString := string(buffer.BufferData()[:headerLen])
if !isPlain {
return fmt.Sprintf("%s[not plain, current content type: %s]", headerString, contentType), nil
}
- data := body
+ data := response.Body
if needGzip {
data, err = gzip.NewReader(response.Body)
if err != nil {
@@ -337,54 +335,76 @@ func (h *Trace) transformHTTPRequest(header http.Header, body io.Reader, buffer
}
}
if !isUtf8 {
- data, err = charset.NewReader(data, contentType)
+ data, err = newCharsetReader(data, contentType)
if err != nil {
return "", err
}
}
realData, err := io.ReadAll(data)
+ if err != nil && err != io.ErrUnexpectedEOF {
+ return "", err
+ }
+ resultSize := len(realData)
+ if maxSize > 0 && (resultSize+headerLen) > int(maxSize) {
+ resultSize = int(maxSize) - headerLen
+ }
+ return fmt.Sprintf("%s%s", headerString, string(realData[0:resultSize])), nil
+}
+
+type charsetReadWrapper struct {
+ reader io.Reader
+}
+
+func newCharsetReader(r io.Reader, contentType string) (*charsetReadWrapper, error) {
+ reader, err := charset.NewReader(r, contentType)
if err != nil {
- if err != io.ErrUnexpectedEOF {
- return "", err
- }
- realData = append(realData, []byte("[chunked]")...)
+ return nil, err
}
- return fmt.Sprintf("%s%s", headerString, string(realData)), nil
+ return &charsetReadWrapper{reader: reader}, nil
+}
+
+func (c *charsetReadWrapper) Read(p []byte) (n int, err error) {
+ return c.reader.Read(p)
+}
+
+func (c *charsetReadWrapper) Close() error {
+ return nil
}
-type SlowTraceLogBody struct {
- URI string `json:"uri"`
- Reason string `json:"reason"`
- Latency int64 `json:"latency"`
- TraceProvider string `json:"trace_provider"`
- ClientProcess *SlowTraceLogProcess `json:"client_process"`
- ServerProcess *SlowTraceLogProcess `json:"server_process"`
- DetectPoint string `json:"detect_point"`
- Component string `json:"component"`
- SSL bool `json:"ssl"`
+type SamplingTraceLogBody struct {
+ URI string `json:"uri"`
+ Reason string `json:"reason"`
+ Latency int64 `json:"latency"`
+ TraceProvider string `json:"trace_provider"`
+ ClientProcess *SamplingTraceLogProcess `json:"client_process"`
+ ServerProcess *SamplingTraceLogProcess `json:"server_process"`
+ DetectPoint string `json:"detect_point"`
+ Component string `json:"component"`
+ SSL bool `json:"ssl"`
+ Status int `json:"status"`
}
-type SlowTraceLogProcess struct {
+type SamplingTraceLogProcess struct {
ProcessID string `json:"process_id"`
Local bool `json:"local"`
Address string `json:"address"`
}
-func NewHTTP1SlowTRaceLogRemoteProcess(traffic *base.ProcessTraffic, local api.ProcessInterface) *SlowTraceLogProcess {
+func NewHTTP1SampledTraceLogRemoteProcess(traffic *base.ProcessTraffic, local api.ProcessInterface) *SamplingTraceLogProcess {
if len(traffic.RemoteProcesses) != 0 {
for _, p := range traffic.RemoteProcesses {
// only match with same service instance
if local.Entity().ServiceName == p.Entity().ServiceName &&
local.Entity().InstanceName == p.Entity().InstanceName {
- return &SlowTraceLogProcess{ProcessID: p.ID()}
+ return &SamplingTraceLogProcess{ProcessID: p.ID()}
}
}
}
if tools.IsLocalHostAddress(traffic.RemoteIP) || traffic.Analyzer.IsLocalAddressInCache(traffic.RemoteIP) {
- return &SlowTraceLogProcess{Local: true}
+ return &SamplingTraceLogProcess{Local: true}
}
- return &SlowTraceLogProcess{Address: fmt.Sprintf("%s:%d", traffic.RemoteIP, traffic.RemotePort)}
+ return &SamplingTraceLogProcess{Address: fmt.Sprintf("%s:%d", traffic.RemoteIP, traffic.RemotePort)}
}
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/http1/sampling.go b/pkg/profiling/task/network/analyze/layer7/protocols/http1/sampling.go
new file mode 100644
index 0000000..830b271
--- /dev/null
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/http1/sampling.go
@@ -0,0 +1,199 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package http1
+
+import (
+ "fmt"
+ "net/http"
+ "regexp"
+ "strings"
+ "time"
+
+ lru "github.com/hashicorp/golang-lru"
+
+ "github.com/apache/skywalking-rover/pkg/process/api"
+ profiling "github.com/apache/skywalking-rover/pkg/profiling/task/base"
+ "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
+ protocol "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/base"
+ "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/metrics"
+)
+
+const (
+ TopNSize = 10
+ SamplingRuleCacheSize = 200
+)
+
+type Sampler struct {
+ Error4xxTraces *metrics.TopN
+ Error5xxTraces *metrics.TopN
+ SlowTraces *metrics.TopN
+}
+
+func NewSampler() *Sampler {
+ return &Sampler{
+ Error4xxTraces: metrics.NewTopN(TopNSize),
+ Error5xxTraces: metrics.NewTopN(TopNSize),
+ SlowTraces: metrics.NewTopN(TopNSize),
+ }
+}
+
+func (s *Sampler) AppendMetrics(config *SamplingConfig, duration time.Duration,
+ request *http.Request, response *http.Response, reqBuffer, respBuffer protocol.SocketDataBuffer) {
+ if config == nil {
+ return
+ }
+ tracingContext, err := protocol.AnalyzeTracingContext(func(key string) string {
+ return request.Header.Get(key)
+ })
+ if err != nil {
+ log.Warnf("analyze tracing context error: %v", err)
+ return
+ }
+ if tracingContext == nil {
+ return
+ }
+
+ uri := request.RequestURI
+ // remove the query parameters
+ if i := strings.Index(uri, "?"); i > 0 {
+ uri = uri[0:i]
+ }
+
+ // find out with url rule is match
+ rule := config.findMatchesRule(uri)
+ if rule == nil {
+ return
+ }
+
+ // if smaller than minimal duration, then ignore
+ if int64(rule.MinDuration) > duration.Milliseconds() {
+ return
+ }
+
+ var traceType string
+ var topN *metrics.TopN
+ if rule.When5XX && response.StatusCode >= 500 && response.StatusCode < 600 {
+ traceType = "status_5xx"
+ topN = s.Error5xxTraces
+ } else if rule.When4XX && response.StatusCode >= 400 && response.StatusCode < 500 {
+ traceType = "status_4xx"
+ topN = s.Error4xxTraces
+ } else {
+ traceType = "slow"
+ topN = s.SlowTraces
+ }
+
+ trace := &Trace{
+ Trace: tracingContext,
+ RequestURI: uri,
+ RequestBuffer: reqBuffer,
+ ResponseBuffer: respBuffer,
+ Request: request,
+ Response: response,
+ Type: traceType,
+ Settings: rule.Settings,
+ }
+ topN.AddRecord(trace, duration.Milliseconds())
+}
+
+func (s *Sampler) BuildMetrics(process api.ProcessInterface, traffic *base.ProcessTraffic, metricsBuilder *base.MetricsBuilder) int {
+ var count int
+ count += s.SlowTraces.AppendData(process, traffic, metricsBuilder)
+ count += s.Error4xxTraces.AppendData(process, traffic, metricsBuilder)
+ count += s.Error5xxTraces.AppendData(process, traffic, metricsBuilder)
+ return count
+}
+
+func (s *Sampler) MergeAndClean(other *Sampler) {
+ s.SlowTraces.MergeAndClean(other.SlowTraces)
+ s.Error4xxTraces.MergeAndClean(other.Error4xxTraces)
+ s.Error5xxTraces.MergeAndClean(other.Error5xxTraces)
+}
+
+func (s *Sampler) String() string {
+ return fmt.Sprintf("slow trace count: %d, 4xx error count: %d, 5xx error count: %d",
+ s.SlowTraces.List.Len(), s.Error4xxTraces.List.Len(), s.Error5xxTraces.List.Len())
+}
+
+type SamplingConfig struct {
+ DefaultRule *profiling.NetworkSamplingRule
+ URISamplings []*URISampling
+ uriRuleCache *lru.Cache
+}
+
+type URISampling struct {
+ URIMatcher *regexp.Regexp
+ Rule *profiling.NetworkSamplingRule
+}
+
+func NewSamplingConfig(configs []*profiling.NetworkSamplingRule) *SamplingConfig {
+ if len(configs) == 0 {
+ return nil
+ }
+ cache, err := lru.New(SamplingRuleCacheSize)
+ if err != nil {
+ log.Warnf("creating sampling cache config failure: %v", err)
+ }
+ result := &SamplingConfig{
+ uriRuleCache: cache,
+ }
+ for _, c := range configs {
+ if c.URIRegex == nil {
+ if result.DefaultRule != nil {
+ log.Warnf("the default rule is already exists, so ignore it")
+ continue
+ }
+ result.DefaultRule = c
+ continue
+ }
+
+ uriPattern, err := regexp.Compile(*c.URIRegex)
+ if err != nil {
+ log.Warnf("parsing URI pattern failure, ignore this sampling config: %v", err)
+ continue
+ }
+
+ result.URISamplings = append(result.URISamplings, &URISampling{
+ URIMatcher: uriPattern,
+ Rule: c,
+ })
+ }
+ return result
+}
+
+func (s *SamplingConfig) findMatchesRule(uri string) *profiling.NetworkSamplingRule {
+ // if cached then return
+ if len(s.URISamplings) == 0 {
+ return s.DefaultRule
+ }
+
+ value, ok := s.uriRuleCache.Get(uri)
+ if ok {
+ return value.(*profiling.NetworkSamplingRule)
+ }
+
+ result := s.DefaultRule
+ for _, rule := range s.URISamplings {
+ if !rule.URIMatcher.MatchString(uri) {
+ continue
+ }
+ result = rule.Rule
+ s.uriRuleCache.Add(uri, rule)
+ }
+ return result
+}
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/protocols.go b/pkg/profiling/task/network/analyze/layer7/protocols/protocols.go
index 22735fc..743ea30 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/protocols.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/protocols.go
@@ -19,6 +19,7 @@ package protocols
import (
"github.com/apache/skywalking-rover/pkg/logger"
+ profiling "github.com/apache/skywalking-rover/pkg/profiling/task/base"
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
protocol "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/base"
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/http1"
@@ -66,6 +67,12 @@ func (a *Analyzer) ReceiveSocketDataEvent(event *protocol.SocketDataUploadEvent)
event.GenerateConnectionID(), event.Protocol.String(), event.Protocol, event.MsgType.String())
}
+func (a *Analyzer) UpdateExtensionConfig(config *profiling.ExtensionConfig) {
+ for _, p := range a.protocols {
+ p.UpdateExtensionConfig(config)
+ }
+}
+
type ProtocolMetrics struct {
data map[string]protocol.Metrics
}
diff --git a/pkg/profiling/task/network/delegate.go b/pkg/profiling/task/network/delegate.go
index ef0bf8f..eb97260 100644
--- a/pkg/profiling/task/network/delegate.go
+++ b/pkg/profiling/task/network/delegate.go
@@ -37,6 +37,7 @@ var realRunner = NewGlobalRunnerContext()
type DelegateRunner struct {
base *base.Runner
+ task *base.ProfilingTask
processes []api.ProcessInterface
ctx context.Context
@@ -57,12 +58,13 @@ func (r *DelegateRunner) Init(task *base.ProfilingTask, processes []api.ProcessI
return fmt.Errorf("please provide one process at least")
}
r.processes = processes
+ r.task = task
return nil
}
func (r *DelegateRunner) Run(ctx context.Context, notify base.ProfilingRunningSuccessNotify) error {
r.ctx, r.cancel = context.WithCancel(ctx)
- if err := realRunner.Start(ctx, r.processes); err != nil {
+ if err := realRunner.Start(ctx, r.task, r.processes); err != nil {
return err
}
notify()
diff --git a/pkg/profiling/task/network/runner.go b/pkg/profiling/task/network/runner.go
index 6b8063d..bdaf369 100644
--- a/pkg/profiling/task/network/runner.go
+++ b/pkg/profiling/task/network/runner.go
@@ -114,12 +114,12 @@ func (r *Runner) DeleteProcesses(processes []api.ProcessInterface) (bool, error)
return len(r.processes) == 0, err
}
-func (r *Runner) Start(ctx context.Context, processes []api.ProcessInterface) error {
+func (r *Runner) Start(ctx context.Context, task *base.ProfilingTask, processes []api.ProcessInterface) error {
r.startLock.Lock()
defer r.startLock.Unlock()
// if already start, then just adding the processes
if r.bpf != nil {
- return r.addProcesses(processes)
+ return r.updateTask(task, processes)
}
r.ctx, r.cancel = context.WithCancel(ctx)
@@ -130,7 +130,7 @@ func (r *Runner) Start(ctx context.Context, processes []api.ProcessInterface) er
}
r.bpf = bpfLoader
- if err := r.addProcesses(processes); err != nil {
+ if err := r.updateTask(task, processes); err != nil {
return err
}
@@ -356,8 +356,9 @@ func (r *Runner) init0(config *base.TaskConfig, moduleMgr *module.Manager) error
return nil
}
-func (r *Runner) addProcesses(processes []api.ProcessInterface) error {
+func (r *Runner) updateTask(task *base.ProfilingTask, processes []api.ProcessInterface) error {
var err error
+ r.analyzeContext.UpdateExtensionConfig(task.ExtensionConfig)
for _, p := range processes {
pid := p.Pid()
alreadyExists := false
diff --git a/test/e2e/base/env b/test/e2e/base/env
index dab2c61..6283ee3 100644
--- a/test/e2e/base/env
+++ b/test/e2e/base/env
@@ -13,8 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-SW_CTL_COMMIT=651d196ee9345b164cc35fc1dbdcaf058920226a
-SW_OAP_COMMIT=a386853bc9ef6221c8d6d1688b607e1d230f5ec4
+SW_CTL_COMMIT=0883266bfaa36612927b69e35781b64ea181758d
+SW_OAP_COMMIT=bcd9f7a56b99ca612711ed3a540bdae5f46e9171
SW_KUBERNETES_COMMIT_SHA=0f3ec68e5a7e1608cec8688716b848ed15e971e5
SW_AGENT_GO_COMMIT=216f122d942cb683f48578d3014cc5ea83637582
\ No newline at end of file
diff --git a/test/e2e/cases/profiling/network/base-cases.yaml b/test/e2e/cases/profiling/network/base-cases.yaml
index 5e59008..c3d377d 100644
--- a/test/e2e/cases/profiling/network/base-cases.yaml
+++ b/test/e2e/cases/profiling/network/base-cases.yaml
@@ -24,7 +24,7 @@ cases:
expected: expected/process.yml
# create network profiling task
- - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql profiling ebpf create network --service-name=service --instance-name=test
+ - query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql profiling ebpf create network --service-name=service --instance-name=test --sampling-config=test/e2e/cases/profiling/network/sampling.yaml
expected: expected/profiling-create.yml
- query: swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql dep process --service-name service --instance-name=test
expected: expected/dependency-processs.yml
diff --git a/test/e2e/cases/profiling/network/envoy/e2e.yaml b/test/e2e/cases/profiling/network/envoy/e2e.yaml
index 47020d4..5cc4273 100644
--- a/test/e2e/cases/profiling/network/envoy/e2e.yaml
+++ b/test/e2e/cases/profiling/network/envoy/e2e.yaml
@@ -44,6 +44,7 @@ setup:
command: |
rm -rf skywalking-kubernetes && git clone https://github.com/apache/skywalking-kubernetes.git
cd skywalking-kubernetes
+ git reset --hard a7d8ff959b46e70cb10428c6714013563ae38cbc
cd chart
mkdir -p skywalking/files/conf.d/oap/meter-analyzer-config/ && cp ../../test/e2e/cases/profiling/network/envoy/network-profiling.yaml skywalking/files/conf.d/oap/meter-analyzer-config/network-profiling.yaml
helm dep up skywalking
diff --git a/test/e2e/base/env b/test/e2e/cases/profiling/network/expected/status-4xx-traces.yml
similarity index 76%
copy from test/e2e/base/env
copy to test/e2e/cases/profiling/network/expected/status-4xx-traces.yml
index dab2c61..3c942e0 100644
--- a/test/e2e/base/env
+++ b/test/e2e/cases/profiling/network/expected/status-4xx-traces.yml
@@ -13,8 +13,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-SW_CTL_COMMIT=651d196ee9345b164cc35fc1dbdcaf058920226a
-SW_OAP_COMMIT=a386853bc9ef6221c8d6d1688b607e1d230f5ec4
-SW_KUBERNETES_COMMIT_SHA=0f3ec68e5a7e1608cec8688716b848ed15e971e5
-
-SW_AGENT_GO_COMMIT=216f122d942cb683f48578d3014cc5ea83637582
\ No newline at end of file
+{{- contains . }}
+- name: skywalking-/notfound
+ id: {{ notEmpty .id }}
+ value: {{ notEmpty .value }}
+ refid: {{ notEmpty .refid }}
+{{- end }}
\ No newline at end of file
diff --git a/test/e2e/base/env b/test/e2e/cases/profiling/network/expected/status-5xx-traces.yml
similarity index 76%
copy from test/e2e/base/env
copy to test/e2e/cases/profiling/network/expected/status-5xx-traces.yml
index dab2c61..38e2391 100644
--- a/test/e2e/base/env
+++ b/test/e2e/cases/profiling/network/expected/status-5xx-traces.yml
@@ -13,8 +13,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-SW_CTL_COMMIT=651d196ee9345b164cc35fc1dbdcaf058920226a
-SW_OAP_COMMIT=a386853bc9ef6221c8d6d1688b607e1d230f5ec4
-SW_KUBERNETES_COMMIT_SHA=0f3ec68e5a7e1608cec8688716b848ed15e971e5
-
-SW_AGENT_GO_COMMIT=216f122d942cb683f48578d3014cc5ea83637582
\ No newline at end of file
+{{- contains . }}
+- name: skywalking-/provider
+ id: {{ notEmpty .id }}
+ value: {{ notEmpty .value }}
+ refid: {{ notEmpty .refid }}
+{{- end }}
\ No newline at end of file
diff --git a/test/e2e/cases/profiling/network/golang/e2e.yaml b/test/e2e/cases/profiling/network/golang/e2e.yaml
index aa4807b..1ce5316 100644
--- a/test/e2e/cases/profiling/network/golang/e2e.yaml
+++ b/test/e2e/cases/profiling/network/golang/e2e.yaml
@@ -44,4 +44,4 @@ verify:
- includes:
- ../base-cases.yaml
- ../http1-metrics-cases.yaml
- - ../http1-slow-traces-cases.yaml
\ No newline at end of file
+ - ../http1-sampled-traces-cases.yaml
\ No newline at end of file
diff --git a/test/e2e/cases/profiling/network/golang/service.go b/test/e2e/cases/profiling/network/golang/service.go
index 6e895ac..3696941 100644
--- a/test/e2e/cases/profiling/network/golang/service.go
+++ b/test/e2e/cases/profiling/network/golang/service.go
@@ -36,13 +36,24 @@ var skyWalkingTracer *go2sky.Tracer
var zipkinTracer *zipkin.Tracer
func provider(w http.ResponseWriter, req *http.Request) {
- w.Header().Set("Content-Type", "text/plain")
time.Sleep(time.Second * 1)
+ if req.URL.Query().Get("error") == "true" {
+ w.WriteHeader(500)
+ return
+ }
+ w.Header().Set("Content-Type", "text/plain")
_, _ = w.Write([]byte("service provider\n"))
}
func consumer(w http.ResponseWriter, req *http.Request) {
+ typeData := req.URL.Query().Get("type")
addr := "https://proxy/provider"
+ if typeData == "notfound" {
+ addr = "https://proxy/notfound"
+ } else if typeData == "error" {
+ addr = "https://proxy/provider?error=true"
+ }
+
request, err := http.NewRequest("GET", addr, nil)
exitSpan, err := skyWalkingTracer.CreateExitSpan(req.Context(), "/provider", addr, func(headerKey, headerValue string) error {
request.Header.Set(headerKey, headerValue)
diff --git a/test/e2e/cases/profiling/network/http1-slow-traces-cases.yaml b/test/e2e/cases/profiling/network/http1-sampled-traces-cases.yaml
similarity index 62%
rename from test/e2e/cases/profiling/network/http1-slow-traces-cases.yaml
rename to test/e2e/cases/profiling/network/http1-sampled-traces-cases.yaml
index 6f0e7ca..49d5546 100644
--- a/test/e2e/cases/profiling/network/http1-slow-traces-cases.yaml
+++ b/test/e2e/cases/profiling/network/http1-sampled-traces-cases.yaml
@@ -15,6 +15,7 @@
# HTTP1 verify
cases:
+ # slow traces
- query: |
curl -s https://${service_host}:${service_10443}/consumer-zipkin > /dev/null;
sleep 5;
@@ -27,8 +28,8 @@ cases:
curl -s https://${service_host}:${service_10443}/consumer-zipkin > /dev/null;
sleep 3;
traceid=$(swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql records list \
- --name=sampled_slow_trace_record --service-name service --instance-name test --process-name service \
- --dest-service-name service --dest-instance-name test --dest-process-name UNKNOWN_REMOTE 100 | yq e '. | map(select(.name == "zipkin-/provider-zipkin")).[0].id' -);
+ --name=sampled_slow_trace_record --service-name service --instance-name test --process-name service \
+ --dest-service-name service --dest-instance-name test --dest-process-name UNKNOWN_REMOTE 100 | yq e '. | map(select(.name == "zipkin-/provider-zipkin")).[0].id' -);
curl http://${oap_host}:${oap_9412}/zipkin/api/v2/trace/${traceid} | yq e -| yq e 'del(.[].tags)' -
expected: expected/zipkin-trace.yml
# skywalking trace
@@ -37,4 +38,19 @@ cases:
--name=sampled_slow_trace_record --service-name service --instance-name test --process-name service \
--dest-service-name service --dest-instance-name test --dest-process-name UNKNOWN_REMOTE 100 | yq e '. | map(select(.name == "skywalking-/provider")).[0].id' -);
swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql trace $traceid
- expected: expected/skywalking-trace.yml
\ No newline at end of file
+ expected: expected/skywalking-trace.yml
+ # not founds
+ - query: |
+ curl -s https://${service_host}:${service_10443}/consumer?type=notfound > /dev/null;
+ sleep 5;
+ swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql records list \
+ --name=sampled_status_4xx_trace_record --service-name service --instance-name test --process-name service \
+ --dest-service-name service --dest-instance-name test --dest-process-name UNKNOWN_REMOTE 20
+ expected: expected/status-4xx-traces.yml
+ - query: |
+ curl -s https://${service_host}:${service_10443}/consumer?type=error > /dev/null;
+ sleep 5;
+ swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql records list \
+ --name=sampled_status_5xx_trace_record --service-name service --instance-name test --process-name service \
+ --dest-service-name service --dest-instance-name test --dest-process-name UNKNOWN_REMOTE 20
+ expected: expected/status-5xx-traces.yml
\ No newline at end of file
diff --git a/test/e2e/cases/profiling/network/sampling.yaml b/test/e2e/cases/profiling/network/sampling.yaml
new file mode 100644
index 0000000..313c42b
--- /dev/null
+++ b/test/e2e/cases/profiling/network/sampling.yaml
@@ -0,0 +1,28 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# The command `dashboard global` supports displaying three kinds of data:
+# `global metrics`, `global response latency`, `Global heat map`.
+# If you don't want to display an item, you can just delete or comment its whole configuration below.
+# Generally, there is no need to modify properties unless there is a explanatory comment.
+
+
+samplings:
+ - when_4xx: true
+ when_5xx: true
+ setting:
+ require_request: true
+ require_response: true
\ No newline at end of file