You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@skywalking.apache.org by li...@apache.org on 2022/11/17 12:04:07 UTC
[skywalking-rover] branch main updated: Support upload the full HTTP content to span attached event (#58)
This is an automated email from the ASF dual-hosted git repository.
liuhan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git
The following commit(s) were added to refs/heads/main by this push:
new 026b294 Support upload the full HTTP content to span attached event (#58)
026b294 is described below
commit 026b294653c85d73d806617b45f3f7f2221049ef
Author: mrproliu <74...@qq.com>
AuthorDate: Thu Nov 17 20:04:01 2022 +0800
Support upload the full HTTP content to span attached event (#58)
---
bpf/profiling/network/netmonitor.c | 10 +-
bpf/profiling/network/sock_stats.h | 3 +-
docs/en/setup/configuration/profiling.md | 8 +-
go.mod | 10 +-
go.sum | 20 +-
pkg/profiling/task/network/analyze/base/metrics.go | 16 +-
.../task/network/analyze/layer4/metrics.go | 4 +-
.../task/network/analyze/layer7/events.go | 9 +-
.../task/network/analyze/layer7/listener.go | 10 +-
.../analyze/layer7/protocols/{ => base}/events.go | 26 +-
.../analyze/layer7/protocols/base/protocol.go} | 31 +-
.../analyze/layer7/protocols/{ => base}/tracing.go | 57 +-
.../task/network/analyze/layer7/protocols/http1.go | 598 ---------------------
.../analyze/layer7/protocols/http1/analyzer.go | 364 +++++++++++++
.../{http1_test.go => http1/analyzer_test.go} | 12 +-
.../{http1_builder.go => http1/builder.go} | 60 ++-
.../analyze/layer7/protocols/http1/metrics.go | 390 ++++++++++++++
.../analyze/layer7/protocols/metrics/counter.go | 10 +-
.../network/analyze/layer7/protocols/protocols.go | 60 +--
pkg/profiling/task/network/runner.go | 39 +-
pkg/tools/host/time.go | 11 +
test/e2e/base/env | 4 +-
.../network/expected/skywalking-trace.yml | 55 +-
.../profiling/network/http1-slow-traces-cases.yaml | 8 +-
24 files changed, 1052 insertions(+), 763 deletions(-)
diff --git a/bpf/profiling/network/netmonitor.c b/bpf/profiling/network/netmonitor.c
index 3d82888..1a6f922 100644
--- a/bpf/profiling/network/netmonitor.c
+++ b/bpf/profiling/network/netmonitor.c
@@ -354,7 +354,7 @@ static __always_inline void upload_socket_data_iov(void *ctx, struct iovec* iov,
UPLOAD_PER_SOCKET_DATA_IOV();
}
-static __inline void upload_socket_data(void *ctx, __u64 timestamp, __u64 conid, struct active_connection_t *connection, struct sock_data_args_t *args, ssize_t bytes_count, __u32 existing_msg_type, __u32 data_direction, bool ssl) {
+static __inline void upload_socket_data(void *ctx, __u64 start_time, __u64 end_time, __u64 conid, struct active_connection_t *connection, struct sock_data_args_t *args, ssize_t bytes_count, __u32 existing_msg_type, __u32 data_direction, bool ssl) {
// generate event
__u32 kZero = 0;
struct socket_data_upload_event *event = bpf_map_lookup_elem(&socket_data_upload_event_per_cpu_map, &kZero);
@@ -382,7 +382,8 @@ static __inline void upload_socket_data(void *ctx, __u64 timestamp, __u64 conid,
}
// basic data
- event->timestamp = timestamp;
+ event->start_time = start_time;
+ event->end_time = end_time;
event->protocol = connection->protocol;
event->msg_type = existing_msg_type;
event->direction = data_direction;
@@ -463,7 +464,7 @@ static __always_inline void process_write_data(struct pt_regs *ctx, __u64 id, st
}
// upload the socket data if need
- upload_socket_data(ctx, curr_nacs, conid, conn, args, bytes_count, msg_type, data_direction, ssl);
+ upload_socket_data(ctx, args->start_nacs, curr_nacs, conid, conn, args, bytes_count, msg_type, data_direction, ssl);
// add statics when is not ssl(native buffer)
if (ssl == false) {
@@ -971,9 +972,12 @@ SEC("kprobe/recv")
int sys_recv(struct pt_regs* ctx) {
ctx = (struct pt_regs *)PT_REGS_PARM1(ctx);
__u64 id = bpf_get_current_pid_tgid();
+ char* buf;
+ bpf_probe_read(&buf, sizeof(buf), &(PT_REGS_PARM2(ctx)));
struct sock_data_args_t data_args = {};
data_args.fd = _(PT_REGS_PARM1(ctx));
+ data_args.buf = buf;
data_args.start_nacs = bpf_ktime_get_ns();
data_args.data_id = generate_socket_data_id(id, data_args.fd, SOCKET_OPTS_TYPE_RECV);
bpf_map_update_elem(&socket_data_args, &id, &data_args, 0);
diff --git a/bpf/profiling/network/sock_stats.h b/bpf/profiling/network/sock_stats.h
index f31e859..8569036 100644
--- a/bpf/profiling/network/sock_stats.h
+++ b/bpf/profiling/network/sock_stats.h
@@ -263,7 +263,8 @@ struct socket_data_upload_event {
__u8 finished;
__u16 sequence;
__u16 data_len;
- __u64 timestamp;
+ __u64 start_time;
+ __u64 end_time;
__u64 conid;
__u64 randomid;
__u64 data_id;
diff --git a/docs/en/setup/configuration/profiling.md b/docs/en/setup/configuration/profiling.md
index 5811306..7f2c35d 100644
--- a/docs/en/setup/configuration/profiling.md
+++ b/docs/en/setup/configuration/profiling.md
@@ -106,4 +106,10 @@ Based on the above two data types, the following metrics are provided.
| Name | Type | Unit | Description |
|--------------|-------|-------------|----------------------------|
-| slow_traces | TopN | millisecond | The Top N slow trace(id)s |
\ No newline at end of file
+| slow_traces | TopN | millisecond | The Top N slow trace(id)s |
+
+##### Span Attached Event
+| Name | Description |
+|--------------------|-----------------------------------------------------------------------------------------------|
+| http-full-request | Complete information about the HTTP request, it's only reported when it matches slow traces. |
+| http-full-response | Complete information about the HTTP response, it's only reported when it matches slow traces. |
diff --git a/go.mod b/go.mod
index 0e8c2de..82974a6 100644
--- a/go.mod
+++ b/go.mod
@@ -16,13 +16,14 @@ require (
github.com/spf13/viper v1.10.1
github.com/zekroTJA/timedmap v1.4.0
golang.org/x/arch v0.0.0-20220722155209-00200b7164a7
- golang.org/x/sys v0.0.0-20211210111614-af8b64212486
+ golang.org/x/net v0.0.0-20220722155237-a158d28d115b
+ golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab
google.golang.org/grpc v1.44.0
k8s.io/api v0.23.5
k8s.io/apimachinery v0.23.5
k8s.io/client-go v0.23.5
k8s.io/utils v0.0.0-20211116205334-6203023598ed
- skywalking.apache.org/repo/goapi v0.0.0-20220518063910-af3e2df60bce
+ skywalking.apache.org/repo/goapi v0.0.0-20221019074310-53ebda305187
)
require (
@@ -52,14 +53,13 @@ require (
github.com/tklauser/go-sysconf v0.3.9 // indirect
github.com/tklauser/numcpus v0.3.0 // indirect
github.com/yusufpapurcu/wmi v1.2.2 // indirect
- golang.org/x/net v0.0.0-20211209124913-491a49abca63 // indirect
golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8 // indirect
- golang.org/x/term v0.0.0-20210615171337-6886f2dfbf5b // indirect
+ golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20211208223120-3a66f561d7aa // indirect
- google.golang.org/protobuf v1.28.0 // indirect
+ google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/ini.v1 v1.66.2 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
diff --git a/go.sum b/go.sum
index 11cb4b5..5d1bff8 100644
--- a/go.sum
+++ b/go.sum
@@ -550,9 +550,9 @@ golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96b
golang.org/x/net v0.0.0-20210410081132-afb366fc7cd1/go.mod h1:9tjilg8BloeKEkVJvy7fQ90B1CfIiPueXVOjqfkSzI8=
golang.org/x/net v0.0.0-20210503060351-7fd8e65b6420/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
-golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
-golang.org/x/net v0.0.0-20211209124913-491a49abca63 h1:iocB37TsdFuN6IBRZ+ry36wrkoV51/tl5vOWqkcPGvY=
golang.org/x/net v0.0.0-20211209124913-491a49abca63/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
+golang.org/x/net v0.0.0-20220722155237-a158d28d115b h1:PxfKdU9lEEDYjdIzOtC4qFWgkU2rGHdKlKowJSMN9h0=
+golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@@ -654,14 +654,16 @@ golang.org/x/sys v0.0.0-20210906170528-6f6e22806c34/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20210908233432-aa78b53d3365/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211124211545-fe61309f8881/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211205182925-97ca703d548d/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.0.0-20211210111614-af8b64212486 h1:5hpz5aRr+W1erYCL5JRhSUBJRph7l9XkNveoExlrKYk=
golang.org/x/sys v0.0.0-20211210111614-af8b64212486/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab h1:2QkjZIsXupsJbJIdSjjUOgWK3aEtzyuh2mPt3l/CkeU=
+golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
-golang.org/x/term v0.0.0-20210615171337-6886f2dfbf5b h1:9zKuko04nR4gjZ4+DNjHqRlAJqbJETHwiNKDqTfOjfE=
golang.org/x/term v0.0.0-20210615171337-6886f2dfbf5b/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
+golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 h1:JGgROgKl9N8DuW20oFS5gxc+lE67/N3FcwmBPMe7ArY=
+golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@@ -887,8 +889,8 @@ google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlba
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
-google.golang.org/protobuf v1.28.0 h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscLw=
-google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
+google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w=
+google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
@@ -950,5 +952,5 @@ sigs.k8s.io/structured-merge-diff/v4 v4.2.1 h1:bKCqE9GvQ5tiVHn5rfn1r+yao3aLQEaLz
sigs.k8s.io/structured-merge-diff/v4 v4.2.1/go.mod h1:j/nl6xW8vLS49O8YvXW1ocPhZawJtm+Yrr7PPRQ0Vg4=
sigs.k8s.io/yaml v1.2.0 h1:kr/MCeFWJWTwyaHoR9c8EjH9OumOmoF9YGiZd7lFm/Q=
sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc=
-skywalking.apache.org/repo/goapi v0.0.0-20220518063910-af3e2df60bce h1:3wCiFWEEREdxe0T/PJM1S6DDqDsKETYVNtfaJ4zWmdQ=
-skywalking.apache.org/repo/goapi v0.0.0-20220518063910-af3e2df60bce/go.mod h1:uWwwvhcwe2MD/nJCg0c1EE/eL6KzaBosLHDfMFoEJ30=
+skywalking.apache.org/repo/goapi v0.0.0-20221019074310-53ebda305187 h1:6JgAg9aohcHd72VplZUGycZgCNo6iQrz735nmtOTCnE=
+skywalking.apache.org/repo/goapi v0.0.0-20221019074310-53ebda305187/go.mod h1:lxmYWY1uAP5SLVKNymAyDzn7KG6dhPWN+pYHmyt+0vo=
diff --git a/pkg/profiling/task/network/analyze/base/metrics.go b/pkg/profiling/task/network/analyze/base/metrics.go
index 15a1438..21acb29 100644
--- a/pkg/profiling/task/network/analyze/base/metrics.go
+++ b/pkg/profiling/task/network/analyze/base/metrics.go
@@ -31,7 +31,7 @@ import (
// ConnectionMetrics The Metrics in each listener
type ConnectionMetrics interface {
// MergeMetricsFromConnection merge the metrics from connection, and added into self
- MergeMetricsFromConnection(connection *ConnectionContext)
+ MergeMetricsFromConnection(connection *ConnectionContext, data ConnectionMetrics)
}
type ConnectionMetricsContext struct {
@@ -51,8 +51,9 @@ func (c *ConnectionMetricsContext) GetMetrics(listenerName string) ConnectionMet
}
func (c *ConnectionMetricsContext) MergeMetricsFromConnection(connection *ConnectionContext) {
- for _, metric := range c.data {
- metric.MergeMetricsFromConnection(connection)
+ for name, metric := range c.data {
+ metrics := connection.Metrics.GetMetrics(name)
+ metric.MergeMetricsFromConnection(connection, metrics)
}
}
@@ -60,6 +61,7 @@ type MetricsBuilder struct {
prefix string
metrics map[metadata][]*agentv3.MeterData
logs map[metadata][]*logv3.LogData
+ events []*agentv3.SpanAttachedEvent
}
func NewMetricsBuilder(prefix string) *MetricsBuilder {
@@ -85,6 +87,10 @@ func (m *MetricsBuilder) AppendLogs(service string, log *logv3.LogData) {
m.logs[meta] = append(m.logs[meta], log)
}
+func (m *MetricsBuilder) AppendSpanAttachedEvents(events []*agentv3.SpanAttachedEvent) {
+ m.events = append(m.events, events...)
+}
+
func (m *MetricsBuilder) MetricPrefix() string {
return m.prefix
}
@@ -141,6 +147,10 @@ func (m *MetricsBuilder) BuildLogs() [][]*logv3.LogData {
return result
}
+func (m *MetricsBuilder) BuildEvents() []*agentv3.SpanAttachedEvent {
+ return m.events
+}
+
type metadata struct {
Layer string
ServiceName string
diff --git a/pkg/profiling/task/network/analyze/layer4/metrics.go b/pkg/profiling/task/network/analyze/layer4/metrics.go
index b8bb89a..2a33aec 100644
--- a/pkg/profiling/task/network/analyze/layer4/metrics.go
+++ b/pkg/profiling/task/network/analyze/layer4/metrics.go
@@ -64,8 +64,8 @@ func NewLayer4Metrics() *Metrics {
}
}
-func (l *Metrics) MergeMetricsFromConnection(connection *base.ConnectionContext) {
- metrics := connection.Metrics.GetMetrics(Name).(*Metrics)
+func (l *Metrics) MergeMetricsFromConnection(connection *base.ConnectionContext, data base.ConnectionMetrics) {
+ metrics := data.(*Metrics)
l.WriteCounter.IncreaseToCurrent(metrics.WriteCounter.CalculateIncrease())
l.ReadCounter.IncreaseToCurrent(metrics.ReadCounter.CalculateIncrease())
diff --git a/pkg/profiling/task/network/analyze/layer7/events.go b/pkg/profiling/task/network/analyze/layer7/events.go
index a15b734..1b03572 100644
--- a/pkg/profiling/task/network/analyze/layer7/events.go
+++ b/pkg/profiling/task/network/analyze/layer7/events.go
@@ -21,6 +21,7 @@ import (
"context"
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols"
+ "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/base"
"github.com/apache/skywalking-rover/pkg/profiling/task/network/bpf"
)
@@ -33,9 +34,9 @@ func (l *Listener) initSocketDataQueue(parallels, queueSize int) {
func (l *Listener) startSocketData(ctx context.Context, bpfLoader *bpf.Loader) {
l.socketDataQueue.Start(ctx, bpfLoader, bpfLoader.SocketDataUploadEventQueue, 1, l.protocolPerCPUBuffer,
func() interface{} {
- return &protocols.SocketDataUploadEvent{}
+ return &base.SocketDataUploadEvent{}
}, func(data interface{}) string {
- return data.(*protocols.SocketDataUploadEvent).GenerateConnectionID()
+ return data.(*base.SocketDataUploadEvent).GenerateConnectionID()
})
}
@@ -43,13 +44,13 @@ type SocketDataPartitionContext struct {
analyzer *protocols.Analyzer
}
-func NewSocketDataPartitionContext(l protocols.Context) *SocketDataPartitionContext {
+func NewSocketDataPartitionContext(l base.Context) *SocketDataPartitionContext {
return &SocketDataPartitionContext{
analyzer: protocols.NewAnalyzer(l),
}
}
func (p *SocketDataPartitionContext) Consume(data interface{}) {
- event := data.(*protocols.SocketDataUploadEvent)
+ event := data.(*base.SocketDataUploadEvent)
p.analyzer.ReceiveSocketDataEvent(event)
}
diff --git a/pkg/profiling/task/network/analyze/layer7/listener.go b/pkg/profiling/task/network/analyze/layer7/listener.go
index facf4a5..aa05f35 100644
--- a/pkg/profiling/task/network/analyze/layer7/listener.go
+++ b/pkg/profiling/task/network/analyze/layer7/listener.go
@@ -29,11 +29,14 @@ import (
profiling "github.com/apache/skywalking-rover/pkg/profiling/task/base"
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols"
+ protocol "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/base"
"github.com/apache/skywalking-rover/pkg/profiling/task/network/bpf"
"github.com/zekroTJA/timedmap"
)
+var ListenerName = "layer7"
+
const (
ConnectionCachedTTL = time.Minute
ConnectionCacheFlushTime = time.Second * 20
@@ -56,7 +59,7 @@ func NewListener(analyzer *base.AnalyzerContext) *Listener {
}
func (l *Listener) Name() string {
- return protocols.ListenerName
+ return ListenerName
}
func (l *Listener) Init(config *profiling.TaskConfig, moduleManager *module.Manager) error {
@@ -125,6 +128,11 @@ func (l *Listener) QueryConnection(conID, randomID uint64) *base.ConnectionConte
return nil
}
+func (l *Listener) QueryProtocolMetrics(conMetrics *base.ConnectionMetricsContext, protocolName string) protocol.Metrics {
+ metrics := conMetrics.GetMetrics(ListenerName).(*protocols.ProtocolMetrics)
+ return metrics.GetProtocolMetrics(protocolName)
+}
+
func (l *Listener) generateCachedConnectionKey(conID, randomID uint64) string {
return fmt.Sprintf("%d_%d", conID, randomID)
}
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/events.go b/pkg/profiling/task/network/analyze/layer7/protocols/base/events.go
similarity index 90%
rename from pkg/profiling/task/network/analyze/layer7/protocols/events.go
rename to pkg/profiling/task/network/analyze/layer7/protocols/base/events.go
index de0b534..e3f09f0 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/events.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/base/events.go
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-package protocols
+package base
import (
"fmt"
@@ -30,12 +30,13 @@ type SocketDataBuffer interface {
BufferData() []byte
// TotalSize of socket data, the data may exceed the size of the BufferData()
TotalSize() uint64
- // Time of socket data send
- Time() uint64
// Direction of the data, send or receive
Direction() base.SocketDataDirection
FirstEvent() *SocketDataUploadEvent
+ StartTime() uint64
+ EndTime() uint64
+
MinDataID() int
MaxDataID() int
}
@@ -47,7 +48,8 @@ type SocketDataUploadEvent struct {
Finished uint8
Sequence uint16
DataLen uint16
- Timestamp uint64
+ StartTime0 uint64
+ EndTime0 uint64
ConnectionID uint64
RandomID uint64
DataID uint64
@@ -63,8 +65,12 @@ func (s *SocketDataUploadEvent) BufferData() []byte {
return s.Buffer[:s.DataLen]
}
-func (s *SocketDataUploadEvent) Time() uint64 {
- return s.Timestamp
+func (s *SocketDataUploadEvent) StartTime() uint64 {
+ return s.StartTime0
+}
+
+func (s *SocketDataUploadEvent) EndTime() uint64 {
+ return s.EndTime0
}
func (s *SocketDataUploadEvent) Direction() base.SocketDataDirection {
@@ -124,8 +130,12 @@ func (s *SocketDataUploadCombinedEvent) TotalSize() uint64 {
return s.first.TotalSize0
}
-func (s *SocketDataUploadCombinedEvent) Time() uint64 {
- return s.first.Timestamp
+func (s *SocketDataUploadCombinedEvent) StartTime() uint64 {
+ return s.first.StartTime0
+}
+
+func (s *SocketDataUploadCombinedEvent) EndTime() uint64 {
+ return s.first.EndTime0
}
func (s *SocketDataUploadCombinedEvent) MinDataID() int {
diff --git a/pkg/tools/host/time.go b/pkg/profiling/task/network/analyze/layer7/protocols/base/protocol.go
similarity index 56%
copy from pkg/tools/host/time.go
copy to pkg/profiling/task/network/analyze/layer7/protocols/base/protocol.go
index 1d03bcf..53494d1 100644
--- a/pkg/tools/host/time.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/base/protocol.go
@@ -15,22 +15,25 @@
// specific language governing permissions and limitations
// under the License.
-package host
+package base
-import (
- "fmt"
- "time"
+import "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
- "github.com/shirou/gopsutil/host"
-)
+type Protocol interface {
+ Name() string
+ GenerateMetrics() Metrics
-// BootTime the System boot time
-var BootTime time.Time
+ ReceiveData(context Context, event *SocketDataUploadEvent) bool
+}
+
+type Context interface {
+ QueryConnection(connectionID, randomID uint64) *base.ConnectionContext
+ QueryProtocolMetrics(conMetrics *base.ConnectionMetricsContext, protocolName string) Metrics
+}
+
+type Metrics interface {
+ base.ConnectionMetrics
-func init() {
- boot, err := host.BootTime()
- if err != nil {
- panic(fmt.Errorf("init boot time error: %v", err))
- }
- BootTime = time.Unix(int64(boot), 0)
+ // FlushMetrics flush all metrics from traffic to the metricsBuilder
+ FlushMetrics(traffic *base.ProcessTraffic, metricsBuilder *base.MetricsBuilder)
}
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/tracing.go b/pkg/profiling/task/network/analyze/layer7/protocols/base/tracing.go
similarity index 72%
rename from pkg/profiling/task/network/analyze/layer7/protocols/tracing.go
rename to pkg/profiling/task/network/analyze/layer7/protocols/base/tracing.go
index 2027674..2ef3188 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/tracing.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/base/tracing.go
@@ -15,23 +15,32 @@
// specific language governing permissions and limitations
// under the License.
-package protocols
+package base
import (
"encoding/base64"
"fmt"
"strings"
+
+ v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
)
type TracingContext interface {
TraceID() string
- Provider() string
+ TraceSegmentID() string
+ SpanID() string
+ Provider() *TraceContextProvider
+}
+
+type TraceContextProvider struct {
+ Type v3.SpanAttachedEvent_SpanReferenceType
+ Name string
}
type SkyWalkingTracingContext struct {
TraceID0 string
- SegmentID string
- SpanID string
+ SegmentID0 string
+ SpanID0 string
ParentService string
ParentServiceInstance string
ParentEndpoint string
@@ -40,15 +49,26 @@ type SkyWalkingTracingContext struct {
type ZipkinTracingContext struct {
TraceID0 string
- SpanID string
+ SpanID0 string
}
func (w *SkyWalkingTracingContext) TraceID() string {
return w.TraceID0
}
-func (w *SkyWalkingTracingContext) Provider() string {
- return "skywalking"
+func (w *SkyWalkingTracingContext) TraceSegmentID() string {
+ return w.SegmentID0
+}
+
+func (w *SkyWalkingTracingContext) SpanID() string {
+ return w.SpanID0
+}
+
+func (w *SkyWalkingTracingContext) Provider() *TraceContextProvider {
+ return &TraceContextProvider{
+ Type: v3.SpanAttachedEvent_SKYWALKING,
+ Name: "skywalking",
+ }
}
func AnalyzeTracingContext(fetcher func(key string) string) (TracingContext, error) {
@@ -79,8 +99,8 @@ func analyzeSkyWalking8TracingContext(val string) (*SkyWalkingTracingContext, er
var err error
ctx := &SkyWalkingTracingContext{}
ctx.TraceID0, err = decodeBase64StringValue(err, parts[1])
- ctx.SegmentID, err = decodeBase64StringValue(err, parts[2])
- ctx.SpanID = parts[3]
+ ctx.SegmentID0, err = decodeBase64StringValue(err, parts[2])
+ ctx.SpanID0 = parts[3]
ctx.ParentService, err = decodeBase64StringValue(err, parts[4])
ctx.ParentServiceInstance, err = decodeBase64StringValue(err, parts[5])
ctx.ParentEndpoint, err = decodeBase64StringValue(err, parts[6])
@@ -93,7 +113,7 @@ func analyzeSkyWalking8TracingContext(val string) (*SkyWalkingTracingContext, er
}
func analyzeZipkinTracingContextWithSpecificData(traceID, spanID string) *ZipkinTracingContext {
- return &ZipkinTracingContext{TraceID0: traceID, SpanID: spanID}
+ return &ZipkinTracingContext{TraceID0: traceID, SpanID0: spanID}
}
func analyzeZipkinTracingContextWithSingleData(singleData string) *ZipkinTracingContext {
@@ -101,15 +121,26 @@ func analyzeZipkinTracingContextWithSingleData(singleData string) *ZipkinTracing
if len(info) < 2 {
return nil
}
- return &ZipkinTracingContext{TraceID0: info[0], SpanID: info[1]}
+ return &ZipkinTracingContext{TraceID0: info[0], SpanID0: info[1]}
}
func (w *ZipkinTracingContext) TraceID() string {
return w.TraceID0
}
-func (w *ZipkinTracingContext) Provider() string {
- return "zipkin"
+func (w *ZipkinTracingContext) TraceSegmentID() string {
+ return ""
+}
+
+func (w *ZipkinTracingContext) SpanID() string {
+ return w.SpanID0
+}
+
+func (w *ZipkinTracingContext) Provider() *TraceContextProvider {
+ return &TraceContextProvider{
+ Type: v3.SpanAttachedEvent_ZIPKIN,
+ Name: "zipkin",
+ }
}
func decodeBase64StringValue(err error, val string) (string, error) {
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/http1.go b/pkg/profiling/task/network/analyze/layer7/protocols/http1.go
deleted file mode 100644
index 2c88cbc..0000000
--- a/pkg/profiling/task/network/analyze/layer7/protocols/http1.go
+++ /dev/null
@@ -1,598 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package protocols
-
-import (
- "bufio"
- "bytes"
- "container/list"
- "encoding/json"
- "fmt"
- "io"
- "net/http"
- "net/textproto"
- "strconv"
- "strings"
- "sync"
- "time"
-
- commonv3 "skywalking.apache.org/repo/goapi/collect/common/v3"
- v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
- logv3 "skywalking.apache.org/repo/goapi/collect/logging/v3"
-
- "github.com/sirupsen/logrus"
-
- "github.com/apache/skywalking-rover/pkg/process/api"
- "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
- "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/metrics"
- "github.com/apache/skywalking-rover/pkg/tools"
-)
-
-var HTTP1ProtocolName = "http1"
-
-var HTTP1PackageSizeHistogramBuckets = []float64{
- // 0.25KB, 0.5KB, 1KB, 1.5KB, 2KB, 3KB, 5KB, 8KB, 10KB, 15KB, 20KB, 35KB, 50KB, 75KB, 100KB, 200KB, 500KB
- 256, 512, 1048, 1536, 2048, 3072, 5120, 8192, 10240, 15360, 20480, 35840, 51200, 76800, 102400, 204800, 512000,
- // 800KB, 1M, 1.5M, 2M, 5M, 10M, 20M, 50M
- 819200, 1048576, 1572864, 2097152, 5242880, 10485760, 20971520, 52428800,
-}
-
-var HTTP1DurationHistogramBuckets = []float64{
- // unit ms
- 1, 2, 5, 10, 15, 20, 25, 30, 40, 45, 50, 60, 65, 70, 80, 90, 100, 110, 130, 150, 170, 200, 230, 260, 290,
- 330, 380, 430, 480, 500, 600, 700, 800, 900, 1000, 1100, 1300, 1500, 1800, 2000, 5000, 10000, 15000, 20000, 30000,
-}
-
-var SlowTraceTopNSize = 10
-
-type HTTP1Analyzer struct {
- // cache connection metrics if the connect event not receive or process
- cache map[string]*HTTP1ConnectionMetrics
-}
-
-type HTTP1ConnectionMetrics struct {
- // halfData all data event(request/response) not finished
- halfData *list.List
-
- clientMetrics *HTTP1URIMetrics
- serverMetrics *HTTP1URIMetrics
- metricsLocker sync.RWMutex
-}
-
-type HTTP1URIMetrics struct {
- RequestCounter *metrics.Counter
- StatusCounter map[int]*metrics.Counter
-
- AvgRequestPackageSize *metrics.AvgCounter
- AvgResponsePackageSize *metrics.AvgCounter
- ReqPackageSizeHistogram *metrics.Histogram
- RespPackageSizeHistogram *metrics.Histogram
-
- avgDuration *metrics.AvgCounter
- durationHistogram *metrics.Histogram
-
- slowTraces *metrics.TopN
-}
-
-func NewHTTP1URIMetrics() *HTTP1URIMetrics {
- return &HTTP1URIMetrics{
- RequestCounter: metrics.NewCounter(),
- StatusCounter: make(map[int]*metrics.Counter),
- AvgRequestPackageSize: metrics.NewAvgCounter(),
- AvgResponsePackageSize: metrics.NewAvgCounter(),
- ReqPackageSizeHistogram: metrics.NewHistogram(HTTP1PackageSizeHistogramBuckets),
- RespPackageSizeHistogram: metrics.NewHistogram(HTTP1PackageSizeHistogramBuckets),
- avgDuration: metrics.NewAvgCounter(),
- durationHistogram: metrics.NewHistogram(HTTP1DurationHistogramBuckets),
- slowTraces: metrics.NewTopN(SlowTraceTopNSize),
- }
-}
-
-func NewHTTP1Analyzer() Protocol {
- return &HTTP1Analyzer{
- cache: make(map[string]*HTTP1ConnectionMetrics),
- }
-}
-
-func (h *HTTP1Analyzer) Name() string {
- return HTTP1ProtocolName
-}
-
-func (h *HTTP1Analyzer) GenerateMetrics() Metrics {
- return &HTTP1ConnectionMetrics{
- halfData: list.New(),
-
- clientMetrics: NewHTTP1URIMetrics(),
- serverMetrics: NewHTTP1URIMetrics(),
- }
-}
-
-func (h *HTTP1Analyzer) ReceiveData(context Context, event *SocketDataUploadEvent) bool {
- // only handle the HTTP1 protocol
- if event.Protocol != base.ConnectionProtocolHTTP {
- return false
- }
-
- connectionID := event.GenerateConnectionID()
- fromAnalyzerCache := false
- var connectionMetrics *HTTP1ConnectionMetrics
- connection := context.QueryConnection(event.ConnectionID, event.RandomID)
- // if connection not exists, then cached it into the analyzer context
- if connection == nil {
- connectionMetrics = h.cache[connectionID]
- fromAnalyzerCache = true
- if connectionMetrics == nil {
- connectionMetrics = h.GenerateMetrics().(*HTTP1ConnectionMetrics)
- h.cache[connectionID] = connectionMetrics
- }
- } else {
- connectionMetrics = QueryProtocolMetrics(connection.Metrics, HTTP1ProtocolName).(*HTTP1ConnectionMetrics)
- }
-
- log.Debugf("receive connection: %s, dataid: %d, sequence: %d, finished: %d, message type: %s, direction: %s, size: %d, total size: %d",
- connectionID, event.DataID, event.Sequence, event.Finished, event.MsgType.String(), event.Direction().String(), event.DataLen, event.TotalSize0)
- // if the cache is existing in the analyzer context, then delete it
- if !fromAnalyzerCache {
- if tmp := h.cache[connectionID]; tmp != nil {
- connectionMetrics.MergeFrom(h, tmp)
- delete(h.cache, connectionID)
- }
- }
-
- req, resp := h.buildHTTP1(connectionMetrics.halfData, event)
- if req != nil && resp != nil {
- if err := h.analyze(context, connectionID, connectionMetrics, req, resp); err != nil {
- log.Errorf("HTTP1 analyze failure: %v", err)
- return false
- }
- } else {
- log.Debugf("connnection: %s, remaining half data list size: %d", connectionID, connectionMetrics.halfData.Len())
- }
- return true
-}
-
-func (h *HTTP1Analyzer) combineAndRemoveEvent(halfConnections *list.List, firstElement *list.Element,
- lastAppender SocketDataBuffer) SocketDataBuffer {
- firstEvent := firstElement.Value.(*SocketDataUploadEvent)
- if firstEvent.Sequence == 0 && firstEvent.Finished == 1 {
- halfConnections.Remove(firstElement)
- return h.combineEventIfNeed(firstEvent, lastAppender)
- }
- next := firstElement.Next()
- halfConnections.Remove(firstElement)
- var buffer SocketDataBuffer = firstEvent
- // for-each the events until buffer finished
- for next != nil {
- event := next.Value.(*SocketDataUploadEvent)
-
- buffer = buffer.Combine(event)
-
- tmp := next.Next()
- halfConnections.Remove(next)
- next = tmp
- // combine event
- if event.Finished == 1 {
- return h.combineEventIfNeed(buffer, lastAppender)
- }
- }
- return h.combineEventIfNeed(buffer, lastAppender)
-}
-
-func (h *HTTP1Analyzer) combineEventIfNeed(data, appender SocketDataBuffer) SocketDataBuffer {
- if appender != nil {
- return data.Combine(appender)
- }
- return data
-}
-
-func (h *HTTP1Analyzer) buildHTTP1(halfConnections *list.List, event *SocketDataUploadEvent) (request, response SocketDataBuffer) {
- // no connections, then just add the response to the half connections to wait the request
- if halfConnections.Len() == 0 {
- halfConnections.PushBack(event)
- return nil, nil
- }
-
- // quick handler(only one element, and is request)
- if halfConnections.Len() == 1 {
- firstElement := halfConnections.Front()
- firstEvent := firstElement.Value.(*SocketDataUploadEvent)
- if firstEvent.IsStart() && firstEvent.IsFinished() && event.IsStart() && event.IsFinished() &&
- firstEvent.DataID+1 == event.DataID && firstEvent.MsgType == base.SocketMessageTypeRequest &&
- event.MsgType == base.SocketMessageTypeResponse {
- return h.combineAndRemoveEvent(halfConnections, firstElement, nil), event
- }
- }
-
- // push to the queue
- h.insertToList(halfConnections, event)
-
- // trying to find completed request and response
- return NewHTTP1BufferAnalyzer(h).Analyze(halfConnections)
-}
-
-func (h *HTTP1Analyzer) insertToList(halfConnections *list.List, event *SocketDataUploadEvent) {
- if halfConnections.Len() == 0 {
- halfConnections.PushFront(event)
- return
- }
- if halfConnections.Back().Value.(*SocketDataUploadEvent).DataID < event.DataID {
- halfConnections.PushBack(event)
- return
- }
- beenAdded := false
- for element := halfConnections.Front(); element != nil; element = element.Next() {
- existEvent := element.Value.(*SocketDataUploadEvent)
- if existEvent.DataID > event.DataID {
- // data id needs order
- beenAdded = true
- } else if existEvent.DataID == event.DataID {
- if existEvent.MsgType == event.MsgType && existEvent.Sequence > event.Sequence {
- // same message type and following the sequence order
- beenAdded = true
- } else if existEvent.MsgType > event.MsgType {
- // request needs before response
- beenAdded = true
- }
- }
- if beenAdded {
- halfConnections.InsertBefore(event, element)
- break
- }
- }
- if !beenAdded {
- halfConnections.PushBack(event)
- }
-}
-
-func (h *HTTP1Analyzer) analyze(_ Context, connectionID string, connectionMetrics *HTTP1ConnectionMetrics,
- requestBuffer, responseBuffer SocketDataBuffer) error {
- request, err := http.ReadRequest(bufio.NewReader(bytes.NewBuffer(requestBuffer.BufferData())))
- if err != nil {
- return fmt.Errorf("parse request failure: data length: %d, total data length: %d, %v",
- len(requestBuffer.BufferData()), requestBuffer.TotalSize(), err)
- }
-
- response, err := http.ReadResponse(bufio.NewReader(bytes.NewBuffer(responseBuffer.BufferData())), request)
- if response != nil {
- defer response.Body.Close()
- }
- if err != nil {
- if err == io.ErrUnexpectedEOF || err == io.EOF {
- response, err = h.tryingToReadResponseWithoutHeaders(bufio.NewReader(bytes.NewBuffer(responseBuffer.BufferData())), request)
- if err != nil {
- return fmt.Errorf("parsing simple data error: %v", err)
- }
- if response != nil && response.Body != nil {
- defer response.Body.Close()
- }
- }
- if err != nil {
- return fmt.Errorf("parse response failure, data length: %d, total data length: %d, %v",
- len(requestBuffer.BufferData()), requestBuffer.TotalSize(), err)
- }
- }
-
- // lock append metrics with read locker
- connectionMetrics.metricsLocker.RLock()
- defer connectionMetrics.metricsLocker.RUnlock()
-
- // append metrics
- data := connectionMetrics.clientMetrics
- side := base.ConnectionRoleClient
- if requestBuffer.Direction() == base.SocketDataDirectionIngress {
- // if receive the request, that's mean is server side
- data = connectionMetrics.serverMetrics
- side = base.ConnectionRoleServer
- }
- h.appendToMetrics(data, request, requestBuffer, response, responseBuffer)
-
- if log.Enable(logrus.DebugLevel) {
- metricsJSON, _ := json.Marshal(data)
- log.Debugf("generated metrics, connection id: %s, side: %s, metrisc: %s", connectionID, side.String(), string(metricsJSON))
- }
- return nil
-}
-
-func (h *HTTP1Analyzer) tryingToReadResponseWithoutHeaders(reader *bufio.Reader, request *http.Request) (*http.Response, error) {
- if reader.Size() < 16 {
- return nil, fmt.Errorf("the header length not enough")
- }
- tp := textproto.NewReader(reader)
- resp := &http.Response{
- Request: request,
- }
-
- line, err := tp.ReadLine()
- if err != nil {
- return nil, fmt.Errorf("read response first line failure: %v", err)
- }
- indexByte := strings.IndexByte(line, ' ')
- if indexByte == -1 {
- return nil, fmt.Errorf("parsing response error: %s", line)
- }
- resp.Proto = line[:indexByte]
- resp.Status = strings.TrimLeft(line[indexByte+1:], " ")
- statusCode := resp.Status
- if i := strings.IndexByte(resp.Status, ' '); i != -1 {
- statusCode = resp.Status[:i]
- }
- if len(statusCode) != 3 {
- return nil, fmt.Errorf("parsing response status code failure: %v", statusCode)
- }
- resp.StatusCode, err = strconv.Atoi(statusCode)
- if err != nil || resp.StatusCode < 0 {
- return nil, fmt.Errorf("status code not correct: %s", statusCode)
- }
- var ok bool
- if resp.ProtoMajor, resp.ProtoMinor, ok = http.ParseHTTPVersion(resp.Proto); !ok {
- return nil, fmt.Errorf("parsing http version failure: %s", resp.Proto)
- }
-
- return resp, nil
-}
-
-func (h *HTTP1Analyzer) appendToMetrics(data *HTTP1URIMetrics, req *http.Request, reqBuffer SocketDataBuffer,
- resp *http.Response, respBuffer SocketDataBuffer) {
- data.RequestCounter.Increase()
- statusCounter := data.StatusCounter[resp.StatusCode]
- if statusCounter == nil {
- statusCounter = metrics.NewCounter()
- data.StatusCounter[resp.StatusCode] = statusCounter
- }
- statusCounter.Increase()
-
- data.AvgRequestPackageSize.Increase(float64(reqBuffer.TotalSize()))
- data.AvgResponsePackageSize.Increase(float64(respBuffer.TotalSize()))
- data.ReqPackageSizeHistogram.Increase(float64(reqBuffer.TotalSize()))
- data.RespPackageSizeHistogram.Increase(float64(respBuffer.TotalSize()))
-
- duration := time.Duration(respBuffer.Time() - reqBuffer.Time())
- durationInMS := float64(duration.Milliseconds())
- data.avgDuration.Increase(durationInMS)
- data.durationHistogram.Increase(durationInMS)
-
- h.increaseSlowTraceTopN(data.slowTraces, duration, req, resp, reqBuffer, respBuffer)
-}
-
-func (h *HTTP1Analyzer) increaseSlowTraceTopN(slowTraceTopN *metrics.TopN, duration time.Duration,
- request *http.Request, _ *http.Response, reqBuffer, respBuffer SocketDataBuffer) {
- tracingContext, err := AnalyzeTracingContext(func(key string) string {
- return request.Header.Get(key)
- })
- if err != nil {
- log.Warnf("analyze tracing context error: %v", err)
- return
- }
- if tracingContext == nil {
- return
- }
-
- // remove the query parameters
- uri := request.RequestURI
- if i := strings.Index(uri, "?"); i > 0 {
- uri = uri[0:i]
- }
- trace := &HTTP1Trace{Trace: tracingContext, RequestURI: uri, RequestBuffer: reqBuffer, ResponseBuffer: respBuffer}
- slowTraceTopN.AddRecord(trace, duration.Milliseconds())
-}
-
-func (h *HTTP1ConnectionMetrics) MergeMetricsFromConnection(connection *base.ConnectionContext) {
- other := QueryProtocolMetrics(connection.Metrics, HTTP1ProtocolName).(*HTTP1ConnectionMetrics)
- other.metricsLocker.Lock()
- defer other.metricsLocker.Unlock()
-
- h.clientMetrics.MergeAndClean(other.clientMetrics)
- h.serverMetrics.MergeAndClean(other.serverMetrics)
- if log.Enable(logrus.DebugLevel) {
- clientMetrics, _ := json.Marshal(h.clientMetrics)
- serverMetrics, _ := json.Marshal(h.serverMetrics)
- log.Debugf("combine metrics: conid: %d_%d, client side metrics: %s, server side metrics: %s",
- connection.ConnectionID, connection.RandomID, clientMetrics, serverMetrics)
- }
-}
-
-func (h *HTTP1ConnectionMetrics) FlushMetrics(traffic *base.ProcessTraffic, metricsBuilder *base.MetricsBuilder) {
- connectionMetrics := QueryProtocolMetrics(traffic.Metrics, HTTP1ProtocolName).(*HTTP1ConnectionMetrics)
- for _, p := range traffic.LocalProcesses {
- // if the remote process is profiling, then used the client side
- localMetrics := connectionMetrics.clientMetrics
- remoteMetrics := connectionMetrics.serverMetrics
- if traffic.Role == base.ConnectionRoleServer {
- localMetrics = connectionMetrics.serverMetrics
- remoteMetrics = connectionMetrics.clientMetrics
- }
-
- metricsCount := h.appendMetrics(traffic, p, "", localMetrics, metricsBuilder, false)
- if traffic.RemoteProcessIsProfiling() {
- metricsCount += h.appendMetrics(traffic, p, "", remoteMetrics, metricsBuilder, true)
- }
- if metricsCount <= 0 {
- continue
- }
-
- if log.Enable(logrus.DebugLevel) {
- // if remote process is profiling, then the metrics data need to be cut half
- log.Debugf("flush HTTP1 metrics(%s): %s, remote process is profiling: %t, client(%s), server(%s)"+
- traffic.Role.String(), traffic.GenerateConnectionInfo(), traffic.RemoteProcessIsProfiling(),
- connectionMetrics.clientMetrics.String(), connectionMetrics.serverMetrics.String())
- }
- }
-}
-
-func (h *HTTP1ConnectionMetrics) appendMetrics(traffic *base.ProcessTraffic,
- local api.ProcessInterface, url string, http1Metrics *HTTP1URIMetrics, metricsBuilder *base.MetricsBuilder, durationOnly bool) int {
- collections := make([]*v3.MeterData, 0)
- role, labels := metricsBuilder.BuildBasicMeterLabels(traffic, local)
- prefix := metricsBuilder.MetricPrefix()
-
- collections = h.buildMetrics(collections, prefix, fmt.Sprintf("%s_duration_avg", role.String()), labels, url,
- traffic, http1Metrics.avgDuration)
- collections = h.buildMetrics(collections, prefix, fmt.Sprintf("%s_duration_histogram", role.String()), labels, url,
- traffic, http1Metrics.durationHistogram)
- if durationOnly {
- return len(collections)
- }
-
- collections = h.buildMetrics(collections, prefix, "request_counter", labels, url, traffic, http1Metrics.RequestCounter)
- for status, counter := range http1Metrics.StatusCounter {
- statusLabels := append(labels, &v3.Label{Name: "code", Value: fmt.Sprintf("%d", status)})
- collections = h.buildMetrics(collections, prefix, "response_status_counter", statusLabels, url, traffic, counter)
- }
-
- collections = h.buildMetrics(collections, prefix, "request_package_size_avg", labels, url, traffic, http1Metrics.AvgRequestPackageSize)
- collections = h.buildMetrics(collections, prefix, "response_package_size_avg", labels, url, traffic, http1Metrics.AvgResponsePackageSize)
- collections = h.buildMetrics(collections, prefix, "request_package_size_histogram", labels, url, traffic, http1Metrics.ReqPackageSizeHistogram)
- collections = h.buildMetrics(collections, prefix, "response_package_size_histogram", labels, url, traffic, http1Metrics.RespPackageSizeHistogram)
-
- metricsBuilder.AppendMetrics(local.Entity().ServiceName, local.Entity().InstanceName, collections)
- logsCount := http1Metrics.slowTraces.AppendData(local, traffic, metricsBuilder)
- return len(collections) + logsCount
-}
-
-func (h *HTTP1ConnectionMetrics) buildMetrics(collection []*v3.MeterData, prefix, name string, basicLabels []*v3.Label,
- url string, _ *base.ProcessTraffic, data metrics.Metrics) []*v3.MeterData {
- // if remote process is also profiling, then needs to be calculated half of metrics
- labels := basicLabels
- var meterName string
- if url != "" {
- labels = append(labels, &v3.Label{Name: "url", Value: url})
- meterName = fmt.Sprintf("%shttp1_%s_%s", prefix, "url", name)
- } else {
- meterName = fmt.Sprintf("%shttp1_%s", prefix, name)
- }
- return data.AppendMeter(collection, meterName, labels)
-}
-
-func (h *HTTP1ConnectionMetrics) MergeFrom(analyzer *HTTP1Analyzer, other *HTTP1ConnectionMetrics) {
- if other.halfData != nil {
- for element := other.halfData.Front(); element != nil; element = element.Next() {
- analyzer.insertToList(h.halfData, element.Value.(*SocketDataUploadEvent))
- }
- }
-}
-
-func (u *HTTP1URIMetrics) MergeAndClean(other *HTTP1URIMetrics) {
- u.RequestCounter.MergeAndClean(other.RequestCounter)
- for k, v := range other.StatusCounter {
- if existing := u.StatusCounter[k]; existing != nil {
- existing.MergeAndClean(v)
- } else {
- u.StatusCounter[k] = v
- }
- }
-
- u.AvgRequestPackageSize.MergeAndClean(other.AvgRequestPackageSize)
- u.AvgResponsePackageSize.MergeAndClean(other.AvgResponsePackageSize)
- u.ReqPackageSizeHistogram.MergeAndClean(other.ReqPackageSizeHistogram)
- u.RespPackageSizeHistogram.MergeAndClean(other.RespPackageSizeHistogram)
- u.avgDuration.MergeAndClean(other.avgDuration)
- u.durationHistogram.MergeAndClean(other.durationHistogram)
- u.slowTraces.MergeAndClean(other.slowTraces)
-}
-
-func (u *HTTP1URIMetrics) String() string {
- return fmt.Sprintf("request count: %d, avg request size: %f, avg response size: %f, avg duration: %f, slow trace count: %d",
- u.RequestCounter.Get(), u.AvgRequestPackageSize.Calculate(), u.AvgResponsePackageSize.Calculate(),
- u.avgDuration.Calculate(), u.slowTraces.List.Len())
-}
-
-type HTTP1Trace struct {
- Trace TracingContext
- RequestURI string
- RequestBuffer SocketDataBuffer
- ResponseBuffer SocketDataBuffer
-}
-
-func (h *HTTP1Trace) Flush(duration int64, process api.ProcessInterface, traffic *base.ProcessTraffic, metricsBuilder *base.MetricsBuilder) {
- logData := &logv3.LogData{}
- logData.Service = process.Entity().ServiceName
- logData.ServiceInstance = process.Entity().InstanceName
- logData.Layer = process.Entity().Layer
-
- logData.Tags = &logv3.LogTags{Data: make([]*commonv3.KeyStringValuePair, 0)}
- logData.Tags.Data = append(logData.Tags.Data, &commonv3.KeyStringValuePair{Key: "LOG_KIND", Value: "NET_PROFILING_SAMPLED_TRACE"})
-
- // trace context
- traceContext := &logv3.TraceContext{}
- traceContext.TraceId = h.Trace.TraceID()
- logData.TraceContext = traceContext
-
- // body
- logBody := &logv3.LogDataBody{Type: "json"}
- body := &HTTP1SlowTraceLogBody{
- Latency: duration,
- TraceProvider: h.Trace.Provider(),
- DetectPoint: traffic.Role.String(),
- Component: traffic.Protocol.String(),
- SSL: traffic.IsSSL,
- URI: h.RequestURI,
- Reason: "slow",
- }
- if traffic.Role == base.ConnectionRoleClient {
- body.ClientProcess = &HTTP1SlowTraceLogProcess{ProcessID: process.ID()}
- body.ServerProcess = NewHTTP1SlowTRaceLogRemoteProcess(traffic, process)
- } else {
- body.ServerProcess = &HTTP1SlowTraceLogProcess{ProcessID: process.ID()}
- body.ClientProcess = NewHTTP1SlowTRaceLogRemoteProcess(traffic, process)
- }
- bodyJSON, err := json.Marshal(body)
- if err != nil {
- log.Warnf("format the slow trace log body failure: %v", err)
- return
- }
- logBody.Content = &logv3.LogDataBody_Json{Json: &logv3.JSONLog{Json: string(bodyJSON)}}
- logData.Body = logBody
-
- metricsBuilder.AppendLogs(process.Entity().ServiceName, logData)
-}
-
-type HTTP1SlowTraceLogBody struct {
- URI string `json:"uri"`
- Reason string `json:"reason"`
- Latency int64 `json:"latency"`
- TraceProvider string `json:"trace_provider"`
- ClientProcess *HTTP1SlowTraceLogProcess `json:"client_process"`
- ServerProcess *HTTP1SlowTraceLogProcess `json:"server_process"`
- DetectPoint string `json:"detect_point"`
- Component string `json:"component"`
- SSL bool `json:"ssl"`
-}
-
-type HTTP1SlowTraceLogProcess struct {
- ProcessID string `json:"process_id"`
- Local bool `json:"local"`
- Address string `json:"address"`
-}
-
-func NewHTTP1SlowTRaceLogRemoteProcess(traffic *base.ProcessTraffic, local api.ProcessInterface) *HTTP1SlowTraceLogProcess {
- if len(traffic.RemoteProcesses) != 0 {
- for _, p := range traffic.RemoteProcesses {
- // only match with same service instance
- if local.Entity().ServiceName == p.Entity().ServiceName &&
- local.Entity().InstanceName == p.Entity().InstanceName {
- return &HTTP1SlowTraceLogProcess{ProcessID: p.ID()}
- }
- }
- }
-
- if tools.IsLocalHostAddress(traffic.RemoteIP) || traffic.Analyzer.IsLocalAddressInCache(traffic.RemoteIP) {
- return &HTTP1SlowTraceLogProcess{Local: true}
- }
-
- return &HTTP1SlowTraceLogProcess{Address: fmt.Sprintf("%s:%d", traffic.RemoteIP, traffic.RemotePort)}
-}
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/http1/analyzer.go b/pkg/profiling/task/network/analyze/layer7/protocols/http1/analyzer.go
new file mode 100644
index 0000000..740a24a
--- /dev/null
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/http1/analyzer.go
@@ -0,0 +1,364 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package http1
+
+import (
+ "bufio"
+ "bytes"
+ "container/list"
+ "encoding/json"
+ "fmt"
+ "io"
+ "net/http"
+ "net/textproto"
+ "strconv"
+ "strings"
+ "sync"
+
+ "github.com/apache/skywalking-rover/pkg/logger"
+ "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
+ protocol "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/base"
+
+ "github.com/sirupsen/logrus"
+)
+
+var log = logger.GetLogger("profiling", "task", "network", "layer7", "protocols", "http1")
+
+var ProtocolName = "http1"
+
+var PackageSizeHistogramBuckets = []float64{
+ // 0.25KB, 0.5KB, 1KB, 1.5KB, 2KB, 3KB, 5KB, 8KB, 10KB, 15KB, 20KB, 35KB, 50KB, 75KB, 100KB, 200KB, 500KB
+ 256, 512, 1048, 1536, 2048, 3072, 5120, 8192, 10240, 15360, 20480, 35840, 51200, 76800, 102400, 204800, 512000,
+ // 800KB, 1M, 1.5M, 2M, 5M, 10M, 20M, 50M
+ 819200, 1048576, 1572864, 2097152, 5242880, 10485760, 20971520, 52428800,
+}
+
+var DurationHistogramBuckets = []float64{
+ // unit ms
+ 1, 2, 5, 10, 15, 20, 25, 30, 40, 45, 50, 60, 65, 70, 80, 90, 100, 110, 130, 150, 170, 200, 230, 260, 290,
+ 330, 380, 430, 480, 500, 600, 700, 800, 900, 1000, 1100, 1300, 1500, 1800, 2000, 5000, 10000, 15000, 20000, 30000,
+}
+
+var SlowTraceTopNSize = 10
+
+type Analyzer struct {
+ // cache connection metrics if the connect event not receive or process
+ cache map[string]*ConnectionMetrics
+}
+
+type ConnectionMetrics struct {
+ // halfData all data event(request/response) not finished
+ halfData *list.List
+
+ clientMetrics *URIMetrics
+ serverMetrics *URIMetrics
+ metricsLocker sync.RWMutex
+}
+
+func NewHTTP1Analyzer() protocol.Protocol {
+ return &Analyzer{
+ cache: make(map[string]*ConnectionMetrics),
+ }
+}
+
+func (h *Analyzer) Name() string {
+ return ProtocolName
+}
+
+func (h *Analyzer) GenerateMetrics() protocol.Metrics {
+ return &ConnectionMetrics{
+ halfData: list.New(),
+
+ clientMetrics: NewHTTP1URIMetrics(),
+ serverMetrics: NewHTTP1URIMetrics(),
+ }
+}
+
+func (h *Analyzer) ReceiveData(context protocol.Context, event *protocol.SocketDataUploadEvent) bool {
+ // only handle the HTTP1 protocol
+ if event.Protocol != base.ConnectionProtocolHTTP {
+ return false
+ }
+
+ connectionID := event.GenerateConnectionID()
+ fromAnalyzerCache := false
+ var connectionMetrics *ConnectionMetrics
+ connection := context.QueryConnection(event.ConnectionID, event.RandomID)
+ // if connection not exists, then cached it into the analyzer context
+ if connection == nil {
+ connectionMetrics = h.cache[connectionID]
+ fromAnalyzerCache = true
+ if connectionMetrics == nil {
+ connectionMetrics = h.GenerateMetrics().(*ConnectionMetrics)
+ h.cache[connectionID] = connectionMetrics
+ }
+ } else {
+ connectionMetrics = context.QueryProtocolMetrics(connection.Metrics, ProtocolName).(*ConnectionMetrics)
+ }
+
+ log.Debugf("receive connection: %s, dataid: %d, sequence: %d, finished: %d, message type: %s, direction: %s, size: %d, total size: %d",
+ connectionID, event.DataID, event.Sequence, event.Finished, event.MsgType.String(), event.Direction().String(), event.DataLen, event.TotalSize0)
+ // if the cache is existing in the analyzer context, then delete it
+ if !fromAnalyzerCache {
+ if tmp := h.cache[connectionID]; tmp != nil {
+ connectionMetrics.MergeFrom(h, tmp)
+ delete(h.cache, connectionID)
+ }
+ }
+
+ req, resp := h.buildHTTP1(connectionMetrics.halfData, event)
+ if req != nil && resp != nil {
+ if err := h.analyze(context, connectionID, connectionMetrics, req, resp); err != nil {
+ log.Errorf("HTTP1 analyze failure: %v", err)
+ return false
+ }
+ } else {
+ log.Debugf("connnection: %s, remaining half data list size: %d", connectionID, connectionMetrics.halfData.Len())
+ }
+ return true
+}
+
+func (h *Analyzer) combineAndRemoveEvent(halfConnections *list.List, firstElement *list.Element,
+ lastAppender protocol.SocketDataBuffer) protocol.SocketDataBuffer {
+ firstEvent := firstElement.Value.(*protocol.SocketDataUploadEvent)
+ if firstEvent.Sequence == 0 && firstEvent.Finished == 1 {
+ halfConnections.Remove(firstElement)
+ return h.combineEventIfNeed(firstEvent, lastAppender)
+ }
+ next := firstElement.Next()
+ halfConnections.Remove(firstElement)
+ var buffer protocol.SocketDataBuffer = firstEvent
+ // for-each the events until buffer finished
+ for next != nil {
+ event := next.Value.(*protocol.SocketDataUploadEvent)
+
+ buffer = buffer.Combine(event)
+
+ tmp := next.Next()
+ halfConnections.Remove(next)
+ next = tmp
+ // combine event
+ if event.Finished == 1 {
+ return h.combineEventIfNeed(buffer, lastAppender)
+ }
+ }
+ return h.combineEventIfNeed(buffer, lastAppender)
+}
+
+func (h *Analyzer) combineEventIfNeed(data, appender protocol.SocketDataBuffer) protocol.SocketDataBuffer {
+ if appender != nil {
+ return data.Combine(appender)
+ }
+ return data
+}
+
+func (h *Analyzer) buildHTTP1(halfConnections *list.List, event *protocol.SocketDataUploadEvent) (request, response protocol.SocketDataBuffer) {
+ // no connections, then just add the response to the half connections to wait the request
+ if halfConnections.Len() == 0 {
+ halfConnections.PushBack(event)
+ return nil, nil
+ }
+
+ // quick handler(only one element, and is request)
+ if halfConnections.Len() == 1 {
+ firstElement := halfConnections.Front()
+ firstEvent := firstElement.Value.(*protocol.SocketDataUploadEvent)
+ if firstEvent.IsStart() && firstEvent.Finished == 1 && event.IsStart() && event.Finished == 1 &&
+ firstEvent.DataID+1 == event.DataID && firstEvent.MsgType == base.SocketMessageTypeRequest &&
+ event.MsgType == base.SocketMessageTypeResponse {
+ return h.combineAndRemoveEvent(halfConnections, firstElement, nil), event
+ }
+ }
+
+ // push to the queue
+ h.insertToList(halfConnections, event)
+
+ // trying to find completed request and response
+ return NewHTTP1BufferAnalyzer(h).Analyze(halfConnections)
+}
+
+func (h *Analyzer) insertToList(halfConnections *list.List, event *protocol.SocketDataUploadEvent) {
+ if halfConnections.Len() == 0 {
+ halfConnections.PushFront(event)
+ return
+ }
+ if halfConnections.Back().Value.(*protocol.SocketDataUploadEvent).DataID < event.DataID {
+ halfConnections.PushBack(event)
+ return
+ }
+ beenAdded := false
+ for element := halfConnections.Front(); element != nil; element = element.Next() {
+ existEvent := element.Value.(*protocol.SocketDataUploadEvent)
+ if existEvent.DataID > event.DataID {
+ // data id needs order
+ beenAdded = true
+ } else if existEvent.DataID == event.DataID {
+ if existEvent.MsgType == event.MsgType && existEvent.Sequence > event.Sequence {
+ // same message type and following the sequence order
+ beenAdded = true
+ } else if existEvent.MsgType > event.MsgType {
+ // request needs before response
+ beenAdded = true
+ }
+ }
+ if beenAdded {
+ halfConnections.InsertBefore(event, element)
+ break
+ }
+ }
+ if !beenAdded {
+ halfConnections.PushBack(event)
+ }
+}
+
+func (h *Analyzer) analyze(_ protocol.Context, connectionID string, connectionMetrics *ConnectionMetrics,
+ requestBuffer, responseBuffer protocol.SocketDataBuffer) error {
+ request, err := http.ReadRequest(bufio.NewReader(bytes.NewBuffer(requestBuffer.BufferData())))
+ if err != nil {
+ return fmt.Errorf("parse request failure: data length: %d, total data length: %d, %v",
+ len(requestBuffer.BufferData()), requestBuffer.TotalSize(), err)
+ }
+
+ response, err := http.ReadResponse(bufio.NewReader(bytes.NewBuffer(responseBuffer.BufferData())), request)
+ if response != nil {
+ defer response.Body.Close()
+ }
+ if err != nil {
+ if err == io.ErrUnexpectedEOF || err == io.EOF {
+ response, err = h.tryingToReadResponseWithoutHeaders(bufio.NewReader(bytes.NewBuffer(responseBuffer.BufferData())), request)
+ if err != nil {
+ return fmt.Errorf("parsing simple data error: %v", err)
+ }
+ if response != nil && response.Body != nil {
+ defer response.Body.Close()
+ }
+ }
+ if err != nil {
+ return fmt.Errorf("parse response failure, data length: %d, total data length: %d, %v",
+ len(requestBuffer.BufferData()), requestBuffer.TotalSize(), err)
+ }
+ }
+
+ // lock append metrics with read locker
+ connectionMetrics.metricsLocker.RLock()
+ defer connectionMetrics.metricsLocker.RUnlock()
+
+ // append metrics
+ data := connectionMetrics.clientMetrics
+ side := base.ConnectionRoleClient
+ if requestBuffer.Direction() == base.SocketDataDirectionIngress {
+ // if receive the request, that's mean is server side
+ data = connectionMetrics.serverMetrics
+ side = base.ConnectionRoleServer
+ }
+ data.Append(request, requestBuffer, response, responseBuffer)
+
+ if log.Enable(logrus.DebugLevel) {
+ metricsJSON, _ := json.Marshal(data)
+ log.Debugf("generated metrics, connection id: %s, side: %s, metrisc: %s", connectionID, side.String(), string(metricsJSON))
+ }
+ return nil
+}
+
+func (h *Analyzer) tryingToReadResponseWithoutHeaders(reader *bufio.Reader, request *http.Request) (*http.Response, error) {
+ if reader.Size() < 16 {
+ return nil, fmt.Errorf("the header length not enough")
+ }
+ tp := textproto.NewReader(reader)
+ resp := &http.Response{
+ Request: request,
+ }
+
+ line, err := tp.ReadLine()
+ if err != nil {
+ return nil, fmt.Errorf("read response first line failure: %v", err)
+ }
+ indexByte := strings.IndexByte(line, ' ')
+ if indexByte == -1 {
+ return nil, fmt.Errorf("parsing response error: %s", line)
+ }
+ resp.Proto = line[:indexByte]
+ resp.Status = strings.TrimLeft(line[indexByte+1:], " ")
+ statusCode := resp.Status
+ if i := strings.IndexByte(resp.Status, ' '); i != -1 {
+ statusCode = resp.Status[:i]
+ }
+ if len(statusCode) != 3 {
+ return nil, fmt.Errorf("parsing response status code failure: %v", statusCode)
+ }
+ resp.StatusCode, err = strconv.Atoi(statusCode)
+ if err != nil || resp.StatusCode < 0 {
+ return nil, fmt.Errorf("status code not correct: %s", statusCode)
+ }
+ var ok bool
+ if resp.ProtoMajor, resp.ProtoMinor, ok = http.ParseHTTPVersion(resp.Proto); !ok {
+ return nil, fmt.Errorf("parsing http version failure: %s", resp.Proto)
+ }
+
+ return resp, nil
+}
+
+func (h *ConnectionMetrics) MergeMetricsFromConnection(connection *base.ConnectionContext, data base.ConnectionMetrics) {
+ other := data.(*ConnectionMetrics)
+ other.metricsLocker.Lock()
+ defer other.metricsLocker.Unlock()
+
+ h.clientMetrics.MergeAndClean(other.clientMetrics)
+ h.serverMetrics.MergeAndClean(other.serverMetrics)
+ if log.Enable(logrus.DebugLevel) {
+ clientMetrics, _ := json.Marshal(h.clientMetrics)
+ serverMetrics, _ := json.Marshal(h.serverMetrics)
+ log.Debugf("combine metrics: conid: %d_%d, client side metrics: %s, server side metrics: %s",
+ connection.ConnectionID, connection.RandomID, clientMetrics, serverMetrics)
+ }
+}
+
+func (h *ConnectionMetrics) FlushMetrics(traffic *base.ProcessTraffic, metricsBuilder *base.MetricsBuilder) {
+ for _, p := range traffic.LocalProcesses {
+ // if the remote process is profiling, then used the client side
+ localMetrics := h.clientMetrics
+ remoteMetrics := h.serverMetrics
+ if traffic.Role == base.ConnectionRoleServer {
+ localMetrics = h.serverMetrics
+ remoteMetrics = h.clientMetrics
+ }
+
+ metricsCount := localMetrics.appendMetrics(traffic, p, "", metricsBuilder, false)
+ if traffic.RemoteProcessIsProfiling() {
+ metricsCount += remoteMetrics.appendMetrics(traffic, p, "", metricsBuilder, true)
+ }
+ if metricsCount <= 0 {
+ continue
+ }
+
+ if log.Enable(logrus.DebugLevel) {
+ // if remote process is profiling, then the metrics data need to be cut half
+ log.Debugf("flush HTTP1 metrics(%s): %s, remote process is profiling: %t, client(%s), server(%s)",
+ traffic.Role.String(), traffic.GenerateConnectionInfo(), traffic.RemoteProcessIsProfiling(),
+ h.clientMetrics.String(), h.serverMetrics.String())
+ }
+ }
+}
+
+func (h *ConnectionMetrics) MergeFrom(analyzer *Analyzer, other *ConnectionMetrics) {
+ if other.halfData != nil {
+ for element := other.halfData.Front(); element != nil; element = element.Next() {
+ analyzer.insertToList(h.halfData, element.Value.(*protocol.SocketDataUploadEvent))
+ }
+ }
+}
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/http1_test.go b/pkg/profiling/task/network/analyze/layer7/protocols/http1/analyzer_test.go
similarity index 94%
rename from pkg/profiling/task/network/analyze/layer7/protocols/http1_test.go
rename to pkg/profiling/task/network/analyze/layer7/protocols/http1/analyzer_test.go
index 32ad96a..a663a6e 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/http1_test.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/http1/analyzer_test.go
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-package protocols
+package http1
import (
"bufio"
@@ -25,6 +25,8 @@ import (
"strings"
"testing"
+ base2 "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/base"
+
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
)
@@ -243,13 +245,13 @@ func TestBuildHTTP1(t *testing.T) {
for _, testCase := range tests {
//t.Run(testCase.name, func(t *testing.T) {
- analyzer := NewHTTP1Analyzer().(*HTTP1Analyzer)
+ analyzer := NewHTTP1Analyzer().(*Analyzer)
l := list.New()
var events = make([]struct {
start, end int
}, 0)
for _, event := range testCase.events {
- req, resp := analyzer.buildHTTP1(l, &SocketDataUploadEvent{
+ req, resp := analyzer.buildHTTP1(l, &base2.SocketDataUploadEvent{
DataID: uint64(event.dataID),
MsgType: base.SocketMessageType(event.dataType),
Sequence: uint16(event.sequence),
@@ -272,7 +274,7 @@ func TestBuildHTTP1(t *testing.T) {
}
actualList := make([]int, 0)
for element := l.Front(); element != nil; element = element.Next() {
- actualList = append(actualList, int(element.Value.(*SocketDataUploadEvent).DataID))
+ actualList = append(actualList, int(element.Value.(*base2.SocketDataUploadEvent).DataID))
}
if !reflect.DeepEqual(exceptedList, actualList) {
t.Fatalf("excepted residue data list: %v, actual: %v", exceptedList, actualList)
@@ -297,7 +299,7 @@ func bufferConvert(data string) [2048]byte {
func TestParseSimpleHTTP1Response(t *testing.T) {
s := `HTTP/1.0 200 OK\r\n`
h := &http.Request{}
- analyzer := NewHTTP1Analyzer().(*HTTP1Analyzer)
+ analyzer := NewHTTP1Analyzer().(*Analyzer)
resp, err := analyzer.tryingToReadResponseWithoutHeaders(bufio.NewReader(strings.NewReader(s)), h)
if err != nil {
t.Fatalf("reading simple response error: %v", err)
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/http1_builder.go b/pkg/profiling/task/network/analyze/layer7/protocols/http1/builder.go
similarity index 76%
rename from pkg/profiling/task/network/analyze/layer7/protocols/http1_builder.go
rename to pkg/profiling/task/network/analyze/layer7/protocols/http1/builder.go
index e8420ef..733919b 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/http1_builder.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/http1/builder.go
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-package protocols
+package http1
import (
"bufio"
@@ -23,18 +23,20 @@ import (
"container/list"
"net/http"
+ base2 "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/base"
+
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
)
-type HTTP1BufferAnalyzer struct {
- http1Analyzer *HTTP1Analyzer
+type BufferAnalyzer struct {
+ http1Analyzer *Analyzer
- unknownEventBuffer SocketDataBuffer
+ unknownEventBuffer base2.SocketDataBuffer
unknownElement *list.Element
unknownSize int
- request *SocketDataUploadEvent
+ request *base2.SocketDataUploadEvent
requestElement *list.Element
- response *SocketDataUploadEvent
+ response *base2.SocketDataUploadEvent
responseElement *list.Element
unknownDataID uint64
@@ -47,13 +49,13 @@ type HTTP1BufferAnalyzer struct {
respFinished bool
}
-func NewHTTP1BufferAnalyzer(http1 *HTTP1Analyzer) *HTTP1BufferAnalyzer {
- return &HTTP1BufferAnalyzer{http1Analyzer: http1}
+func NewHTTP1BufferAnalyzer(http1 *Analyzer) *BufferAnalyzer {
+ return &BufferAnalyzer{http1Analyzer: http1}
}
-func (h *HTTP1BufferAnalyzer) Analyze(events *list.List) (request, response SocketDataBuffer) {
+func (h *BufferAnalyzer) Analyze(events *list.List) (request, response base2.SocketDataBuffer) {
for element := events.Front(); element != nil; element = element.Next() {
- curEvent := element.Value.(*SocketDataUploadEvent)
+ curEvent := element.Value.(*base2.SocketDataUploadEvent)
// transform the unknown to the request or response
if continueReading, req, resp := h.handleUnknown(events, element, curEvent); req != nil && resp != nil {
return req, resp
@@ -74,8 +76,8 @@ func (h *HTTP1BufferAnalyzer) Analyze(events *list.List) (request, response Sock
return nil, nil
}
-func (h *HTTP1BufferAnalyzer) handleUnknown(event *list.List, element *list.Element,
- curEvent *SocketDataUploadEvent) (continueReading bool, req, resp SocketDataBuffer) {
+func (h *BufferAnalyzer) handleUnknown(event *list.List, element *list.Element,
+ curEvent *base2.SocketDataUploadEvent) (continueReading bool, req, resp base2.SocketDataBuffer) {
if curEvent.MsgType != base.SocketMessageTypeUnknown {
return false, nil, nil
}
@@ -111,8 +113,8 @@ func (h *HTTP1BufferAnalyzer) handleUnknown(event *list.List, element *list.Elem
return false, nil, nil
}
-func (h *HTTP1BufferAnalyzer) handleRequest(events *list.List, element *list.Element,
- curEvent *SocketDataUploadEvent) (continueReading bool, req, resp SocketDataBuffer) {
+func (h *BufferAnalyzer) handleRequest(events *list.List, element *list.Element,
+ curEvent *base2.SocketDataUploadEvent) (continueReading bool, req, resp base2.SocketDataBuffer) {
if h.request == nil {
// find the first request package event
if curEvent.MsgType == base.SocketMessageTypeRequest && curEvent.IsStart() {
@@ -145,7 +147,8 @@ func (h *HTTP1BufferAnalyzer) handleRequest(events *list.List, element *list.Ele
return false, nil, nil
}
-func (h *HTTP1BufferAnalyzer) handleResponse(events *list.List, element *list.Element, curEvent *SocketDataUploadEvent) (req, resp SocketDataBuffer) {
+func (h *BufferAnalyzer) handleResponse(events *list.List, element *list.Element,
+ curEvent *base2.SocketDataUploadEvent) (req, resp base2.SocketDataBuffer) {
if h.response == nil {
// if current response is not start, then clean to re-find new one
if !curEvent.IsStart() {
@@ -166,10 +169,11 @@ func (h *HTTP1BufferAnalyzer) handleResponse(events *list.List, element *list.El
}
// if response sequence is broken, then clean the context
- if !h.respFinished && (h.respDataID != curEvent.DataID || h.respMaxSequence+1 != curEvent.Sequence) {
+ if h.respDataID != curEvent.DataID || h.respMaxSequence+1 != curEvent.Sequence {
h.cleanContext()
return nil, nil
}
+ h.respDataID = curEvent.DataID
h.respMaxSequence = curEvent.Sequence
if h.reqFinished && curEvent.IsFinished() {
@@ -178,14 +182,14 @@ func (h *HTTP1BufferAnalyzer) handleResponse(events *list.List, element *list.El
return nil, nil
}
-func (h *HTTP1BufferAnalyzer) resetStartUnknown(element *list.Element, curEvent *SocketDataUploadEvent) {
+func (h *BufferAnalyzer) resetStartUnknown(element *list.Element, curEvent *base2.SocketDataUploadEvent) {
h.unknownEventBuffer = curEvent
h.unknownElement = element
h.unknownDataID = curEvent.DataID
h.unknownMaxSequence = curEvent.Sequence
}
-func (h *HTTP1BufferAnalyzer) resetStartRequest(element *list.Element, curEvent *SocketDataUploadEvent) {
+func (h *BufferAnalyzer) resetStartRequest(element *list.Element, curEvent *base2.SocketDataUploadEvent) {
h.request = curEvent
h.reqDataID = curEvent.DataID
h.reqMaxSequence = curEvent.Sequence
@@ -193,7 +197,7 @@ func (h *HTTP1BufferAnalyzer) resetStartRequest(element *list.Element, curEvent
h.requestElement = element
}
-func (h *HTTP1BufferAnalyzer) resetStartResponse(element *list.Element, curEvent *SocketDataUploadEvent) {
+func (h *BufferAnalyzer) resetStartResponse(element *list.Element, curEvent *base2.SocketDataUploadEvent) {
h.response = curEvent
h.respDataID = curEvent.DataID
h.respMaxSequence = curEvent.Sequence
@@ -201,7 +205,7 @@ func (h *HTTP1BufferAnalyzer) resetStartResponse(element *list.Element, curEvent
h.respFinished = curEvent.IsFinished()
}
-func (h *HTTP1BufferAnalyzer) tryingToAnalyzeTheUnknown(events *list.List, curEvent *SocketDataUploadEvent) (req, resp SocketDataBuffer) {
+func (h *BufferAnalyzer) tryingToAnalyzeTheUnknown(events *list.List, curEvent *base2.SocketDataUploadEvent) (req, resp base2.SocketDataBuffer) {
if h.unknownEventBuffer == nil {
return nil, nil
}
@@ -237,13 +241,13 @@ func (h *HTTP1BufferAnalyzer) tryingToAnalyzeTheUnknown(events *list.List, curEv
return nil, nil
}
-func (h *HTTP1BufferAnalyzer) transformUnknown(element *list.Element, msgType base.SocketMessageType) {
+func (h *BufferAnalyzer) transformUnknown(element *list.Element, msgType base.SocketMessageType) {
// update message type and total size
- firstEvent := element.Value.(*SocketDataUploadEvent)
+ firstEvent := element.Value.(*base2.SocketDataUploadEvent)
firstEvent.MsgType = msgType
dataLen := int(firstEvent.DataLen)
for e := element.Next(); e != nil; e = e.Next() {
- curEvent := e.Value.(*SocketDataUploadEvent)
+ curEvent := e.Value.(*base2.SocketDataUploadEvent)
if curEvent.Finished == 1 {
curEvent.MsgType = msgType
dataLen += int(curEvent.DataLen)
@@ -255,32 +259,32 @@ func (h *HTTP1BufferAnalyzer) transformUnknown(element *list.Element, msgType ba
}
}
-func (h *HTTP1BufferAnalyzer) cleanContext() {
+func (h *BufferAnalyzer) cleanContext() {
h.cleanUnknownContext()
h.cleanRequestContext()
h.cleanResponseContext()
}
-func (h *HTTP1BufferAnalyzer) cleanResponseContext() {
+func (h *BufferAnalyzer) cleanResponseContext() {
h.response = nil
h.respDataID = 0
h.respMaxSequence = 0
h.respFinished = false
}
-func (h *HTTP1BufferAnalyzer) cleanRequestContext() {
+func (h *BufferAnalyzer) cleanRequestContext() {
h.request = nil
h.reqDataID = 0
h.reqMaxSequence = 0
h.reqFinished = false
}
-func (h *HTTP1BufferAnalyzer) cleanUnknownContext() {
+func (h *BufferAnalyzer) cleanUnknownContext() {
h.unknownEventBuffer, h.unknownElement = nil, nil
h.unknownSize, h.unknownDataID, h.unknownMaxSequence = 0, 0, 0
}
-func (h *HTTP1BufferAnalyzer) buildHTTP(events *list.List) (req, resp SocketDataBuffer) {
+func (h *BufferAnalyzer) buildHTTP(events *list.List) (req, resp base2.SocketDataBuffer) {
return h.http1Analyzer.combineAndRemoveEvent(events, h.requestElement, nil),
h.http1Analyzer.combineAndRemoveEvent(events, h.responseElement, nil)
}
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/http1/metrics.go b/pkg/profiling/task/network/analyze/layer7/protocols/http1/metrics.go
new file mode 100644
index 0000000..cc3014f
--- /dev/null
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/http1/metrics.go
@@ -0,0 +1,390 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package http1
+
+import (
+ "bufio"
+ "bytes"
+ "compress/gzip"
+ "encoding/json"
+ "fmt"
+ "io"
+ "mime"
+ "net/http"
+ "strings"
+ "time"
+
+ "golang.org/x/net/html/charset"
+
+ "github.com/apache/skywalking-rover/pkg/process/api"
+ "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
+ protocol "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/base"
+ "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/metrics"
+ "github.com/apache/skywalking-rover/pkg/tools"
+ "github.com/apache/skywalking-rover/pkg/tools/host"
+
+ "github.com/docker/go-units"
+
+ commonv3 "skywalking.apache.org/repo/goapi/collect/common/v3"
+ v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+ logv3 "skywalking.apache.org/repo/goapi/collect/logging/v3"
+)
+
+var (
+ transportRequest = "Request"
+ transportResponse = "Response"
+)
+
+type URIMetrics struct {
+ RequestCounter *metrics.Counter
+ StatusCounter map[int]*metrics.Counter
+
+ AvgRequestPackageSize *metrics.AvgCounter
+ AvgResponsePackageSize *metrics.AvgCounter
+ ReqPackageSizeHistogram *metrics.Histogram
+ RespPackageSizeHistogram *metrics.Histogram
+
+ avgDuration *metrics.AvgCounter
+ durationHistogram *metrics.Histogram
+
+ slowTraces *metrics.TopN
+}
+
+func NewHTTP1URIMetrics() *URIMetrics {
+ return &URIMetrics{
+ RequestCounter: metrics.NewCounter(),
+ StatusCounter: make(map[int]*metrics.Counter),
+ AvgRequestPackageSize: metrics.NewAvgCounter(),
+ AvgResponsePackageSize: metrics.NewAvgCounter(),
+ ReqPackageSizeHistogram: metrics.NewHistogram(PackageSizeHistogramBuckets),
+ RespPackageSizeHistogram: metrics.NewHistogram(PackageSizeHistogramBuckets),
+ avgDuration: metrics.NewAvgCounter(),
+ durationHistogram: metrics.NewHistogram(DurationHistogramBuckets),
+ slowTraces: metrics.NewTopN(SlowTraceTopNSize),
+ }
+}
+
+func (u *URIMetrics) Append(req *http.Request, reqBuffer protocol.SocketDataBuffer, resp *http.Response, respBuffer protocol.SocketDataBuffer) {
+ u.RequestCounter.Increase()
+ statusCounter := u.StatusCounter[resp.StatusCode]
+ if statusCounter == nil {
+ statusCounter = metrics.NewCounter()
+ u.StatusCounter[resp.StatusCode] = statusCounter
+ }
+ statusCounter.Increase()
+
+ u.AvgRequestPackageSize.Increase(float64(reqBuffer.TotalSize()))
+ u.AvgResponsePackageSize.Increase(float64(respBuffer.TotalSize()))
+ u.ReqPackageSizeHistogram.Increase(float64(reqBuffer.TotalSize()))
+ u.RespPackageSizeHistogram.Increase(float64(respBuffer.TotalSize()))
+
+ duration := time.Duration(respBuffer.EndTime() - reqBuffer.StartTime())
+ durationInMS := float64(duration.Milliseconds())
+ u.avgDuration.Increase(durationInMS)
+ u.durationHistogram.Increase(durationInMS)
+
+ u.increaseSlowTraceTopN(u.slowTraces, duration, req, resp, reqBuffer, respBuffer)
+}
+
+func (u *URIMetrics) appendMetrics(traffic *base.ProcessTraffic,
+ local api.ProcessInterface, url string, metricsBuilder *base.MetricsBuilder, durationOnly bool) int {
+ collections := make([]*v3.MeterData, 0)
+ role, labels := metricsBuilder.BuildBasicMeterLabels(traffic, local)
+ prefix := metricsBuilder.MetricPrefix()
+
+ collections = u.buildMetrics(collections, prefix, fmt.Sprintf("%s_duration_avg", role.String()), labels, url,
+ traffic, u.avgDuration)
+ collections = u.buildMetrics(collections, prefix, fmt.Sprintf("%s_duration_histogram", role.String()), labels, url,
+ traffic, u.durationHistogram)
+ if durationOnly {
+ return len(collections)
+ }
+
+ collections = u.buildMetrics(collections, prefix, "request_counter", labels, url, traffic, u.RequestCounter)
+ for status, counter := range u.StatusCounter {
+ statusLabels := append(labels, &v3.Label{Name: "code", Value: fmt.Sprintf("%d", status)})
+ collections = u.buildMetrics(collections, prefix, "response_status_counter", statusLabels, url, traffic, counter)
+ }
+
+ collections = u.buildMetrics(collections, prefix, "request_package_size_avg", labels, url, traffic, u.AvgRequestPackageSize)
+ collections = u.buildMetrics(collections, prefix, "response_package_size_avg", labels, url, traffic, u.AvgResponsePackageSize)
+ collections = u.buildMetrics(collections, prefix, "request_package_size_histogram", labels, url, traffic, u.ReqPackageSizeHistogram)
+ collections = u.buildMetrics(collections, prefix, "response_package_size_histogram", labels, url, traffic, u.RespPackageSizeHistogram)
+
+ metricsBuilder.AppendMetrics(local.Entity().ServiceName, local.Entity().InstanceName, collections)
+ logsCount := u.slowTraces.AppendData(local, traffic, metricsBuilder)
+ return len(collections) + logsCount
+}
+
+func (u *URIMetrics) buildMetrics(collection []*v3.MeterData, prefix, name string, basicLabels []*v3.Label,
+ url string, _ *base.ProcessTraffic, data metrics.Metrics) []*v3.MeterData {
+ // if remote process is also profiling, then needs to be calculated half of metrics
+ labels := basicLabels
+ var meterName string
+ if url != "" {
+ labels = append(labels, &v3.Label{Name: "url", Value: url})
+ meterName = fmt.Sprintf("%shttp1_%s_%s", prefix, "url", name)
+ } else {
+ meterName = fmt.Sprintf("%shttp1_%s", prefix, name)
+ }
+ return data.AppendMeter(collection, meterName, labels)
+}
+
+func (u *URIMetrics) MergeAndClean(other *URIMetrics) {
+ u.RequestCounter.MergeAndClean(other.RequestCounter)
+ for k, v := range other.StatusCounter {
+ cur := u.StatusCounter[k]
+ if cur == nil {
+ cur = metrics.NewCounter()
+ u.StatusCounter[k] = cur
+ }
+ cur.MergeAndClean(v)
+ }
+
+ u.AvgRequestPackageSize.MergeAndClean(other.AvgRequestPackageSize)
+ u.AvgResponsePackageSize.MergeAndClean(other.AvgResponsePackageSize)
+ u.ReqPackageSizeHistogram.MergeAndClean(other.ReqPackageSizeHistogram)
+ u.RespPackageSizeHistogram.MergeAndClean(other.RespPackageSizeHistogram)
+ u.avgDuration.MergeAndClean(other.avgDuration)
+ u.durationHistogram.MergeAndClean(other.durationHistogram)
+ u.slowTraces.MergeAndClean(other.slowTraces)
+}
+
+func (u *URIMetrics) String() string {
+ return fmt.Sprintf("request count: %d, avg request size: %f, avg response size: %f, avg duration: %f, slow trace count: %d, response counters: %v",
+ u.RequestCounter.Get(), u.AvgRequestPackageSize.Calculate(), u.AvgResponsePackageSize.Calculate(),
+ u.avgDuration.Calculate(), u.slowTraces.List.Len(), u.StatusCounter)
+}
+
+func (u *URIMetrics) increaseSlowTraceTopN(slowTraceTopN *metrics.TopN, duration time.Duration,
+ request *http.Request, response *http.Response, reqBuffer, respBuffer protocol.SocketDataBuffer) {
+ tracingContext, err := protocol.AnalyzeTracingContext(func(key string) string {
+ return request.Header.Get(key)
+ })
+ if err != nil {
+ log.Warnf("analyze tracing context error: %v", err)
+ return
+ }
+ if tracingContext == nil {
+ return
+ }
+
+ // remove the query parameters
+ uri := request.RequestURI
+ if i := strings.Index(uri, "?"); i > 0 {
+ uri = uri[0:i]
+ }
+ trace := &Trace{Trace: tracingContext, RequestURI: uri, RequestBuffer: reqBuffer, ResponseBuffer: respBuffer, Request: request, Response: response}
+ slowTraceTopN.AddRecord(trace, duration.Milliseconds())
+}
+
+type Trace struct {
+ Trace protocol.TracingContext
+ RequestURI string
+ RequestBuffer protocol.SocketDataBuffer
+ Request *http.Request
+ ResponseBuffer protocol.SocketDataBuffer
+ Response *http.Response
+}
+
+func (h *Trace) Flush(duration int64, process api.ProcessInterface, traffic *base.ProcessTraffic, metricsBuilder *base.MetricsBuilder) {
+ logData := &logv3.LogData{}
+ logData.Service = process.Entity().ServiceName
+ logData.ServiceInstance = process.Entity().InstanceName
+ logData.Layer = process.Entity().Layer
+
+ logData.Tags = &logv3.LogTags{Data: make([]*commonv3.KeyStringValuePair, 0)}
+ logData.Tags.Data = append(logData.Tags.Data, &commonv3.KeyStringValuePair{Key: "LOG_KIND", Value: "NET_PROFILING_SAMPLED_TRACE"})
+
+ // trace context
+ traceContext := &logv3.TraceContext{}
+ traceContext.TraceId = h.Trace.TraceID()
+ logData.TraceContext = traceContext
+
+ // body
+ logBody := &logv3.LogDataBody{Type: "json"}
+ body := &SlowTraceLogBody{
+ Latency: duration,
+ TraceProvider: h.Trace.Provider().Name,
+ DetectPoint: traffic.Role.String(),
+ Component: traffic.Protocol.String(),
+ SSL: traffic.IsSSL,
+ URI: h.RequestURI,
+ Reason: "slow",
+ }
+ if traffic.Role == base.ConnectionRoleClient {
+ body.ClientProcess = &SlowTraceLogProcess{ProcessID: process.ID()}
+ body.ServerProcess = NewHTTP1SlowTRaceLogRemoteProcess(traffic, process)
+ } else {
+ body.ServerProcess = &SlowTraceLogProcess{ProcessID: process.ID()}
+ body.ClientProcess = NewHTTP1SlowTRaceLogRemoteProcess(traffic, process)
+ }
+ bodyJSON, err := json.Marshal(body)
+ if err != nil {
+ log.Warnf("format the slow trace log body failure: %v", err)
+ return
+ }
+ logBody.Content = &logv3.LogDataBody_Json{Json: &logv3.JSONLog{Json: string(bodyJSON)}}
+ logData.Body = logBody
+
+ metricsBuilder.AppendLogs(process.Entity().ServiceName, logData)
+
+ // append full http content
+ h.AppendHTTPEvents(process, traffic, metricsBuilder)
+}
+
+func (h *Trace) AppendHTTPEvents(process api.ProcessInterface, traffic *base.ProcessTraffic, metricsBuilder *base.MetricsBuilder) {
+ events := make([]*v3.SpanAttachedEvent, 0)
+ events = h.appendHTTPEvent(events, process, traffic, transportRequest, h.Request.Header, h.Request.Body, h.RequestBuffer)
+ events = h.appendHTTPEvent(events, process, traffic, transportResponse, h.Response.Header, h.Response.Body, h.ResponseBuffer)
+
+ metricsBuilder.AppendSpanAttachedEvents(events)
+}
+
+func (h *Trace) appendHTTPEvent(events []*v3.SpanAttachedEvent, process api.ProcessInterface, traffic *base.ProcessTraffic,
+ tp string, header http.Header, body io.Reader, buffer protocol.SocketDataBuffer) []*v3.SpanAttachedEvent {
+ content, err := h.transformHTTPRequest(header, body, buffer)
+ if err != nil {
+ log.Warnf("transform http %s erorr: %v", tp, err)
+ return events
+ }
+
+ event := &v3.SpanAttachedEvent{}
+ event.StartTime = host.TimeToInstant(buffer.StartTime())
+ event.EndTime = host.TimeToInstant(buffer.EndTime())
+ event.Event = fmt.Sprintf("HTTP %s Sampling", tp)
+ event.Tags = make([]*commonv3.KeyStringValuePair, 0)
+ event.Tags = append(event.Tags,
+ // content data
+ &commonv3.KeyStringValuePair{Key: "data size", Value: units.BytesSize(float64(buffer.TotalSize()))},
+ &commonv3.KeyStringValuePair{Key: "data content", Value: content},
+ &commonv3.KeyStringValuePair{Key: "data direction", Value: buffer.Direction().String()},
+ &commonv3.KeyStringValuePair{Key: "data type", Value: tp},
+ // connection
+ &commonv3.KeyStringValuePair{Key: "connection role", Value: traffic.Role.String()},
+ // entity
+ &commonv3.KeyStringValuePair{Key: "service name", Value: process.Entity().ServiceName},
+ &commonv3.KeyStringValuePair{Key: "service instance name", Value: process.Entity().InstanceName},
+ &commonv3.KeyStringValuePair{Key: "process name", Value: process.Entity().ProcessName},
+ )
+
+ // current event needs bind to the upstream
+ if buffer.Direction() == base.SocketDataDirectionIngress && tp == transportRequest ||
+ buffer.Direction() == base.SocketDataDirectionEgress && tp == transportResponse {
+ event.Tags = append(event.Tags, &commonv3.KeyStringValuePair{Key: "bind to upstream span", Value: "true"})
+ }
+ event.Summary = make([]*commonv3.KeyIntValuePair, 0)
+ event.TraceContext = &v3.SpanAttachedEvent_SpanReference{
+ TraceId: h.Trace.TraceID(),
+ TraceSegmentId: h.Trace.TraceSegmentID(),
+ SpanId: h.Trace.SpanID(),
+ Type: h.Trace.Provider().Type,
+ }
+ return append(events, event)
+}
+
+// nolint
+func (h *Trace) transformHTTPRequest(header http.Header, body io.Reader, buffer protocol.SocketDataBuffer) (string, error) {
+ var needGzip, isPlain, isUtf8 = header.Get("Content-Encoding") == "gzip", true, true
+ contentType := header.Get("Content-Type")
+ if contentType != "" {
+ isPlain = strings.HasPrefix(contentType, "text/") || contentType == "application/json"
+ if _, params, err := mime.ParseMediaType(contentType); err == nil {
+ if cs, ok := params["charset"]; ok {
+ isUtf8 = cs == "utf-8"
+ }
+ }
+ }
+
+ if !needGzip && isPlain && isUtf8 {
+ return string(buffer.BufferData()), nil
+ }
+
+ // re-read the buffer and skip to the body position
+ buf := bufio.NewReaderSize(bytes.NewBuffer(buffer.BufferData()), len(buffer.BufferData()))
+ response, err := http.ReadResponse(buf, nil)
+ if err != nil {
+ return "", err
+ }
+ defer response.Body.Close()
+
+ // no text plain, no need to print the data
+ headerString := string(buffer.BufferData()[:len(buffer.BufferData())-buf.Buffered()])
+ if !isPlain {
+ return fmt.Sprintf("%s[not plain, current content type: %s]", headerString, contentType), nil
+ }
+
+ data := body
+ if needGzip {
+ data, err = gzip.NewReader(response.Body)
+ if err != nil {
+ return "", err
+ }
+ }
+ if !isUtf8 {
+ data, err = charset.NewReader(data, contentType)
+ if err != nil {
+ return "", err
+ }
+ }
+
+ realData, err := io.ReadAll(data)
+ if err != nil {
+ if err != io.ErrUnexpectedEOF {
+ return "", err
+ }
+ realData = append(realData, []byte("[chunked]")...)
+ }
+ return fmt.Sprintf("%s%s", headerString, string(realData)), nil
+}
+
+type SlowTraceLogBody struct {
+ URI string `json:"uri"`
+ Reason string `json:"reason"`
+ Latency int64 `json:"latency"`
+ TraceProvider string `json:"trace_provider"`
+ ClientProcess *SlowTraceLogProcess `json:"client_process"`
+ ServerProcess *SlowTraceLogProcess `json:"server_process"`
+ DetectPoint string `json:"detect_point"`
+ Component string `json:"component"`
+ SSL bool `json:"ssl"`
+}
+
+type SlowTraceLogProcess struct {
+ ProcessID string `json:"process_id"`
+ Local bool `json:"local"`
+ Address string `json:"address"`
+}
+
+func NewHTTP1SlowTRaceLogRemoteProcess(traffic *base.ProcessTraffic, local api.ProcessInterface) *SlowTraceLogProcess {
+ if len(traffic.RemoteProcesses) != 0 {
+ for _, p := range traffic.RemoteProcesses {
+ // only match with same service instance
+ if local.Entity().ServiceName == p.Entity().ServiceName &&
+ local.Entity().InstanceName == p.Entity().InstanceName {
+ return &SlowTraceLogProcess{ProcessID: p.ID()}
+ }
+ }
+ }
+
+ if tools.IsLocalHostAddress(traffic.RemoteIP) || traffic.Analyzer.IsLocalAddressInCache(traffic.RemoteIP) {
+ return &SlowTraceLogProcess{Local: true}
+ }
+
+ return &SlowTraceLogProcess{Address: fmt.Sprintf("%s:%d", traffic.RemoteIP, traffic.RemotePort)}
+}
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/metrics/counter.go b/pkg/profiling/task/network/analyze/layer7/protocols/metrics/counter.go
index 23d193e..2a01e66 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/metrics/counter.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/metrics/counter.go
@@ -17,7 +17,11 @@
package metrics
-import v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+import (
+ "fmt"
+
+ v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+)
type Counter struct {
Value int
@@ -29,6 +33,10 @@ func NewCounter() *Counter {
}
}
+func (c *Counter) String() string {
+ return fmt.Sprintf("%d", c.Value)
+}
+
func (c *Counter) Increase() {
c.Value++
}
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/protocols.go b/pkg/profiling/task/network/analyze/layer7/protocols/protocols.go
index 0fb1487..22735fc 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/protocols.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/protocols.go
@@ -20,51 +20,33 @@ package protocols
import (
"github.com/apache/skywalking-rover/pkg/logger"
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
+ protocol "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/base"
+ "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/http1"
)
var log = logger.GetLogger("profiling", "task", "network", "layer7", "protocols")
-var ListenerName = "layer7"
-
-var registerProtocols []func() Protocol
-var defaultInstances []Protocol
+var registerProtocols []func() protocol.Protocol
+var defaultInstances []protocol.Protocol
func init() {
// register all protocol analyzers
- registerProtocols = make([]func() Protocol, 0)
- registerProtocols = append(registerProtocols, NewHTTP1Analyzer)
+ registerProtocols = make([]func() protocol.Protocol, 0)
+ registerProtocols = append(registerProtocols, http1.NewHTTP1Analyzer)
- defaultInstances = make([]Protocol, 0)
+ defaultInstances = make([]protocol.Protocol, 0)
for _, p := range registerProtocols {
defaultInstances = append(defaultInstances, p())
}
}
-type Protocol interface {
- Name() string
- GenerateMetrics() Metrics
-
- ReceiveData(context Context, event *SocketDataUploadEvent) bool
-}
-
-type Context interface {
- QueryConnection(connectionID, randomID uint64) *base.ConnectionContext
-}
-
-type Metrics interface {
- base.ConnectionMetrics
-
- // FlushMetrics flush all metrics from traffic to the metricsBuilder
- FlushMetrics(traffic *base.ProcessTraffic, metricsBuilder *base.MetricsBuilder)
-}
-
type Analyzer struct {
- ctx Context
- protocols []Protocol
+ ctx protocol.Context
+ protocols []protocol.Protocol
}
-func NewAnalyzer(ctx Context) *Analyzer {
- protocols := make([]Protocol, 0)
+func NewAnalyzer(ctx protocol.Context) *Analyzer {
+ protocols := make([]protocol.Protocol, 0)
for _, r := range registerProtocols {
protocols = append(protocols, r())
}
@@ -74,7 +56,7 @@ func NewAnalyzer(ctx Context) *Analyzer {
}
}
-func (a *Analyzer) ReceiveSocketDataEvent(event *SocketDataUploadEvent) {
+func (a *Analyzer) ReceiveSocketDataEvent(event *protocol.SocketDataUploadEvent) {
for _, p := range a.protocols {
if p.ReceiveData(a.ctx, event) {
return
@@ -85,24 +67,25 @@ func (a *Analyzer) ReceiveSocketDataEvent(event *SocketDataUploadEvent) {
}
type ProtocolMetrics struct {
- data map[string]Metrics
+ data map[string]protocol.Metrics
}
func NewProtocolMetrics() *ProtocolMetrics {
- metrics := make(map[string]Metrics)
+ metrics := make(map[string]protocol.Metrics)
for _, p := range defaultInstances {
metrics[p.Name()] = p.GenerateMetrics()
}
return &ProtocolMetrics{data: metrics}
}
-func (m *ProtocolMetrics) GetProtocolMetrics(name string) Metrics {
+func (m *ProtocolMetrics) GetProtocolMetrics(name string) protocol.Metrics {
return m.data[name]
}
-func (m *ProtocolMetrics) MergeMetricsFromConnection(connection *base.ConnectionContext) {
- for _, d := range m.data {
- d.MergeMetricsFromConnection(connection)
+func (m *ProtocolMetrics) MergeMetricsFromConnection(connection *base.ConnectionContext, data base.ConnectionMetrics) {
+ otherMetrics := data.(*ProtocolMetrics)
+ for p, d := range m.data {
+ d.MergeMetricsFromConnection(connection, otherMetrics.GetProtocolMetrics(p))
}
}
@@ -111,8 +94,3 @@ func (m *ProtocolMetrics) FlushMetrics(traffic *base.ProcessTraffic, metricsBuil
d.FlushMetrics(traffic, metricsBuilder)
}
}
-
-func QueryProtocolMetrics(metricsContext *base.ConnectionMetricsContext, protocolName string) Metrics {
- metrics := metricsContext.GetMetrics(ListenerName).(*ProtocolMetrics)
- return metrics.GetProtocolMetrics(protocolName)
-}
diff --git a/pkg/profiling/task/network/runner.go b/pkg/profiling/task/network/runner.go
index ac4dbe4..6b8063d 100644
--- a/pkg/profiling/task/network/runner.go
+++ b/pkg/profiling/task/network/runner.go
@@ -51,6 +51,7 @@ type Runner struct {
stopOnce sync.Once
meterClient v3.MeterReportServiceClient
logClient logv3.LogReportServiceClient
+ eventClient v3.SpanAttachedEventReportServiceClient
reportInterval time.Duration
meterPrefix string
@@ -211,7 +212,15 @@ func (r *Runner) flushData() error {
if count, err1 := r.flushLogs(metricsBuilder); err1 != nil {
err = multierror.Append(err, err1)
} else if count > 0 {
- log.Infof("total send network topology logs data: %d", count)
+ log.Infof("total send network logs data: %d", count)
+ }
+
+ eventCount, eventError := r.flushEvents(metricsBuilder)
+ if eventError != nil {
+ err = multierror.Append(err, eventError)
+ }
+ if eventCount > 0 {
+ log.Infof("total send network events data: %d", eventCount)
}
return err
}
@@ -272,6 +281,33 @@ func (r *Runner) flushLogs(builder *analyzeBase.MetricsBuilder) (int, error) {
return count, nil
}
+func (r *Runner) flushEvents(builder *analyzeBase.MetricsBuilder) (int, error) {
+ events := builder.BuildEvents()
+ if len(events) == 0 {
+ return 0, nil
+ }
+
+ collector, err := r.eventClient.Collect(r.ctx)
+ if err != nil {
+ return 0, err
+ }
+ defer func() {
+ if _, e := collector.CloseAndRecv(); e != nil {
+ log.Warnf("close the event stream error: %v", e)
+ }
+ }()
+ count := 0
+ var sendErrors error
+ for _, m := range events {
+ if err := collector.Send(m); err != nil {
+ sendErrors = multierror.Append(fmt.Errorf("send error, traceid: %s, event: %s, reason: %v", m.TraceContext.TraceId, m.Event, err))
+ } else {
+ count++
+ }
+ }
+ return count, sendErrors
+}
+
func (r *Runner) Stop() error {
// if starting, then need to wait start finished
r.startLock.Lock()
@@ -301,6 +337,7 @@ func (r *Runner) init0(config *base.TaskConfig, moduleMgr *module.Manager) error
connection := coreOperator.BackendOperator().GetConnection()
r.meterClient = v3.NewMeterReportServiceClient(connection)
r.logClient = logv3.NewLogReportServiceClient(connection)
+ r.eventClient = v3.NewSpanAttachedEventReportServiceClient(connection)
reportInterval, err := time.ParseDuration(config.Network.ReportInterval)
if err != nil {
diff --git a/pkg/tools/host/time.go b/pkg/tools/host/time.go
index 1d03bcf..b1ef253 100644
--- a/pkg/tools/host/time.go
+++ b/pkg/tools/host/time.go
@@ -21,6 +21,8 @@ import (
"fmt"
"time"
+ v3 "skywalking.apache.org/repo/goapi/collect/common/v3"
+
"github.com/shirou/gopsutil/host"
)
@@ -34,3 +36,12 @@ func init() {
}
BootTime = time.Unix(int64(boot), 0)
}
+
+func TimeToInstant(bpfTime uint64) *v3.Instant {
+ timeCopy := time.Unix(BootTime.Unix(), int64(BootTime.Nanosecond()))
+ result := timeCopy.Add(time.Duration(bpfTime))
+ return &v3.Instant{
+ Seconds: result.Unix(),
+ Nanos: int32(result.Nanosecond()),
+ }
+}
diff --git a/test/e2e/base/env b/test/e2e/base/env
index e414f00..dab2c61 100644
--- a/test/e2e/base/env
+++ b/test/e2e/base/env
@@ -13,8 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-SW_CTL_COMMIT=521843f963917aa806740a9ad09c65aa59aca179
-SW_OAP_COMMIT=4adc05f89c5506541f6c0817db90238c96e1b5d3
+SW_CTL_COMMIT=651d196ee9345b164cc35fc1dbdcaf058920226a
+SW_OAP_COMMIT=a386853bc9ef6221c8d6d1688b607e1d230f5ec4
SW_KUBERNETES_COMMIT_SHA=0f3ec68e5a7e1608cec8688716b848ed15e971e5
SW_AGENT_GO_COMMIT=216f122d942cb683f48578d3014cc5ea83637582
\ No newline at end of file
diff --git a/test/e2e/cases/profiling/network/expected/skywalking-trace.yml b/test/e2e/cases/profiling/network/expected/skywalking-trace.yml
index ce30c3a..0ee677b 100644
--- a/test/e2e/cases/profiling/network/expected/skywalking-trace.yml
+++ b/test/e2e/cases/profiling/network/expected/skywalking-trace.yml
@@ -14,23 +14,38 @@
# limitations under the License.
spans:
- {{- contains .spans}}
- - traceid: {{ notEmpty .traceid }}
- segmentid: {{ notEmpty .segmentid }}
- spanid: 0
- parentspanid: -1
- refs: []
- servicecode: example
- serviceinstancename: {{ notEmpty .serviceinstancename }}
- starttime: {{ gt .starttime 0 }}
- endtime: {{ gt .endtime 0 }}
- endpointname: /provider
- type: Exit
- peer: https://proxy/provider
- component: Unknown
- iserror: false
- layer: Unknown
- tags: []
- logs: []
- attachedevents: []
- {{- end }}
\ No newline at end of file
+{{- contains .spans}}
+- traceid: {{ notEmpty .traceid }}
+ segmentid: {{ notEmpty .segmentid }}
+ spanid: 0
+ parentspanid: -1
+ refs: []
+ servicecode: example
+ serviceinstancename: {{ notEmpty .serviceinstancename }}
+ starttime: {{ gt .starttime 0 }}
+ endtime: {{ gt .endtime 0 }}
+ endpointname: /provider
+ type: Exit
+ peer: https://proxy/provider
+ component: Unknown
+ iserror: false
+ layer: Unknown
+ tags: []
+ logs: []
+ attachedevents:
+ {{- contains .attachedevents}}
+ - starttime:
+ seconds: {{ gt .starttime.seconds 0 }}
+ nanos: {{ ge .starttime.nanos 0 }}
+ event: HTTP Request Sampling
+ endtime:
+ seconds: {{ gt .endtime.seconds 0 }}
+ nanos: {{ ge .endtime.nanos 0 }}
+ summary: []
+ tags:
+ {{- contains .tags }}
+ - key: data size
+ value: {{ notEmpty .value }}
+ {{- end }}
+ {{- end }}
+{{- end }}
\ No newline at end of file
diff --git a/test/e2e/cases/profiling/network/http1-slow-traces-cases.yaml b/test/e2e/cases/profiling/network/http1-slow-traces-cases.yaml
index 06ba743..6f0e7ca 100644
--- a/test/e2e/cases/profiling/network/http1-slow-traces-cases.yaml
+++ b/test/e2e/cases/profiling/network/http1-slow-traces-cases.yaml
@@ -16,7 +16,7 @@
# HTTP1 verify
cases:
- query: |
- curl https://${service_host}:${service_10443}/consumer-zipkin > /dev/null;
+ curl -s https://${service_host}:${service_10443}/consumer-zipkin > /dev/null;
sleep 5;
swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql records list \
--name=sampled_slow_trace_record --service-name service --instance-name test --process-name service \
@@ -24,15 +24,17 @@ cases:
expected: expected/slow-traces.yml
# zipkin trace
- query: |
+ curl -s https://${service_host}:${service_10443}/consumer-zipkin > /dev/null;
+ sleep 3;
traceid=$(swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql records list \
--name=sampled_slow_trace_record --service-name service --instance-name test --process-name service \
- --dest-service-name service --dest-instance-name test --dest-process-name UNKNOWN_REMOTE | yq e '. | map(select(.name == "zipkin-/provider-zipkin")).[0].id' -);
+ --dest-service-name service --dest-instance-name test --dest-process-name UNKNOWN_REMOTE 100 | yq e '. | map(select(.name == "zipkin-/provider-zipkin")).[0].id' -);
curl http://${oap_host}:${oap_9412}/zipkin/api/v2/trace/${traceid} | yq e -| yq e 'del(.[].tags)' -
expected: expected/zipkin-trace.yml
# skywalking trace
- query: |
traceid=$(swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql records list \
--name=sampled_slow_trace_record --service-name service --instance-name test --process-name service \
- --dest-service-name service --dest-instance-name test --dest-process-name UNKNOWN_REMOTE | yq e '. | map(select(.name == "skywalking-/provider")).[0].id' -);
+ --dest-service-name service --dest-instance-name test --dest-process-name UNKNOWN_REMOTE 100 | yq e '. | map(select(.name == "skywalking-/provider")).[0].id' -);
swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql trace $traceid
expected: expected/skywalking-trace.yml
\ No newline at end of file