You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@skywalking.apache.org by li...@apache.org on 2022/11/17 12:04:07 UTC

[skywalking-rover] branch main updated: Support upload the full HTTP content to span attached event (#58)

This is an automated email from the ASF dual-hosted git repository.

liuhan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git


The following commit(s) were added to refs/heads/main by this push:
     new 026b294  Support upload the full HTTP content to span attached event (#58)
026b294 is described below

commit 026b294653c85d73d806617b45f3f7f2221049ef
Author: mrproliu <74...@qq.com>
AuthorDate: Thu Nov 17 20:04:01 2022 +0800

    Support upload the full HTTP content to span attached event (#58)
---
 bpf/profiling/network/netmonitor.c                 |  10 +-
 bpf/profiling/network/sock_stats.h                 |   3 +-
 docs/en/setup/configuration/profiling.md           |   8 +-
 go.mod                                             |  10 +-
 go.sum                                             |  20 +-
 pkg/profiling/task/network/analyze/base/metrics.go |  16 +-
 .../task/network/analyze/layer4/metrics.go         |   4 +-
 .../task/network/analyze/layer7/events.go          |   9 +-
 .../task/network/analyze/layer7/listener.go        |  10 +-
 .../analyze/layer7/protocols/{ => base}/events.go  |  26 +-
 .../analyze/layer7/protocols/base/protocol.go}     |  31 +-
 .../analyze/layer7/protocols/{ => base}/tracing.go |  57 +-
 .../task/network/analyze/layer7/protocols/http1.go | 598 ---------------------
 .../analyze/layer7/protocols/http1/analyzer.go     | 364 +++++++++++++
 .../{http1_test.go => http1/analyzer_test.go}      |  12 +-
 .../{http1_builder.go => http1/builder.go}         |  60 ++-
 .../analyze/layer7/protocols/http1/metrics.go      | 390 ++++++++++++++
 .../analyze/layer7/protocols/metrics/counter.go    |  10 +-
 .../network/analyze/layer7/protocols/protocols.go  |  60 +--
 pkg/profiling/task/network/runner.go               |  39 +-
 pkg/tools/host/time.go                             |  11 +
 test/e2e/base/env                                  |   4 +-
 .../network/expected/skywalking-trace.yml          |  55 +-
 .../profiling/network/http1-slow-traces-cases.yaml |   8 +-
 24 files changed, 1052 insertions(+), 763 deletions(-)

diff --git a/bpf/profiling/network/netmonitor.c b/bpf/profiling/network/netmonitor.c
index 3d82888..1a6f922 100644
--- a/bpf/profiling/network/netmonitor.c
+++ b/bpf/profiling/network/netmonitor.c
@@ -354,7 +354,7 @@ static __always_inline void upload_socket_data_iov(void *ctx, struct iovec* iov,
     UPLOAD_PER_SOCKET_DATA_IOV();
 }
 
-static __inline void upload_socket_data(void *ctx, __u64 timestamp, __u64 conid, struct active_connection_t *connection, struct sock_data_args_t *args, ssize_t bytes_count, __u32 existing_msg_type, __u32 data_direction, bool ssl) {
+static __inline void upload_socket_data(void *ctx, __u64 start_time, __u64 end_time, __u64 conid, struct active_connection_t *connection, struct sock_data_args_t *args, ssize_t bytes_count, __u32 existing_msg_type, __u32 data_direction, bool ssl) {
     // generate event
     __u32 kZero = 0;
     struct socket_data_upload_event *event = bpf_map_lookup_elem(&socket_data_upload_event_per_cpu_map, &kZero);
@@ -382,7 +382,8 @@ static __inline void upload_socket_data(void *ctx, __u64 timestamp, __u64 conid,
     }
 
     // basic data
-    event->timestamp = timestamp;
+    event->start_time = start_time;
+    event->end_time = end_time;
     event->protocol = connection->protocol;
     event->msg_type = existing_msg_type;
     event->direction = data_direction;
@@ -463,7 +464,7 @@ static __always_inline void process_write_data(struct pt_regs *ctx, __u64 id, st
     }
 
     // upload the socket data if need
-    upload_socket_data(ctx, curr_nacs, conid, conn, args, bytes_count, msg_type, data_direction, ssl);
+    upload_socket_data(ctx, args->start_nacs, curr_nacs, conid, conn, args, bytes_count, msg_type, data_direction, ssl);
 
     // add statics when is not ssl(native buffer)
     if (ssl == false) {
@@ -971,9 +972,12 @@ SEC("kprobe/recv")
 int sys_recv(struct pt_regs* ctx) {
     ctx = (struct pt_regs *)PT_REGS_PARM1(ctx);
     __u64 id = bpf_get_current_pid_tgid();
+    char* buf;
+    bpf_probe_read(&buf, sizeof(buf), &(PT_REGS_PARM2(ctx)));
 
     struct sock_data_args_t data_args = {};
     data_args.fd = _(PT_REGS_PARM1(ctx));
+    data_args.buf = buf;
     data_args.start_nacs = bpf_ktime_get_ns();
     data_args.data_id = generate_socket_data_id(id, data_args.fd, SOCKET_OPTS_TYPE_RECV);
     bpf_map_update_elem(&socket_data_args, &id, &data_args, 0);
diff --git a/bpf/profiling/network/sock_stats.h b/bpf/profiling/network/sock_stats.h
index f31e859..8569036 100644
--- a/bpf/profiling/network/sock_stats.h
+++ b/bpf/profiling/network/sock_stats.h
@@ -263,7 +263,8 @@ struct socket_data_upload_event {
     __u8 finished;
     __u16 sequence;
     __u16 data_len;
-    __u64 timestamp;
+    __u64 start_time;
+    __u64 end_time;
     __u64 conid;
     __u64 randomid;
     __u64 data_id;
diff --git a/docs/en/setup/configuration/profiling.md b/docs/en/setup/configuration/profiling.md
index 5811306..7f2c35d 100644
--- a/docs/en/setup/configuration/profiling.md
+++ b/docs/en/setup/configuration/profiling.md
@@ -106,4 +106,10 @@ Based on the above two data types, the following metrics are provided.
 
 | Name         | Type  | Unit        | Description                |
 |--------------|-------|-------------|----------------------------|
-| slow_traces  | TopN  | millisecond | The Top N slow trace(id)s  |
\ No newline at end of file
+| slow_traces  | TopN  | millisecond | The Top N slow trace(id)s  |
+
+##### Span Attached Event
+| Name               | Description                                                                                   |
+|--------------------|-----------------------------------------------------------------------------------------------|
+| http-full-request  | Complete information about the HTTP request; it's only reported when it matches slow traces.  |
+| http-full-response | Complete information about the HTTP response; it's only reported when it matches slow traces. |
diff --git a/go.mod b/go.mod
index 0e8c2de..82974a6 100644
--- a/go.mod
+++ b/go.mod
@@ -16,13 +16,14 @@ require (
 	github.com/spf13/viper v1.10.1
 	github.com/zekroTJA/timedmap v1.4.0
 	golang.org/x/arch v0.0.0-20220722155209-00200b7164a7
-	golang.org/x/sys v0.0.0-20211210111614-af8b64212486
+	golang.org/x/net v0.0.0-20220722155237-a158d28d115b
+	golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab
 	google.golang.org/grpc v1.44.0
 	k8s.io/api v0.23.5
 	k8s.io/apimachinery v0.23.5
 	k8s.io/client-go v0.23.5
 	k8s.io/utils v0.0.0-20211116205334-6203023598ed
-	skywalking.apache.org/repo/goapi v0.0.0-20220518063910-af3e2df60bce
+	skywalking.apache.org/repo/goapi v0.0.0-20221019074310-53ebda305187
 )
 
 require (
@@ -52,14 +53,13 @@ require (
 	github.com/tklauser/go-sysconf v0.3.9 // indirect
 	github.com/tklauser/numcpus v0.3.0 // indirect
 	github.com/yusufpapurcu/wmi v1.2.2 // indirect
-	golang.org/x/net v0.0.0-20211209124913-491a49abca63 // indirect
 	golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8 // indirect
-	golang.org/x/term v0.0.0-20210615171337-6886f2dfbf5b // indirect
+	golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect
 	golang.org/x/text v0.3.7 // indirect
 	golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac // indirect
 	google.golang.org/appengine v1.6.7 // indirect
 	google.golang.org/genproto v0.0.0-20211208223120-3a66f561d7aa // indirect
-	google.golang.org/protobuf v1.28.0 // indirect
+	google.golang.org/protobuf v1.28.1 // indirect
 	gopkg.in/inf.v0 v0.9.1 // indirect
 	gopkg.in/ini.v1 v1.66.2 // indirect
 	gopkg.in/yaml.v2 v2.4.0 // indirect
diff --git a/go.sum b/go.sum
index 11cb4b5..5d1bff8 100644
--- a/go.sum
+++ b/go.sum
@@ -550,9 +550,9 @@ golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96b
 golang.org/x/net v0.0.0-20210410081132-afb366fc7cd1/go.mod h1:9tjilg8BloeKEkVJvy7fQ90B1CfIiPueXVOjqfkSzI8=
 golang.org/x/net v0.0.0-20210503060351-7fd8e65b6420/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
 golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
-golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
-golang.org/x/net v0.0.0-20211209124913-491a49abca63 h1:iocB37TsdFuN6IBRZ+ry36wrkoV51/tl5vOWqkcPGvY=
 golang.org/x/net v0.0.0-20211209124913-491a49abca63/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
+golang.org/x/net v0.0.0-20220722155237-a158d28d115b h1:PxfKdU9lEEDYjdIzOtC4qFWgkU2rGHdKlKowJSMN9h0=
+golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
 golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
 golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
 golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@@ -654,14 +654,16 @@ golang.org/x/sys v0.0.0-20210906170528-6f6e22806c34/go.mod h1:oPkhp1MJrh7nUepCBc
 golang.org/x/sys v0.0.0-20210908233432-aa78b53d3365/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20211124211545-fe61309f8881/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20211205182925-97ca703d548d/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.0.0-20211210111614-af8b64212486 h1:5hpz5aRr+W1erYCL5JRhSUBJRph7l9XkNveoExlrKYk=
 golang.org/x/sys v0.0.0-20211210111614-af8b64212486/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab h1:2QkjZIsXupsJbJIdSjjUOgWK3aEtzyuh2mPt3l/CkeU=
+golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
-golang.org/x/term v0.0.0-20210615171337-6886f2dfbf5b h1:9zKuko04nR4gjZ4+DNjHqRlAJqbJETHwiNKDqTfOjfE=
 golang.org/x/term v0.0.0-20210615171337-6886f2dfbf5b/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
+golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 h1:JGgROgKl9N8DuW20oFS5gxc+lE67/N3FcwmBPMe7ArY=
+golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
 golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
 golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
 golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@@ -887,8 +889,8 @@ google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlba
 google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
 google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
 google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
-google.golang.org/protobuf v1.28.0 h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscLw=
-google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
+google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w=
+google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
 gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
@@ -950,5 +952,5 @@ sigs.k8s.io/structured-merge-diff/v4 v4.2.1 h1:bKCqE9GvQ5tiVHn5rfn1r+yao3aLQEaLz
 sigs.k8s.io/structured-merge-diff/v4 v4.2.1/go.mod h1:j/nl6xW8vLS49O8YvXW1ocPhZawJtm+Yrr7PPRQ0Vg4=
 sigs.k8s.io/yaml v1.2.0 h1:kr/MCeFWJWTwyaHoR9c8EjH9OumOmoF9YGiZd7lFm/Q=
 sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc=
-skywalking.apache.org/repo/goapi v0.0.0-20220518063910-af3e2df60bce h1:3wCiFWEEREdxe0T/PJM1S6DDqDsKETYVNtfaJ4zWmdQ=
-skywalking.apache.org/repo/goapi v0.0.0-20220518063910-af3e2df60bce/go.mod h1:uWwwvhcwe2MD/nJCg0c1EE/eL6KzaBosLHDfMFoEJ30=
+skywalking.apache.org/repo/goapi v0.0.0-20221019074310-53ebda305187 h1:6JgAg9aohcHd72VplZUGycZgCNo6iQrz735nmtOTCnE=
+skywalking.apache.org/repo/goapi v0.0.0-20221019074310-53ebda305187/go.mod h1:lxmYWY1uAP5SLVKNymAyDzn7KG6dhPWN+pYHmyt+0vo=
diff --git a/pkg/profiling/task/network/analyze/base/metrics.go b/pkg/profiling/task/network/analyze/base/metrics.go
index 15a1438..21acb29 100644
--- a/pkg/profiling/task/network/analyze/base/metrics.go
+++ b/pkg/profiling/task/network/analyze/base/metrics.go
@@ -31,7 +31,7 @@ import (
 // ConnectionMetrics The Metrics in each listener
 type ConnectionMetrics interface {
 	// MergeMetricsFromConnection merge the metrics from connection, and added into self
-	MergeMetricsFromConnection(connection *ConnectionContext)
+	MergeMetricsFromConnection(connection *ConnectionContext, data ConnectionMetrics)
 }
 
 type ConnectionMetricsContext struct {
@@ -51,8 +51,9 @@ func (c *ConnectionMetricsContext) GetMetrics(listenerName string) ConnectionMet
 }
 
 func (c *ConnectionMetricsContext) MergeMetricsFromConnection(connection *ConnectionContext) {
-	for _, metric := range c.data {
-		metric.MergeMetricsFromConnection(connection)
+	for name, metric := range c.data {
+		metrics := connection.Metrics.GetMetrics(name)
+		metric.MergeMetricsFromConnection(connection, metrics)
 	}
 }
 
@@ -60,6 +61,7 @@ type MetricsBuilder struct {
 	prefix  string
 	metrics map[metadata][]*agentv3.MeterData
 	logs    map[metadata][]*logv3.LogData
+	events  []*agentv3.SpanAttachedEvent
 }
 
 func NewMetricsBuilder(prefix string) *MetricsBuilder {
@@ -85,6 +87,10 @@ func (m *MetricsBuilder) AppendLogs(service string, log *logv3.LogData) {
 	m.logs[meta] = append(m.logs[meta], log)
 }
 
+func (m *MetricsBuilder) AppendSpanAttachedEvents(events []*agentv3.SpanAttachedEvent) {
+	m.events = append(m.events, events...)
+}
+
 func (m *MetricsBuilder) MetricPrefix() string {
 	return m.prefix
 }
@@ -141,6 +147,10 @@ func (m *MetricsBuilder) BuildLogs() [][]*logv3.LogData {
 	return result
 }
 
+func (m *MetricsBuilder) BuildEvents() []*agentv3.SpanAttachedEvent {
+	return m.events
+}
+
 type metadata struct {
 	Layer        string
 	ServiceName  string
diff --git a/pkg/profiling/task/network/analyze/layer4/metrics.go b/pkg/profiling/task/network/analyze/layer4/metrics.go
index b8bb89a..2a33aec 100644
--- a/pkg/profiling/task/network/analyze/layer4/metrics.go
+++ b/pkg/profiling/task/network/analyze/layer4/metrics.go
@@ -64,8 +64,8 @@ func NewLayer4Metrics() *Metrics {
 	}
 }
 
-func (l *Metrics) MergeMetricsFromConnection(connection *base.ConnectionContext) {
-	metrics := connection.Metrics.GetMetrics(Name).(*Metrics)
+func (l *Metrics) MergeMetricsFromConnection(connection *base.ConnectionContext, data base.ConnectionMetrics) {
+	metrics := data.(*Metrics)
 
 	l.WriteCounter.IncreaseToCurrent(metrics.WriteCounter.CalculateIncrease())
 	l.ReadCounter.IncreaseToCurrent(metrics.ReadCounter.CalculateIncrease())
diff --git a/pkg/profiling/task/network/analyze/layer7/events.go b/pkg/profiling/task/network/analyze/layer7/events.go
index a15b734..1b03572 100644
--- a/pkg/profiling/task/network/analyze/layer7/events.go
+++ b/pkg/profiling/task/network/analyze/layer7/events.go
@@ -21,6 +21,7 @@ import (
 	"context"
 
 	"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols"
+	"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/base"
 	"github.com/apache/skywalking-rover/pkg/profiling/task/network/bpf"
 )
 
@@ -33,9 +34,9 @@ func (l *Listener) initSocketDataQueue(parallels, queueSize int) {
 func (l *Listener) startSocketData(ctx context.Context, bpfLoader *bpf.Loader) {
 	l.socketDataQueue.Start(ctx, bpfLoader, bpfLoader.SocketDataUploadEventQueue, 1, l.protocolPerCPUBuffer,
 		func() interface{} {
-			return &protocols.SocketDataUploadEvent{}
+			return &base.SocketDataUploadEvent{}
 		}, func(data interface{}) string {
-			return data.(*protocols.SocketDataUploadEvent).GenerateConnectionID()
+			return data.(*base.SocketDataUploadEvent).GenerateConnectionID()
 		})
 }
 
@@ -43,13 +44,13 @@ type SocketDataPartitionContext struct {
 	analyzer *protocols.Analyzer
 }
 
-func NewSocketDataPartitionContext(l protocols.Context) *SocketDataPartitionContext {
+func NewSocketDataPartitionContext(l base.Context) *SocketDataPartitionContext {
 	return &SocketDataPartitionContext{
 		analyzer: protocols.NewAnalyzer(l),
 	}
 }
 
 func (p *SocketDataPartitionContext) Consume(data interface{}) {
-	event := data.(*protocols.SocketDataUploadEvent)
+	event := data.(*base.SocketDataUploadEvent)
 	p.analyzer.ReceiveSocketDataEvent(event)
 }
diff --git a/pkg/profiling/task/network/analyze/layer7/listener.go b/pkg/profiling/task/network/analyze/layer7/listener.go
index facf4a5..aa05f35 100644
--- a/pkg/profiling/task/network/analyze/layer7/listener.go
+++ b/pkg/profiling/task/network/analyze/layer7/listener.go
@@ -29,11 +29,14 @@ import (
 	profiling "github.com/apache/skywalking-rover/pkg/profiling/task/base"
 	"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
 	"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols"
+	protocol "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/base"
 	"github.com/apache/skywalking-rover/pkg/profiling/task/network/bpf"
 
 	"github.com/zekroTJA/timedmap"
 )
 
+var ListenerName = "layer7"
+
 const (
 	ConnectionCachedTTL      = time.Minute
 	ConnectionCacheFlushTime = time.Second * 20
@@ -56,7 +59,7 @@ func NewListener(analyzer *base.AnalyzerContext) *Listener {
 }
 
 func (l *Listener) Name() string {
-	return protocols.ListenerName
+	return ListenerName
 }
 
 func (l *Listener) Init(config *profiling.TaskConfig, moduleManager *module.Manager) error {
@@ -125,6 +128,11 @@ func (l *Listener) QueryConnection(conID, randomID uint64) *base.ConnectionConte
 	return nil
 }
 
+func (l *Listener) QueryProtocolMetrics(conMetrics *base.ConnectionMetricsContext, protocolName string) protocol.Metrics {
+	metrics := conMetrics.GetMetrics(ListenerName).(*protocols.ProtocolMetrics)
+	return metrics.GetProtocolMetrics(protocolName)
+}
+
 func (l *Listener) generateCachedConnectionKey(conID, randomID uint64) string {
 	return fmt.Sprintf("%d_%d", conID, randomID)
 }
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/events.go b/pkg/profiling/task/network/analyze/layer7/protocols/base/events.go
similarity index 90%
rename from pkg/profiling/task/network/analyze/layer7/protocols/events.go
rename to pkg/profiling/task/network/analyze/layer7/protocols/base/events.go
index de0b534..e3f09f0 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/events.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/base/events.go
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package protocols
+package base
 
 import (
 	"fmt"
@@ -30,12 +30,13 @@ type SocketDataBuffer interface {
 	BufferData() []byte
 	// TotalSize of socket data, the data may exceed the size of the BufferData()
 	TotalSize() uint64
-	// Time of socket data send
-	Time() uint64
 	// Direction of the data, send or receive
 	Direction() base.SocketDataDirection
 	FirstEvent() *SocketDataUploadEvent
 
+	StartTime() uint64
+	EndTime() uint64
+
 	MinDataID() int
 	MaxDataID() int
 }
@@ -47,7 +48,8 @@ type SocketDataUploadEvent struct {
 	Finished     uint8
 	Sequence     uint16
 	DataLen      uint16
-	Timestamp    uint64
+	StartTime0   uint64
+	EndTime0     uint64
 	ConnectionID uint64
 	RandomID     uint64
 	DataID       uint64
@@ -63,8 +65,12 @@ func (s *SocketDataUploadEvent) BufferData() []byte {
 	return s.Buffer[:s.DataLen]
 }
 
-func (s *SocketDataUploadEvent) Time() uint64 {
-	return s.Timestamp
+func (s *SocketDataUploadEvent) StartTime() uint64 {
+	return s.StartTime0
+}
+
+func (s *SocketDataUploadEvent) EndTime() uint64 {
+	return s.EndTime0
 }
 
 func (s *SocketDataUploadEvent) Direction() base.SocketDataDirection {
@@ -124,8 +130,12 @@ func (s *SocketDataUploadCombinedEvent) TotalSize() uint64 {
 	return s.first.TotalSize0
 }
 
-func (s *SocketDataUploadCombinedEvent) Time() uint64 {
-	return s.first.Timestamp
+func (s *SocketDataUploadCombinedEvent) StartTime() uint64 {
+	return s.first.StartTime0
+}
+
+func (s *SocketDataUploadCombinedEvent) EndTime() uint64 {
+	return s.first.EndTime0
 }
 
 func (s *SocketDataUploadCombinedEvent) MinDataID() int {
diff --git a/pkg/tools/host/time.go b/pkg/profiling/task/network/analyze/layer7/protocols/base/protocol.go
similarity index 56%
copy from pkg/tools/host/time.go
copy to pkg/profiling/task/network/analyze/layer7/protocols/base/protocol.go
index 1d03bcf..53494d1 100644
--- a/pkg/tools/host/time.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/base/protocol.go
@@ -15,22 +15,25 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package host
+package base
 
-import (
-	"fmt"
-	"time"
+import "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
 
-	"github.com/shirou/gopsutil/host"
-)
+type Protocol interface {
+	Name() string
+	GenerateMetrics() Metrics
 
-// BootTime the System boot time
-var BootTime time.Time
+	ReceiveData(context Context, event *SocketDataUploadEvent) bool
+}
+
+type Context interface {
+	QueryConnection(connectionID, randomID uint64) *base.ConnectionContext
+	QueryProtocolMetrics(conMetrics *base.ConnectionMetricsContext, protocolName string) Metrics
+}
+
+type Metrics interface {
+	base.ConnectionMetrics
 
-func init() {
-	boot, err := host.BootTime()
-	if err != nil {
-		panic(fmt.Errorf("init boot time error: %v", err))
-	}
-	BootTime = time.Unix(int64(boot), 0)
+	// FlushMetrics flush all metrics from traffic to the metricsBuilder
+	FlushMetrics(traffic *base.ProcessTraffic, metricsBuilder *base.MetricsBuilder)
 }
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/tracing.go b/pkg/profiling/task/network/analyze/layer7/protocols/base/tracing.go
similarity index 72%
rename from pkg/profiling/task/network/analyze/layer7/protocols/tracing.go
rename to pkg/profiling/task/network/analyze/layer7/protocols/base/tracing.go
index 2027674..2ef3188 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/tracing.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/base/tracing.go
@@ -15,23 +15,32 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package protocols
+package base
 
 import (
 	"encoding/base64"
 	"fmt"
 	"strings"
+
+	v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
 )
 
 type TracingContext interface {
 	TraceID() string
-	Provider() string
+	TraceSegmentID() string
+	SpanID() string
+	Provider() *TraceContextProvider
+}
+
+type TraceContextProvider struct {
+	Type v3.SpanAttachedEvent_SpanReferenceType
+	Name string
 }
 
 type SkyWalkingTracingContext struct {
 	TraceID0              string
-	SegmentID             string
-	SpanID                string
+	SegmentID0            string
+	SpanID0               string
 	ParentService         string
 	ParentServiceInstance string
 	ParentEndpoint        string
@@ -40,15 +49,26 @@ type SkyWalkingTracingContext struct {
 
 type ZipkinTracingContext struct {
 	TraceID0 string
-	SpanID   string
+	SpanID0  string
 }
 
 func (w *SkyWalkingTracingContext) TraceID() string {
 	return w.TraceID0
 }
 
-func (w *SkyWalkingTracingContext) Provider() string {
-	return "skywalking"
+func (w *SkyWalkingTracingContext) TraceSegmentID() string {
+	return w.SegmentID0
+}
+
+func (w *SkyWalkingTracingContext) SpanID() string {
+	return w.SpanID0
+}
+
+func (w *SkyWalkingTracingContext) Provider() *TraceContextProvider {
+	return &TraceContextProvider{
+		Type: v3.SpanAttachedEvent_SKYWALKING,
+		Name: "skywalking",
+	}
 }
 
 func AnalyzeTracingContext(fetcher func(key string) string) (TracingContext, error) {
@@ -79,8 +99,8 @@ func analyzeSkyWalking8TracingContext(val string) (*SkyWalkingTracingContext, er
 	var err error
 	ctx := &SkyWalkingTracingContext{}
 	ctx.TraceID0, err = decodeBase64StringValue(err, parts[1])
-	ctx.SegmentID, err = decodeBase64StringValue(err, parts[2])
-	ctx.SpanID = parts[3]
+	ctx.SegmentID0, err = decodeBase64StringValue(err, parts[2])
+	ctx.SpanID0 = parts[3]
 	ctx.ParentService, err = decodeBase64StringValue(err, parts[4])
 	ctx.ParentServiceInstance, err = decodeBase64StringValue(err, parts[5])
 	ctx.ParentEndpoint, err = decodeBase64StringValue(err, parts[6])
@@ -93,7 +113,7 @@ func analyzeSkyWalking8TracingContext(val string) (*SkyWalkingTracingContext, er
 }
 
 func analyzeZipkinTracingContextWithSpecificData(traceID, spanID string) *ZipkinTracingContext {
-	return &ZipkinTracingContext{TraceID0: traceID, SpanID: spanID}
+	return &ZipkinTracingContext{TraceID0: traceID, SpanID0: spanID}
 }
 
 func analyzeZipkinTracingContextWithSingleData(singleData string) *ZipkinTracingContext {
@@ -101,15 +121,26 @@ func analyzeZipkinTracingContextWithSingleData(singleData string) *ZipkinTracing
 	if len(info) < 2 {
 		return nil
 	}
-	return &ZipkinTracingContext{TraceID0: info[0], SpanID: info[1]}
+	return &ZipkinTracingContext{TraceID0: info[0], SpanID0: info[1]}
 }
 
 func (w *ZipkinTracingContext) TraceID() string {
 	return w.TraceID0
 }
 
-func (w *ZipkinTracingContext) Provider() string {
-	return "zipkin"
+func (w *ZipkinTracingContext) TraceSegmentID() string {
+	return ""
+}
+
+func (w *ZipkinTracingContext) SpanID() string {
+	return w.SpanID0
+}
+
+func (w *ZipkinTracingContext) Provider() *TraceContextProvider {
+	return &TraceContextProvider{
+		Type: v3.SpanAttachedEvent_ZIPKIN,
+		Name: "zipkin",
+	}
 }
 
 func decodeBase64StringValue(err error, val string) (string, error) {
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/http1.go b/pkg/profiling/task/network/analyze/layer7/protocols/http1.go
deleted file mode 100644
index 2c88cbc..0000000
--- a/pkg/profiling/task/network/analyze/layer7/protocols/http1.go
+++ /dev/null
@@ -1,598 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package protocols
-
-import (
-	"bufio"
-	"bytes"
-	"container/list"
-	"encoding/json"
-	"fmt"
-	"io"
-	"net/http"
-	"net/textproto"
-	"strconv"
-	"strings"
-	"sync"
-	"time"
-
-	commonv3 "skywalking.apache.org/repo/goapi/collect/common/v3"
-	v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
-	logv3 "skywalking.apache.org/repo/goapi/collect/logging/v3"
-
-	"github.com/sirupsen/logrus"
-
-	"github.com/apache/skywalking-rover/pkg/process/api"
-	"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
-	"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/metrics"
-	"github.com/apache/skywalking-rover/pkg/tools"
-)
-
-var HTTP1ProtocolName = "http1"
-
-var HTTP1PackageSizeHistogramBuckets = []float64{
-	// 0.25KB, 0.5KB, 1KB, 1.5KB, 2KB, 3KB, 5KB, 8KB, 10KB, 15KB, 20KB, 35KB, 50KB, 75KB, 100KB, 200KB, 500KB
-	256, 512, 1048, 1536, 2048, 3072, 5120, 8192, 10240, 15360, 20480, 35840, 51200, 76800, 102400, 204800, 512000,
-	// 800KB, 1M, 1.5M, 2M, 5M, 10M, 20M, 50M
-	819200, 1048576, 1572864, 2097152, 5242880, 10485760, 20971520, 52428800,
-}
-
-var HTTP1DurationHistogramBuckets = []float64{
-	// unit ms
-	1, 2, 5, 10, 15, 20, 25, 30, 40, 45, 50, 60, 65, 70, 80, 90, 100, 110, 130, 150, 170, 200, 230, 260, 290,
-	330, 380, 430, 480, 500, 600, 700, 800, 900, 1000, 1100, 1300, 1500, 1800, 2000, 5000, 10000, 15000, 20000, 30000,
-}
-
-var SlowTraceTopNSize = 10
-
-type HTTP1Analyzer struct {
-	// cache connection metrics if the connect event not receive or process
-	cache map[string]*HTTP1ConnectionMetrics
-}
-
-type HTTP1ConnectionMetrics struct {
-	// halfData all data event(request/response) not finished
-	halfData *list.List
-
-	clientMetrics *HTTP1URIMetrics
-	serverMetrics *HTTP1URIMetrics
-	metricsLocker sync.RWMutex
-}
-
-type HTTP1URIMetrics struct {
-	RequestCounter *metrics.Counter
-	StatusCounter  map[int]*metrics.Counter
-
-	AvgRequestPackageSize    *metrics.AvgCounter
-	AvgResponsePackageSize   *metrics.AvgCounter
-	ReqPackageSizeHistogram  *metrics.Histogram
-	RespPackageSizeHistogram *metrics.Histogram
-
-	avgDuration       *metrics.AvgCounter
-	durationHistogram *metrics.Histogram
-
-	slowTraces *metrics.TopN
-}
-
-func NewHTTP1URIMetrics() *HTTP1URIMetrics {
-	return &HTTP1URIMetrics{
-		RequestCounter:           metrics.NewCounter(),
-		StatusCounter:            make(map[int]*metrics.Counter),
-		AvgRequestPackageSize:    metrics.NewAvgCounter(),
-		AvgResponsePackageSize:   metrics.NewAvgCounter(),
-		ReqPackageSizeHistogram:  metrics.NewHistogram(HTTP1PackageSizeHistogramBuckets),
-		RespPackageSizeHistogram: metrics.NewHistogram(HTTP1PackageSizeHistogramBuckets),
-		avgDuration:              metrics.NewAvgCounter(),
-		durationHistogram:        metrics.NewHistogram(HTTP1DurationHistogramBuckets),
-		slowTraces:               metrics.NewTopN(SlowTraceTopNSize),
-	}
-}
-
-func NewHTTP1Analyzer() Protocol {
-	return &HTTP1Analyzer{
-		cache: make(map[string]*HTTP1ConnectionMetrics),
-	}
-}
-
-func (h *HTTP1Analyzer) Name() string {
-	return HTTP1ProtocolName
-}
-
-func (h *HTTP1Analyzer) GenerateMetrics() Metrics {
-	return &HTTP1ConnectionMetrics{
-		halfData: list.New(),
-
-		clientMetrics: NewHTTP1URIMetrics(),
-		serverMetrics: NewHTTP1URIMetrics(),
-	}
-}
-
-func (h *HTTP1Analyzer) ReceiveData(context Context, event *SocketDataUploadEvent) bool {
-	// only handle the HTTP1 protocol
-	if event.Protocol != base.ConnectionProtocolHTTP {
-		return false
-	}
-
-	connectionID := event.GenerateConnectionID()
-	fromAnalyzerCache := false
-	var connectionMetrics *HTTP1ConnectionMetrics
-	connection := context.QueryConnection(event.ConnectionID, event.RandomID)
-	// if connection not exists, then cached it into the analyzer context
-	if connection == nil {
-		connectionMetrics = h.cache[connectionID]
-		fromAnalyzerCache = true
-		if connectionMetrics == nil {
-			connectionMetrics = h.GenerateMetrics().(*HTTP1ConnectionMetrics)
-			h.cache[connectionID] = connectionMetrics
-		}
-	} else {
-		connectionMetrics = QueryProtocolMetrics(connection.Metrics, HTTP1ProtocolName).(*HTTP1ConnectionMetrics)
-	}
-
-	log.Debugf("receive connection: %s, dataid: %d, sequence: %d, finished: %d, message type: %s, direction: %s, size: %d, total size: %d",
-		connectionID, event.DataID, event.Sequence, event.Finished, event.MsgType.String(), event.Direction().String(), event.DataLen, event.TotalSize0)
-	// if the cache is existing in the analyzer context, then delete it
-	if !fromAnalyzerCache {
-		if tmp := h.cache[connectionID]; tmp != nil {
-			connectionMetrics.MergeFrom(h, tmp)
-			delete(h.cache, connectionID)
-		}
-	}
-
-	req, resp := h.buildHTTP1(connectionMetrics.halfData, event)
-	if req != nil && resp != nil {
-		if err := h.analyze(context, connectionID, connectionMetrics, req, resp); err != nil {
-			log.Errorf("HTTP1 analyze failure: %v", err)
-			return false
-		}
-	} else {
-		log.Debugf("connnection: %s, remaining half data list size: %d", connectionID, connectionMetrics.halfData.Len())
-	}
-	return true
-}
-
-func (h *HTTP1Analyzer) combineAndRemoveEvent(halfConnections *list.List, firstElement *list.Element,
-	lastAppender SocketDataBuffer) SocketDataBuffer {
-	firstEvent := firstElement.Value.(*SocketDataUploadEvent)
-	if firstEvent.Sequence == 0 && firstEvent.Finished == 1 {
-		halfConnections.Remove(firstElement)
-		return h.combineEventIfNeed(firstEvent, lastAppender)
-	}
-	next := firstElement.Next()
-	halfConnections.Remove(firstElement)
-	var buffer SocketDataBuffer = firstEvent
-	// for-each the events until buffer finished
-	for next != nil {
-		event := next.Value.(*SocketDataUploadEvent)
-
-		buffer = buffer.Combine(event)
-
-		tmp := next.Next()
-		halfConnections.Remove(next)
-		next = tmp
-		// combine event
-		if event.Finished == 1 {
-			return h.combineEventIfNeed(buffer, lastAppender)
-		}
-	}
-	return h.combineEventIfNeed(buffer, lastAppender)
-}
-
-func (h *HTTP1Analyzer) combineEventIfNeed(data, appender SocketDataBuffer) SocketDataBuffer {
-	if appender != nil {
-		return data.Combine(appender)
-	}
-	return data
-}
-
-func (h *HTTP1Analyzer) buildHTTP1(halfConnections *list.List, event *SocketDataUploadEvent) (request, response SocketDataBuffer) {
-	// no connections, then just add the response to the half connections to wait the request
-	if halfConnections.Len() == 0 {
-		halfConnections.PushBack(event)
-		return nil, nil
-	}
-
-	// quick handler(only one element, and is request)
-	if halfConnections.Len() == 1 {
-		firstElement := halfConnections.Front()
-		firstEvent := firstElement.Value.(*SocketDataUploadEvent)
-		if firstEvent.IsStart() && firstEvent.IsFinished() && event.IsStart() && event.IsFinished() &&
-			firstEvent.DataID+1 == event.DataID && firstEvent.MsgType == base.SocketMessageTypeRequest &&
-			event.MsgType == base.SocketMessageTypeResponse {
-			return h.combineAndRemoveEvent(halfConnections, firstElement, nil), event
-		}
-	}
-
-	// push to the queue
-	h.insertToList(halfConnections, event)
-
-	// trying to find completed request and response
-	return NewHTTP1BufferAnalyzer(h).Analyze(halfConnections)
-}
-
-func (h *HTTP1Analyzer) insertToList(halfConnections *list.List, event *SocketDataUploadEvent) {
-	if halfConnections.Len() == 0 {
-		halfConnections.PushFront(event)
-		return
-	}
-	if halfConnections.Back().Value.(*SocketDataUploadEvent).DataID < event.DataID {
-		halfConnections.PushBack(event)
-		return
-	}
-	beenAdded := false
-	for element := halfConnections.Front(); element != nil; element = element.Next() {
-		existEvent := element.Value.(*SocketDataUploadEvent)
-		if existEvent.DataID > event.DataID {
-			// data id needs order
-			beenAdded = true
-		} else if existEvent.DataID == event.DataID {
-			if existEvent.MsgType == event.MsgType && existEvent.Sequence > event.Sequence {
-				// same message type and following the sequence order
-				beenAdded = true
-			} else if existEvent.MsgType > event.MsgType {
-				// request needs before response
-				beenAdded = true
-			}
-		}
-		if beenAdded {
-			halfConnections.InsertBefore(event, element)
-			break
-		}
-	}
-	if !beenAdded {
-		halfConnections.PushBack(event)
-	}
-}
-
-func (h *HTTP1Analyzer) analyze(_ Context, connectionID string, connectionMetrics *HTTP1ConnectionMetrics,
-	requestBuffer, responseBuffer SocketDataBuffer) error {
-	request, err := http.ReadRequest(bufio.NewReader(bytes.NewBuffer(requestBuffer.BufferData())))
-	if err != nil {
-		return fmt.Errorf("parse request failure: data length: %d, total data length: %d, %v",
-			len(requestBuffer.BufferData()), requestBuffer.TotalSize(), err)
-	}
-
-	response, err := http.ReadResponse(bufio.NewReader(bytes.NewBuffer(responseBuffer.BufferData())), request)
-	if response != nil {
-		defer response.Body.Close()
-	}
-	if err != nil {
-		if err == io.ErrUnexpectedEOF || err == io.EOF {
-			response, err = h.tryingToReadResponseWithoutHeaders(bufio.NewReader(bytes.NewBuffer(responseBuffer.BufferData())), request)
-			if err != nil {
-				return fmt.Errorf("parsing simple data error: %v", err)
-			}
-			if response != nil && response.Body != nil {
-				defer response.Body.Close()
-			}
-		}
-		if err != nil {
-			return fmt.Errorf("parse response failure, data length: %d, total data length: %d, %v",
-				len(requestBuffer.BufferData()), requestBuffer.TotalSize(), err)
-		}
-	}
-
-	// lock append metrics with read locker
-	connectionMetrics.metricsLocker.RLock()
-	defer connectionMetrics.metricsLocker.RUnlock()
-
-	// append metrics
-	data := connectionMetrics.clientMetrics
-	side := base.ConnectionRoleClient
-	if requestBuffer.Direction() == base.SocketDataDirectionIngress {
-		// if receive the request, that's mean is server side
-		data = connectionMetrics.serverMetrics
-		side = base.ConnectionRoleServer
-	}
-	h.appendToMetrics(data, request, requestBuffer, response, responseBuffer)
-
-	if log.Enable(logrus.DebugLevel) {
-		metricsJSON, _ := json.Marshal(data)
-		log.Debugf("generated metrics, connection id: %s, side: %s, metrisc: %s", connectionID, side.String(), string(metricsJSON))
-	}
-	return nil
-}
-
-func (h *HTTP1Analyzer) tryingToReadResponseWithoutHeaders(reader *bufio.Reader, request *http.Request) (*http.Response, error) {
-	if reader.Size() < 16 {
-		return nil, fmt.Errorf("the header length not enough")
-	}
-	tp := textproto.NewReader(reader)
-	resp := &http.Response{
-		Request: request,
-	}
-
-	line, err := tp.ReadLine()
-	if err != nil {
-		return nil, fmt.Errorf("read response first line failure: %v", err)
-	}
-	indexByte := strings.IndexByte(line, ' ')
-	if indexByte == -1 {
-		return nil, fmt.Errorf("parsing response error: %s", line)
-	}
-	resp.Proto = line[:indexByte]
-	resp.Status = strings.TrimLeft(line[indexByte+1:], " ")
-	statusCode := resp.Status
-	if i := strings.IndexByte(resp.Status, ' '); i != -1 {
-		statusCode = resp.Status[:i]
-	}
-	if len(statusCode) != 3 {
-		return nil, fmt.Errorf("parsing response status code failure: %v", statusCode)
-	}
-	resp.StatusCode, err = strconv.Atoi(statusCode)
-	if err != nil || resp.StatusCode < 0 {
-		return nil, fmt.Errorf("status code not correct: %s", statusCode)
-	}
-	var ok bool
-	if resp.ProtoMajor, resp.ProtoMinor, ok = http.ParseHTTPVersion(resp.Proto); !ok {
-		return nil, fmt.Errorf("parsing http version failure: %s", resp.Proto)
-	}
-
-	return resp, nil
-}
-
-func (h *HTTP1Analyzer) appendToMetrics(data *HTTP1URIMetrics, req *http.Request, reqBuffer SocketDataBuffer,
-	resp *http.Response, respBuffer SocketDataBuffer) {
-	data.RequestCounter.Increase()
-	statusCounter := data.StatusCounter[resp.StatusCode]
-	if statusCounter == nil {
-		statusCounter = metrics.NewCounter()
-		data.StatusCounter[resp.StatusCode] = statusCounter
-	}
-	statusCounter.Increase()
-
-	data.AvgRequestPackageSize.Increase(float64(reqBuffer.TotalSize()))
-	data.AvgResponsePackageSize.Increase(float64(respBuffer.TotalSize()))
-	data.ReqPackageSizeHistogram.Increase(float64(reqBuffer.TotalSize()))
-	data.RespPackageSizeHistogram.Increase(float64(respBuffer.TotalSize()))
-
-	duration := time.Duration(respBuffer.Time() - reqBuffer.Time())
-	durationInMS := float64(duration.Milliseconds())
-	data.avgDuration.Increase(durationInMS)
-	data.durationHistogram.Increase(durationInMS)
-
-	h.increaseSlowTraceTopN(data.slowTraces, duration, req, resp, reqBuffer, respBuffer)
-}
-
-func (h *HTTP1Analyzer) increaseSlowTraceTopN(slowTraceTopN *metrics.TopN, duration time.Duration,
-	request *http.Request, _ *http.Response, reqBuffer, respBuffer SocketDataBuffer) {
-	tracingContext, err := AnalyzeTracingContext(func(key string) string {
-		return request.Header.Get(key)
-	})
-	if err != nil {
-		log.Warnf("analyze tracing context error: %v", err)
-		return
-	}
-	if tracingContext == nil {
-		return
-	}
-
-	// remove the query parameters
-	uri := request.RequestURI
-	if i := strings.Index(uri, "?"); i > 0 {
-		uri = uri[0:i]
-	}
-	trace := &HTTP1Trace{Trace: tracingContext, RequestURI: uri, RequestBuffer: reqBuffer, ResponseBuffer: respBuffer}
-	slowTraceTopN.AddRecord(trace, duration.Milliseconds())
-}
-
-func (h *HTTP1ConnectionMetrics) MergeMetricsFromConnection(connection *base.ConnectionContext) {
-	other := QueryProtocolMetrics(connection.Metrics, HTTP1ProtocolName).(*HTTP1ConnectionMetrics)
-	other.metricsLocker.Lock()
-	defer other.metricsLocker.Unlock()
-
-	h.clientMetrics.MergeAndClean(other.clientMetrics)
-	h.serverMetrics.MergeAndClean(other.serverMetrics)
-	if log.Enable(logrus.DebugLevel) {
-		clientMetrics, _ := json.Marshal(h.clientMetrics)
-		serverMetrics, _ := json.Marshal(h.serverMetrics)
-		log.Debugf("combine metrics: conid: %d_%d, client side metrics: %s, server side metrics: %s",
-			connection.ConnectionID, connection.RandomID, clientMetrics, serverMetrics)
-	}
-}
-
-func (h *HTTP1ConnectionMetrics) FlushMetrics(traffic *base.ProcessTraffic, metricsBuilder *base.MetricsBuilder) {
-	connectionMetrics := QueryProtocolMetrics(traffic.Metrics, HTTP1ProtocolName).(*HTTP1ConnectionMetrics)
-	for _, p := range traffic.LocalProcesses {
-		// if the remote process is profiling, then used the client side
-		localMetrics := connectionMetrics.clientMetrics
-		remoteMetrics := connectionMetrics.serverMetrics
-		if traffic.Role == base.ConnectionRoleServer {
-			localMetrics = connectionMetrics.serverMetrics
-			remoteMetrics = connectionMetrics.clientMetrics
-		}
-
-		metricsCount := h.appendMetrics(traffic, p, "", localMetrics, metricsBuilder, false)
-		if traffic.RemoteProcessIsProfiling() {
-			metricsCount += h.appendMetrics(traffic, p, "", remoteMetrics, metricsBuilder, true)
-		}
-		if metricsCount <= 0 {
-			continue
-		}
-
-		if log.Enable(logrus.DebugLevel) {
-			// if remote process is profiling, then the metrics data need to be cut half
-			log.Debugf("flush HTTP1 metrics(%s): %s, remote process is profiling: %t, client(%s), server(%s)"+
-				traffic.Role.String(), traffic.GenerateConnectionInfo(), traffic.RemoteProcessIsProfiling(),
-				connectionMetrics.clientMetrics.String(), connectionMetrics.serverMetrics.String())
-		}
-	}
-}
-
-func (h *HTTP1ConnectionMetrics) appendMetrics(traffic *base.ProcessTraffic,
-	local api.ProcessInterface, url string, http1Metrics *HTTP1URIMetrics, metricsBuilder *base.MetricsBuilder, durationOnly bool) int {
-	collections := make([]*v3.MeterData, 0)
-	role, labels := metricsBuilder.BuildBasicMeterLabels(traffic, local)
-	prefix := metricsBuilder.MetricPrefix()
-
-	collections = h.buildMetrics(collections, prefix, fmt.Sprintf("%s_duration_avg", role.String()), labels, url,
-		traffic, http1Metrics.avgDuration)
-	collections = h.buildMetrics(collections, prefix, fmt.Sprintf("%s_duration_histogram", role.String()), labels, url,
-		traffic, http1Metrics.durationHistogram)
-	if durationOnly {
-		return len(collections)
-	}
-
-	collections = h.buildMetrics(collections, prefix, "request_counter", labels, url, traffic, http1Metrics.RequestCounter)
-	for status, counter := range http1Metrics.StatusCounter {
-		statusLabels := append(labels, &v3.Label{Name: "code", Value: fmt.Sprintf("%d", status)})
-		collections = h.buildMetrics(collections, prefix, "response_status_counter", statusLabels, url, traffic, counter)
-	}
-
-	collections = h.buildMetrics(collections, prefix, "request_package_size_avg", labels, url, traffic, http1Metrics.AvgRequestPackageSize)
-	collections = h.buildMetrics(collections, prefix, "response_package_size_avg", labels, url, traffic, http1Metrics.AvgResponsePackageSize)
-	collections = h.buildMetrics(collections, prefix, "request_package_size_histogram", labels, url, traffic, http1Metrics.ReqPackageSizeHistogram)
-	collections = h.buildMetrics(collections, prefix, "response_package_size_histogram", labels, url, traffic, http1Metrics.RespPackageSizeHistogram)
-
-	metricsBuilder.AppendMetrics(local.Entity().ServiceName, local.Entity().InstanceName, collections)
-	logsCount := http1Metrics.slowTraces.AppendData(local, traffic, metricsBuilder)
-	return len(collections) + logsCount
-}
-
-func (h *HTTP1ConnectionMetrics) buildMetrics(collection []*v3.MeterData, prefix, name string, basicLabels []*v3.Label,
-	url string, _ *base.ProcessTraffic, data metrics.Metrics) []*v3.MeterData {
-	// if remote process is also profiling, then needs to be calculated half of metrics
-	labels := basicLabels
-	var meterName string
-	if url != "" {
-		labels = append(labels, &v3.Label{Name: "url", Value: url})
-		meterName = fmt.Sprintf("%shttp1_%s_%s", prefix, "url", name)
-	} else {
-		meterName = fmt.Sprintf("%shttp1_%s", prefix, name)
-	}
-	return data.AppendMeter(collection, meterName, labels)
-}
-
-func (h *HTTP1ConnectionMetrics) MergeFrom(analyzer *HTTP1Analyzer, other *HTTP1ConnectionMetrics) {
-	if other.halfData != nil {
-		for element := other.halfData.Front(); element != nil; element = element.Next() {
-			analyzer.insertToList(h.halfData, element.Value.(*SocketDataUploadEvent))
-		}
-	}
-}
-
-func (u *HTTP1URIMetrics) MergeAndClean(other *HTTP1URIMetrics) {
-	u.RequestCounter.MergeAndClean(other.RequestCounter)
-	for k, v := range other.StatusCounter {
-		if existing := u.StatusCounter[k]; existing != nil {
-			existing.MergeAndClean(v)
-		} else {
-			u.StatusCounter[k] = v
-		}
-	}
-
-	u.AvgRequestPackageSize.MergeAndClean(other.AvgRequestPackageSize)
-	u.AvgResponsePackageSize.MergeAndClean(other.AvgResponsePackageSize)
-	u.ReqPackageSizeHistogram.MergeAndClean(other.ReqPackageSizeHistogram)
-	u.RespPackageSizeHistogram.MergeAndClean(other.RespPackageSizeHistogram)
-	u.avgDuration.MergeAndClean(other.avgDuration)
-	u.durationHistogram.MergeAndClean(other.durationHistogram)
-	u.slowTraces.MergeAndClean(other.slowTraces)
-}
-
-func (u *HTTP1URIMetrics) String() string {
-	return fmt.Sprintf("request count: %d, avg request size: %f, avg response size: %f, avg duration: %f, slow trace count: %d",
-		u.RequestCounter.Get(), u.AvgRequestPackageSize.Calculate(), u.AvgResponsePackageSize.Calculate(),
-		u.avgDuration.Calculate(), u.slowTraces.List.Len())
-}
-
-type HTTP1Trace struct {
-	Trace          TracingContext
-	RequestURI     string
-	RequestBuffer  SocketDataBuffer
-	ResponseBuffer SocketDataBuffer
-}
-
-func (h *HTTP1Trace) Flush(duration int64, process api.ProcessInterface, traffic *base.ProcessTraffic, metricsBuilder *base.MetricsBuilder) {
-	logData := &logv3.LogData{}
-	logData.Service = process.Entity().ServiceName
-	logData.ServiceInstance = process.Entity().InstanceName
-	logData.Layer = process.Entity().Layer
-
-	logData.Tags = &logv3.LogTags{Data: make([]*commonv3.KeyStringValuePair, 0)}
-	logData.Tags.Data = append(logData.Tags.Data, &commonv3.KeyStringValuePair{Key: "LOG_KIND", Value: "NET_PROFILING_SAMPLED_TRACE"})
-
-	// trace context
-	traceContext := &logv3.TraceContext{}
-	traceContext.TraceId = h.Trace.TraceID()
-	logData.TraceContext = traceContext
-
-	// body
-	logBody := &logv3.LogDataBody{Type: "json"}
-	body := &HTTP1SlowTraceLogBody{
-		Latency:       duration,
-		TraceProvider: h.Trace.Provider(),
-		DetectPoint:   traffic.Role.String(),
-		Component:     traffic.Protocol.String(),
-		SSL:           traffic.IsSSL,
-		URI:           h.RequestURI,
-		Reason:        "slow",
-	}
-	if traffic.Role == base.ConnectionRoleClient {
-		body.ClientProcess = &HTTP1SlowTraceLogProcess{ProcessID: process.ID()}
-		body.ServerProcess = NewHTTP1SlowTRaceLogRemoteProcess(traffic, process)
-	} else {
-		body.ServerProcess = &HTTP1SlowTraceLogProcess{ProcessID: process.ID()}
-		body.ClientProcess = NewHTTP1SlowTRaceLogRemoteProcess(traffic, process)
-	}
-	bodyJSON, err := json.Marshal(body)
-	if err != nil {
-		log.Warnf("format the slow trace log body failure: %v", err)
-		return
-	}
-	logBody.Content = &logv3.LogDataBody_Json{Json: &logv3.JSONLog{Json: string(bodyJSON)}}
-	logData.Body = logBody
-
-	metricsBuilder.AppendLogs(process.Entity().ServiceName, logData)
-}
-
-type HTTP1SlowTraceLogBody struct {
-	URI           string                    `json:"uri"`
-	Reason        string                    `json:"reason"`
-	Latency       int64                     `json:"latency"`
-	TraceProvider string                    `json:"trace_provider"`
-	ClientProcess *HTTP1SlowTraceLogProcess `json:"client_process"`
-	ServerProcess *HTTP1SlowTraceLogProcess `json:"server_process"`
-	DetectPoint   string                    `json:"detect_point"`
-	Component     string                    `json:"component"`
-	SSL           bool                      `json:"ssl"`
-}
-
-type HTTP1SlowTraceLogProcess struct {
-	ProcessID string `json:"process_id"`
-	Local     bool   `json:"local"`
-	Address   string `json:"address"`
-}
-
-func NewHTTP1SlowTRaceLogRemoteProcess(traffic *base.ProcessTraffic, local api.ProcessInterface) *HTTP1SlowTraceLogProcess {
-	if len(traffic.RemoteProcesses) != 0 {
-		for _, p := range traffic.RemoteProcesses {
-			// only match with same service instance
-			if local.Entity().ServiceName == p.Entity().ServiceName &&
-				local.Entity().InstanceName == p.Entity().InstanceName {
-				return &HTTP1SlowTraceLogProcess{ProcessID: p.ID()}
-			}
-		}
-	}
-
-	if tools.IsLocalHostAddress(traffic.RemoteIP) || traffic.Analyzer.IsLocalAddressInCache(traffic.RemoteIP) {
-		return &HTTP1SlowTraceLogProcess{Local: true}
-	}
-
-	return &HTTP1SlowTraceLogProcess{Address: fmt.Sprintf("%s:%d", traffic.RemoteIP, traffic.RemotePort)}
-}
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/http1/analyzer.go b/pkg/profiling/task/network/analyze/layer7/protocols/http1/analyzer.go
new file mode 100644
index 0000000..740a24a
--- /dev/null
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/http1/analyzer.go
@@ -0,0 +1,364 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package http1
+
+import (
+	"bufio"
+	"bytes"
+	"container/list"
+	"encoding/json"
+	"fmt"
+	"io"
+	"net/http"
+	"net/textproto"
+	"strconv"
+	"strings"
+	"sync"
+
+	"github.com/apache/skywalking-rover/pkg/logger"
+	"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
+	protocol "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/base"
+
+	"github.com/sirupsen/logrus"
+)
+
+var log = logger.GetLogger("profiling", "task", "network", "layer7", "protocols", "http1")
+
+var ProtocolName = "http1"
+
+var PackageSizeHistogramBuckets = []float64{
+	// 0.25KB, 0.5KB, 1KB, 1.5KB, 2KB, 3KB, 5KB, 8KB, 10KB, 15KB, 20KB, 35KB, 50KB, 75KB, 100KB, 200KB, 500KB
+	256, 512, 1048, 1536, 2048, 3072, 5120, 8192, 10240, 15360, 20480, 35840, 51200, 76800, 102400, 204800, 512000,
+	// 800KB, 1M, 1.5M, 2M, 5M, 10M, 20M, 50M
+	819200, 1048576, 1572864, 2097152, 5242880, 10485760, 20971520, 52428800,
+}
+
+var DurationHistogramBuckets = []float64{
+	// unit ms
+	1, 2, 5, 10, 15, 20, 25, 30, 40, 45, 50, 60, 65, 70, 80, 90, 100, 110, 130, 150, 170, 200, 230, 260, 290,
+	330, 380, 430, 480, 500, 600, 700, 800, 900, 1000, 1100, 1300, 1500, 1800, 2000, 5000, 10000, 15000, 20000, 30000,
+}
+
+var SlowTraceTopNSize = 10
+
+type Analyzer struct {
+	// cache connection metrics if the connect event not receive or process
+	cache map[string]*ConnectionMetrics
+}
+
+type ConnectionMetrics struct {
+	// halfData all data event(request/response) not finished
+	halfData *list.List
+
+	clientMetrics *URIMetrics
+	serverMetrics *URIMetrics
+	metricsLocker sync.RWMutex
+}
+
+func NewHTTP1Analyzer() protocol.Protocol {
+	return &Analyzer{
+		cache: make(map[string]*ConnectionMetrics),
+	}
+}
+
+func (h *Analyzer) Name() string {
+	return ProtocolName
+}
+
+func (h *Analyzer) GenerateMetrics() protocol.Metrics {
+	return &ConnectionMetrics{
+		halfData: list.New(),
+
+		clientMetrics: NewHTTP1URIMetrics(),
+		serverMetrics: NewHTTP1URIMetrics(),
+	}
+}
+
+func (h *Analyzer) ReceiveData(context protocol.Context, event *protocol.SocketDataUploadEvent) bool {
+	// only handle the HTTP1 protocol
+	if event.Protocol != base.ConnectionProtocolHTTP {
+		return false
+	}
+
+	connectionID := event.GenerateConnectionID()
+	fromAnalyzerCache := false
+	var connectionMetrics *ConnectionMetrics
+	connection := context.QueryConnection(event.ConnectionID, event.RandomID)
+	// if connection not exists, then cached it into the analyzer context
+	if connection == nil {
+		connectionMetrics = h.cache[connectionID]
+		fromAnalyzerCache = true
+		if connectionMetrics == nil {
+			connectionMetrics = h.GenerateMetrics().(*ConnectionMetrics)
+			h.cache[connectionID] = connectionMetrics
+		}
+	} else {
+		connectionMetrics = context.QueryProtocolMetrics(connection.Metrics, ProtocolName).(*ConnectionMetrics)
+	}
+
+	log.Debugf("receive connection: %s, dataid: %d, sequence: %d, finished: %d, message type: %s, direction: %s, size: %d, total size: %d",
+		connectionID, event.DataID, event.Sequence, event.Finished, event.MsgType.String(), event.Direction().String(), event.DataLen, event.TotalSize0)
+	// if the cache is existing in the analyzer context, then delete it
+	if !fromAnalyzerCache {
+		if tmp := h.cache[connectionID]; tmp != nil {
+			connectionMetrics.MergeFrom(h, tmp)
+			delete(h.cache, connectionID)
+		}
+	}
+
+	req, resp := h.buildHTTP1(connectionMetrics.halfData, event)
+	if req != nil && resp != nil {
+		if err := h.analyze(context, connectionID, connectionMetrics, req, resp); err != nil {
+			log.Errorf("HTTP1 analyze failure: %v", err)
+			return false
+		}
+	} else {
+		log.Debugf("connnection: %s, remaining half data list size: %d", connectionID, connectionMetrics.halfData.Len())
+	}
+	return true
+}
+
+func (h *Analyzer) combineAndRemoveEvent(halfConnections *list.List, firstElement *list.Element,
+	lastAppender protocol.SocketDataBuffer) protocol.SocketDataBuffer {
+	firstEvent := firstElement.Value.(*protocol.SocketDataUploadEvent)
+	if firstEvent.Sequence == 0 && firstEvent.Finished == 1 {
+		halfConnections.Remove(firstElement)
+		return h.combineEventIfNeed(firstEvent, lastAppender)
+	}
+	next := firstElement.Next()
+	halfConnections.Remove(firstElement)
+	var buffer protocol.SocketDataBuffer = firstEvent
+	// for-each the events until buffer finished
+	for next != nil {
+		event := next.Value.(*protocol.SocketDataUploadEvent)
+
+		buffer = buffer.Combine(event)
+
+		tmp := next.Next()
+		halfConnections.Remove(next)
+		next = tmp
+		// combine event
+		if event.Finished == 1 {
+			return h.combineEventIfNeed(buffer, lastAppender)
+		}
+	}
+	return h.combineEventIfNeed(buffer, lastAppender)
+}
+
+func (h *Analyzer) combineEventIfNeed(data, appender protocol.SocketDataBuffer) protocol.SocketDataBuffer {
+	if appender != nil {
+		return data.Combine(appender)
+	}
+	return data
+}
+
+func (h *Analyzer) buildHTTP1(halfConnections *list.List, event *protocol.SocketDataUploadEvent) (request, response protocol.SocketDataBuffer) {
+	// no connections, then just add the response to the half connections to wait the request
+	if halfConnections.Len() == 0 {
+		halfConnections.PushBack(event)
+		return nil, nil
+	}
+
+	// quick handler(only one element, and is request)
+	if halfConnections.Len() == 1 {
+		firstElement := halfConnections.Front()
+		firstEvent := firstElement.Value.(*protocol.SocketDataUploadEvent)
+		if firstEvent.IsStart() && firstEvent.Finished == 1 && event.IsStart() && event.Finished == 1 &&
+			firstEvent.DataID+1 == event.DataID && firstEvent.MsgType == base.SocketMessageTypeRequest &&
+			event.MsgType == base.SocketMessageTypeResponse {
+			return h.combineAndRemoveEvent(halfConnections, firstElement, nil), event
+		}
+	}
+
+	// push to the queue
+	h.insertToList(halfConnections, event)
+
+	// trying to find completed request and response
+	return NewHTTP1BufferAnalyzer(h).Analyze(halfConnections)
+}
+
+func (h *Analyzer) insertToList(halfConnections *list.List, event *protocol.SocketDataUploadEvent) {
+	if halfConnections.Len() == 0 {
+		halfConnections.PushFront(event)
+		return
+	}
+	if halfConnections.Back().Value.(*protocol.SocketDataUploadEvent).DataID < event.DataID {
+		halfConnections.PushBack(event)
+		return
+	}
+	beenAdded := false
+	for element := halfConnections.Front(); element != nil; element = element.Next() {
+		existEvent := element.Value.(*protocol.SocketDataUploadEvent)
+		if existEvent.DataID > event.DataID {
+			// data id needs order
+			beenAdded = true
+		} else if existEvent.DataID == event.DataID {
+			if existEvent.MsgType == event.MsgType && existEvent.Sequence > event.Sequence {
+				// same message type and following the sequence order
+				beenAdded = true
+			} else if existEvent.MsgType > event.MsgType {
+				// request needs before response
+				beenAdded = true
+			}
+		}
+		if beenAdded {
+			halfConnections.InsertBefore(event, element)
+			break
+		}
+	}
+	if !beenAdded {
+		halfConnections.PushBack(event)
+	}
+}
+
+// analyze parses one request/response buffer pair and appends the result to the metrics
+// of the connection.
+//
+// The request buffer is parsed with http.ReadRequest and the response buffer with
+// http.ReadResponse. When the response parsing stops with io.EOF or io.ErrUnexpectedEOF,
+// the response is parsed again through tryingToReadResponseWithoutHeaders. Every other
+// parsing failure is returned as an error together with the size of the buffer that
+// failed to parse.
+//
+// The metrics are appended to serverMetrics when the request direction is ingress,
+// otherwise to clientMetrics.
+func (h *Analyzer) analyze(_ protocol.Context, connectionID string, connectionMetrics *ConnectionMetrics,
+	requestBuffer, responseBuffer protocol.SocketDataBuffer) error {
+	request, err := http.ReadRequest(bufio.NewReader(bytes.NewBuffer(requestBuffer.BufferData())))
+	if err != nil {
+		return fmt.Errorf("parse request failure: data length: %d, total data length: %d, %v",
+			len(requestBuffer.BufferData()), requestBuffer.TotalSize(), err)
+	}
+
+	response, err := http.ReadResponse(bufio.NewReader(bytes.NewBuffer(responseBuffer.BufferData())), request)
+	if response != nil {
+		defer response.Body.Close()
+	}
+	if err != nil {
+		if err == io.ErrUnexpectedEOF || err == io.EOF {
+			// the buffer ended early, fall back to the status-line-only parser
+			response, err = h.tryingToReadResponseWithoutHeaders(bufio.NewReader(bytes.NewBuffer(responseBuffer.BufferData())), request)
+			if err != nil {
+				return fmt.Errorf("parsing simple data error: %v", err)
+			}
+			if response != nil && response.Body != nil {
+				defer response.Body.Close()
+			}
+		}
+		if err != nil {
+			// report the response buffer here: it is the buffer which failed to parse
+			return fmt.Errorf("parse response failure, data length: %d, total data length: %d, %v",
+				len(responseBuffer.BufferData()), responseBuffer.TotalSize(), err)
+		}
+	}
+
+	// appending only takes the read lock; MergeMetricsFromConnection takes the write lock
+	connectionMetrics.metricsLocker.RLock()
+	defer connectionMetrics.metricsLocker.RUnlock()
+
+	// append metrics
+	data := connectionMetrics.clientMetrics
+	side := base.ConnectionRoleClient
+	if requestBuffer.Direction() == base.SocketDataDirectionIngress {
+		// receiving the request means this is the server side
+		data = connectionMetrics.serverMetrics
+		side = base.ConnectionRoleServer
+	}
+	data.Append(request, requestBuffer, response, responseBuffer)
+
+	if log.Enable(logrus.DebugLevel) {
+		metricsJSON, _ := json.Marshal(data)
+		log.Debugf("generated metrics, connection id: %s, side: %s, metrisc: %s", connectionID, side.String(), string(metricsJSON))
+	}
+	return nil
+}
+
+func (h *Analyzer) tryingToReadResponseWithoutHeaders(reader *bufio.Reader, request *http.Request) (*http.Response, error) {
+	if reader.Size() < 16 {
+		return nil, fmt.Errorf("the header length not enough")
+	}
+	tp := textproto.NewReader(reader)
+	resp := &http.Response{
+		Request: request,
+	}
+
+	line, err := tp.ReadLine()
+	if err != nil {
+		return nil, fmt.Errorf("read response first line failure: %v", err)
+	}
+	indexByte := strings.IndexByte(line, ' ')
+	if indexByte == -1 {
+		return nil, fmt.Errorf("parsing response error: %s", line)
+	}
+	resp.Proto = line[:indexByte]
+	resp.Status = strings.TrimLeft(line[indexByte+1:], " ")
+	statusCode := resp.Status
+	if i := strings.IndexByte(resp.Status, ' '); i != -1 {
+		statusCode = resp.Status[:i]
+	}
+	if len(statusCode) != 3 {
+		return nil, fmt.Errorf("parsing response status code failure: %v", statusCode)
+	}
+	resp.StatusCode, err = strconv.Atoi(statusCode)
+	if err != nil || resp.StatusCode < 0 {
+		return nil, fmt.Errorf("status code not correct: %s", statusCode)
+	}
+	var ok bool
+	if resp.ProtoMajor, resp.ProtoMinor, ok = http.ParseHTTPVersion(resp.Proto); !ok {
+		return nil, fmt.Errorf("parsing http version failure: %s", resp.Proto)
+	}
+
+	return resp, nil
+}
+
+// MergeMetricsFromConnection moves the client and server side metrics of data into h.
+// data must be a *ConnectionMetrics; its metricsLocker is write-locked during the merge.
+func (h *ConnectionMetrics) MergeMetricsFromConnection(connection *base.ConnectionContext, data base.ConnectionMetrics) {
+	source := data.(*ConnectionMetrics)
+	source.metricsLocker.Lock()
+	defer source.metricsLocker.Unlock()
+
+	h.clientMetrics.MergeAndClean(source.clientMetrics)
+	h.serverMetrics.MergeAndClean(source.serverMetrics)
+
+	// the remaining work is debug output only
+	if !log.Enable(logrus.DebugLevel) {
+		return
+	}
+	clientJSON, _ := json.Marshal(h.clientMetrics)
+	serverJSON, _ := json.Marshal(h.serverMetrics)
+	log.Debugf("combine metrics: conid: %d_%d, client side metrics: %s, server side metrics: %s",
+		connection.ConnectionID, connection.RandomID, clientJSON, serverJSON)
+}
+
+// FlushMetrics appends the HTTP/1 metrics of the traffic to the metricsBuilder, once for
+// every local process.
+//
+// The metrics of the traffic role (server or client) are flushed completely. When the
+// remote process is profiling as well, the metrics of the opposite side are flushed in
+// duration-only mode.
+func (h *ConnectionMetrics) FlushMetrics(traffic *base.ProcessTraffic, metricsBuilder *base.MetricsBuilder) {
+	// pick the side matching the role of this traffic
+	localMetrics, remoteMetrics := h.clientMetrics, h.serverMetrics
+	if traffic.Role == base.ConnectionRoleServer {
+		localMetrics, remoteMetrics = h.serverMetrics, h.clientMetrics
+	}
+
+	for _, process := range traffic.LocalProcesses {
+		flushed := localMetrics.appendMetrics(traffic, process, "", metricsBuilder, false)
+		if traffic.RemoteProcessIsProfiling() {
+			flushed += remoteMetrics.appendMetrics(traffic, process, "", metricsBuilder, true)
+		}
+
+		// only report when something has been flushed
+		if flushed > 0 && log.Enable(logrus.DebugLevel) {
+			log.Debugf("flush HTTP1 metrics(%s): %s, remote process is profiling: %t, client(%s), server(%s)",
+				traffic.Role.String(), traffic.GenerateConnectionInfo(), traffic.RemoteProcessIsProfiling(),
+				h.clientMetrics.String(), h.serverMetrics.String())
+		}
+	}
+}
+
+// MergeFrom inserts every pending event of other.halfData into h.halfData, using the
+// ordering of analyzer.insertToList. Nothing happens when other has no halfData list.
+func (h *ConnectionMetrics) MergeFrom(analyzer *Analyzer, other *ConnectionMetrics) {
+	if other.halfData == nil {
+		return
+	}
+	for pending := other.halfData.Front(); pending != nil; pending = pending.Next() {
+		analyzer.insertToList(h.halfData, pending.Value.(*protocol.SocketDataUploadEvent))
+	}
+}
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/http1_test.go b/pkg/profiling/task/network/analyze/layer7/protocols/http1/analyzer_test.go
similarity index 94%
rename from pkg/profiling/task/network/analyze/layer7/protocols/http1_test.go
rename to pkg/profiling/task/network/analyze/layer7/protocols/http1/analyzer_test.go
index 32ad96a..a663a6e 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/http1_test.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/http1/analyzer_test.go
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package protocols
+package http1
 
 import (
 	"bufio"
@@ -25,6 +25,8 @@ import (
 	"strings"
 	"testing"
 
+	base2 "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/base"
+
 	"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
 )
 
@@ -243,13 +245,13 @@ func TestBuildHTTP1(t *testing.T) {
 
 	for _, testCase := range tests {
 		//t.Run(testCase.name, func(t *testing.T) {
-		analyzer := NewHTTP1Analyzer().(*HTTP1Analyzer)
+		analyzer := NewHTTP1Analyzer().(*Analyzer)
 		l := list.New()
 		var events = make([]struct {
 			start, end int
 		}, 0)
 		for _, event := range testCase.events {
-			req, resp := analyzer.buildHTTP1(l, &SocketDataUploadEvent{
+			req, resp := analyzer.buildHTTP1(l, &base2.SocketDataUploadEvent{
 				DataID:   uint64(event.dataID),
 				MsgType:  base.SocketMessageType(event.dataType),
 				Sequence: uint16(event.sequence),
@@ -272,7 +274,7 @@ func TestBuildHTTP1(t *testing.T) {
 		}
 		actualList := make([]int, 0)
 		for element := l.Front(); element != nil; element = element.Next() {
-			actualList = append(actualList, int(element.Value.(*SocketDataUploadEvent).DataID))
+			actualList = append(actualList, int(element.Value.(*base2.SocketDataUploadEvent).DataID))
 		}
 		if !reflect.DeepEqual(exceptedList, actualList) {
 			t.Fatalf("excepted residue data list: %v, actual: %v", exceptedList, actualList)
@@ -297,7 +299,7 @@ func bufferConvert(data string) [2048]byte {
 func TestParseSimpleHTTP1Response(t *testing.T) {
 	s := `HTTP/1.0 200 OK\r\n`
 	h := &http.Request{}
-	analyzer := NewHTTP1Analyzer().(*HTTP1Analyzer)
+	analyzer := NewHTTP1Analyzer().(*Analyzer)
 	resp, err := analyzer.tryingToReadResponseWithoutHeaders(bufio.NewReader(strings.NewReader(s)), h)
 	if err != nil {
 		t.Fatalf("reading simple response error: %v", err)
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/http1_builder.go b/pkg/profiling/task/network/analyze/layer7/protocols/http1/builder.go
similarity index 76%
rename from pkg/profiling/task/network/analyze/layer7/protocols/http1_builder.go
rename to pkg/profiling/task/network/analyze/layer7/protocols/http1/builder.go
index e8420ef..733919b 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/http1_builder.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/http1/builder.go
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package protocols
+package http1
 
 import (
 	"bufio"
@@ -23,18 +23,20 @@ import (
 	"container/list"
 	"net/http"
 
+	base2 "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/base"
+
 	"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
 )
 
-type HTTP1BufferAnalyzer struct {
-	http1Analyzer *HTTP1Analyzer
+type BufferAnalyzer struct {
+	http1Analyzer *Analyzer
 
-	unknownEventBuffer SocketDataBuffer
+	unknownEventBuffer base2.SocketDataBuffer
 	unknownElement     *list.Element
 	unknownSize        int
-	request            *SocketDataUploadEvent
+	request            *base2.SocketDataUploadEvent
 	requestElement     *list.Element
-	response           *SocketDataUploadEvent
+	response           *base2.SocketDataUploadEvent
 	responseElement    *list.Element
 
 	unknownDataID      uint64
@@ -47,13 +49,13 @@ type HTTP1BufferAnalyzer struct {
 	respFinished       bool
 }
 
-func NewHTTP1BufferAnalyzer(http1 *HTTP1Analyzer) *HTTP1BufferAnalyzer {
-	return &HTTP1BufferAnalyzer{http1Analyzer: http1}
+func NewHTTP1BufferAnalyzer(http1 *Analyzer) *BufferAnalyzer {
+	return &BufferAnalyzer{http1Analyzer: http1}
 }
 
-func (h *HTTP1BufferAnalyzer) Analyze(events *list.List) (request, response SocketDataBuffer) {
+func (h *BufferAnalyzer) Analyze(events *list.List) (request, response base2.SocketDataBuffer) {
 	for element := events.Front(); element != nil; element = element.Next() {
-		curEvent := element.Value.(*SocketDataUploadEvent)
+		curEvent := element.Value.(*base2.SocketDataUploadEvent)
 		// transform the unknown to the request or response
 		if continueReading, req, resp := h.handleUnknown(events, element, curEvent); req != nil && resp != nil {
 			return req, resp
@@ -74,8 +76,8 @@ func (h *HTTP1BufferAnalyzer) Analyze(events *list.List) (request, response Sock
 	return nil, nil
 }
 
-func (h *HTTP1BufferAnalyzer) handleUnknown(event *list.List, element *list.Element,
-	curEvent *SocketDataUploadEvent) (continueReading bool, req, resp SocketDataBuffer) {
+func (h *BufferAnalyzer) handleUnknown(event *list.List, element *list.Element,
+	curEvent *base2.SocketDataUploadEvent) (continueReading bool, req, resp base2.SocketDataBuffer) {
 	if curEvent.MsgType != base.SocketMessageTypeUnknown {
 		return false, nil, nil
 	}
@@ -111,8 +113,8 @@ func (h *HTTP1BufferAnalyzer) handleUnknown(event *list.List, element *list.Elem
 	return false, nil, nil
 }
 
-func (h *HTTP1BufferAnalyzer) handleRequest(events *list.List, element *list.Element,
-	curEvent *SocketDataUploadEvent) (continueReading bool, req, resp SocketDataBuffer) {
+func (h *BufferAnalyzer) handleRequest(events *list.List, element *list.Element,
+	curEvent *base2.SocketDataUploadEvent) (continueReading bool, req, resp base2.SocketDataBuffer) {
 	if h.request == nil {
 		// find the first request package event
 		if curEvent.MsgType == base.SocketMessageTypeRequest && curEvent.IsStart() {
@@ -145,7 +147,8 @@ func (h *HTTP1BufferAnalyzer) handleRequest(events *list.List, element *list.Ele
 	return false, nil, nil
 }
 
-func (h *HTTP1BufferAnalyzer) handleResponse(events *list.List, element *list.Element, curEvent *SocketDataUploadEvent) (req, resp SocketDataBuffer) {
+func (h *BufferAnalyzer) handleResponse(events *list.List, element *list.Element,
+	curEvent *base2.SocketDataUploadEvent) (req, resp base2.SocketDataBuffer) {
 	if h.response == nil {
 		// if current response is not start, then clean to re-find new one
 		if !curEvent.IsStart() {
@@ -166,10 +169,11 @@ func (h *HTTP1BufferAnalyzer) handleResponse(events *list.List, element *list.El
 	}
 
 	// if response sequence is broken, then clean the context
-	if !h.respFinished && (h.respDataID != curEvent.DataID || h.respMaxSequence+1 != curEvent.Sequence) {
+	if h.respDataID != curEvent.DataID || h.respMaxSequence+1 != curEvent.Sequence {
 		h.cleanContext()
 		return nil, nil
 	}
+	h.respDataID = curEvent.DataID
 	h.respMaxSequence = curEvent.Sequence
 
 	if h.reqFinished && curEvent.IsFinished() {
@@ -178,14 +182,14 @@ func (h *HTTP1BufferAnalyzer) handleResponse(events *list.List, element *list.El
 	return nil, nil
 }
 
-func (h *HTTP1BufferAnalyzer) resetStartUnknown(element *list.Element, curEvent *SocketDataUploadEvent) {
+func (h *BufferAnalyzer) resetStartUnknown(element *list.Element, curEvent *base2.SocketDataUploadEvent) {
 	h.unknownEventBuffer = curEvent
 	h.unknownElement = element
 	h.unknownDataID = curEvent.DataID
 	h.unknownMaxSequence = curEvent.Sequence
 }
 
-func (h *HTTP1BufferAnalyzer) resetStartRequest(element *list.Element, curEvent *SocketDataUploadEvent) {
+func (h *BufferAnalyzer) resetStartRequest(element *list.Element, curEvent *base2.SocketDataUploadEvent) {
 	h.request = curEvent
 	h.reqDataID = curEvent.DataID
 	h.reqMaxSequence = curEvent.Sequence
@@ -193,7 +197,7 @@ func (h *HTTP1BufferAnalyzer) resetStartRequest(element *list.Element, curEvent
 	h.requestElement = element
 }
 
-func (h *HTTP1BufferAnalyzer) resetStartResponse(element *list.Element, curEvent *SocketDataUploadEvent) {
+func (h *BufferAnalyzer) resetStartResponse(element *list.Element, curEvent *base2.SocketDataUploadEvent) {
 	h.response = curEvent
 	h.respDataID = curEvent.DataID
 	h.respMaxSequence = curEvent.Sequence
@@ -201,7 +205,7 @@ func (h *HTTP1BufferAnalyzer) resetStartResponse(element *list.Element, curEvent
 	h.respFinished = curEvent.IsFinished()
 }
 
-func (h *HTTP1BufferAnalyzer) tryingToAnalyzeTheUnknown(events *list.List, curEvent *SocketDataUploadEvent) (req, resp SocketDataBuffer) {
+func (h *BufferAnalyzer) tryingToAnalyzeTheUnknown(events *list.List, curEvent *base2.SocketDataUploadEvent) (req, resp base2.SocketDataBuffer) {
 	if h.unknownEventBuffer == nil {
 		return nil, nil
 	}
@@ -237,13 +241,13 @@ func (h *HTTP1BufferAnalyzer) tryingToAnalyzeTheUnknown(events *list.List, curEv
 	return nil, nil
 }
 
-func (h *HTTP1BufferAnalyzer) transformUnknown(element *list.Element, msgType base.SocketMessageType) {
+func (h *BufferAnalyzer) transformUnknown(element *list.Element, msgType base.SocketMessageType) {
 	// update message type and total size
-	firstEvent := element.Value.(*SocketDataUploadEvent)
+	firstEvent := element.Value.(*base2.SocketDataUploadEvent)
 	firstEvent.MsgType = msgType
 	dataLen := int(firstEvent.DataLen)
 	for e := element.Next(); e != nil; e = e.Next() {
-		curEvent := e.Value.(*SocketDataUploadEvent)
+		curEvent := e.Value.(*base2.SocketDataUploadEvent)
 		if curEvent.Finished == 1 {
 			curEvent.MsgType = msgType
 			dataLen += int(curEvent.DataLen)
@@ -255,32 +259,32 @@ func (h *HTTP1BufferAnalyzer) transformUnknown(element *list.Element, msgType ba
 	}
 }
 
-func (h *HTTP1BufferAnalyzer) cleanContext() {
+func (h *BufferAnalyzer) cleanContext() {
 	h.cleanUnknownContext()
 	h.cleanRequestContext()
 	h.cleanResponseContext()
 }
 
-func (h *HTTP1BufferAnalyzer) cleanResponseContext() {
+func (h *BufferAnalyzer) cleanResponseContext() {
 	h.response = nil
 	h.respDataID = 0
 	h.respMaxSequence = 0
 	h.respFinished = false
 }
 
-func (h *HTTP1BufferAnalyzer) cleanRequestContext() {
+func (h *BufferAnalyzer) cleanRequestContext() {
 	h.request = nil
 	h.reqDataID = 0
 	h.reqMaxSequence = 0
 	h.reqFinished = false
 }
 
-func (h *HTTP1BufferAnalyzer) cleanUnknownContext() {
+func (h *BufferAnalyzer) cleanUnknownContext() {
 	h.unknownEventBuffer, h.unknownElement = nil, nil
 	h.unknownSize, h.unknownDataID, h.unknownMaxSequence = 0, 0, 0
 }
 
-func (h *HTTP1BufferAnalyzer) buildHTTP(events *list.List) (req, resp SocketDataBuffer) {
+func (h *BufferAnalyzer) buildHTTP(events *list.List) (req, resp base2.SocketDataBuffer) {
 	return h.http1Analyzer.combineAndRemoveEvent(events, h.requestElement, nil),
 		h.http1Analyzer.combineAndRemoveEvent(events, h.responseElement, nil)
 }
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/http1/metrics.go b/pkg/profiling/task/network/analyze/layer7/protocols/http1/metrics.go
new file mode 100644
index 0000000..cc3014f
--- /dev/null
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/http1/metrics.go
@@ -0,0 +1,390 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package http1
+
+import (
+	"bufio"
+	"bytes"
+	"compress/gzip"
+	"encoding/json"
+	"fmt"
+	"io"
+	"mime"
+	"net/http"
+	"strings"
+	"time"
+
+	"golang.org/x/net/html/charset"
+
+	"github.com/apache/skywalking-rover/pkg/process/api"
+	"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
+	protocol "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/base"
+	"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/metrics"
+	"github.com/apache/skywalking-rover/pkg/tools"
+	"github.com/apache/skywalking-rover/pkg/tools/host"
+
+	"github.com/docker/go-units"
+
+	commonv3 "skywalking.apache.org/repo/goapi/collect/common/v3"
+	v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+	logv3 "skywalking.apache.org/repo/goapi/collect/logging/v3"
+)
+
+// Transport type names handed to appendHTTPEvent: they end up in the event name
+// ("HTTP <type> Sampling") and in the "data type" tag of the span attached event.
+var (
+	transportRequest  = "Request"
+	transportResponse = "Response"
+)
+
+// URIMetrics holds the HTTP/1 metrics collected by Append for one side of a connection.
+type URIMetrics struct {
+	// RequestCounter is increased once per appended request/response pair.
+	RequestCounter *metrics.Counter
+	// StatusCounter counts the appended pairs per response status code.
+	StatusCounter  map[int]*metrics.Counter
+
+	// Average and histogram of the total size of the request and response buffers.
+	AvgRequestPackageSize    *metrics.AvgCounter
+	AvgResponsePackageSize   *metrics.AvgCounter
+	ReqPackageSizeHistogram  *metrics.Histogram
+	RespPackageSizeHistogram *metrics.Histogram
+
+	// Average and histogram of the duration in milliseconds, measured from the start
+	// time of the request buffer to the end time of the response buffer.
+	avgDuration       *metrics.AvgCounter
+	durationHistogram *metrics.Histogram
+
+	// slowTraces receives a record for each appended pair which carries a tracing context.
+	slowTraces *metrics.TopN
+}
+
+// NewHTTP1URIMetrics creates a URIMetrics with every counter, histogram and the slow trace
+// top N initialized and empty.
+func NewHTTP1URIMetrics() *URIMetrics {
+	result := &URIMetrics{}
+
+	// request and status counting
+	result.RequestCounter = metrics.NewCounter()
+	result.StatusCounter = make(map[int]*metrics.Counter)
+
+	// package sizes
+	result.AvgRequestPackageSize = metrics.NewAvgCounter()
+	result.AvgResponsePackageSize = metrics.NewAvgCounter()
+	result.ReqPackageSizeHistogram = metrics.NewHistogram(PackageSizeHistogramBuckets)
+	result.RespPackageSizeHistogram = metrics.NewHistogram(PackageSizeHistogramBuckets)
+
+	// durations and slow traces
+	result.avgDuration = metrics.NewAvgCounter()
+	result.durationHistogram = metrics.NewHistogram(DurationHistogramBuckets)
+	result.slowTraces = metrics.NewTopN(SlowTraceTopNSize)
+
+	return result
+}
+
+// Append records one request/response pair: the request and status code counters, the
+// package sizes of both buffers, the duration in milliseconds, and a slow trace candidate.
+func (u *URIMetrics) Append(req *http.Request, reqBuffer protocol.SocketDataBuffer, resp *http.Response, respBuffer protocol.SocketDataBuffer) {
+	u.RequestCounter.Increase()
+
+	// the counter of a status code is created on first use
+	code := resp.StatusCode
+	perStatus := u.StatusCounter[code]
+	if perStatus == nil {
+		perStatus = metrics.NewCounter()
+		u.StatusCounter[code] = perStatus
+	}
+	perStatus.Increase()
+
+	// package sizes
+	u.AvgRequestPackageSize.Increase(float64(reqBuffer.TotalSize()))
+	u.AvgResponsePackageSize.Increase(float64(respBuffer.TotalSize()))
+	u.ReqPackageSizeHistogram.Increase(float64(reqBuffer.TotalSize()))
+	u.RespPackageSizeHistogram.Increase(float64(respBuffer.TotalSize()))
+
+	// from the start of the request to the end of the response
+	elapsed := time.Duration(respBuffer.EndTime() - reqBuffer.StartTime())
+	elapsedMS := float64(elapsed.Milliseconds())
+	u.avgDuration.Increase(elapsedMS)
+	u.durationHistogram.Increase(elapsedMS)
+
+	u.increaseSlowTraceTopN(u.slowTraces, elapsed, req, resp, reqBuffer, respBuffer)
+}
+
+// appendMetrics builds the meters of these metrics for one local process.
+//
+// The duration average and histogram are always built; their names contain the role
+// returned by BuildBasicMeterLabels. With durationOnly the function returns right after
+// that. Otherwise the request counter, one status counter per status code (labelled with
+// "code") and the package size meters are built as well, everything is appended to the
+// metricsBuilder, and the slow traces are flushed.
+//
+// Returns the number of built meters, plus the count reported by slowTraces.AppendData
+// when durationOnly is false.
+//
+// NOTE(review): with durationOnly the built meters are counted but never handed to
+// metricsBuilder.AppendMetrics — confirm this is intended.
+func (u *URIMetrics) appendMetrics(traffic *base.ProcessTraffic,
+	local api.ProcessInterface, url string, metricsBuilder *base.MetricsBuilder, durationOnly bool) int {
+	collections := make([]*v3.MeterData, 0)
+	role, labels := metricsBuilder.BuildBasicMeterLabels(traffic, local)
+	prefix := metricsBuilder.MetricPrefix()
+
+	collections = u.buildMetrics(collections, prefix, fmt.Sprintf("%s_duration_avg", role.String()), labels, url,
+		traffic, u.avgDuration)
+	collections = u.buildMetrics(collections, prefix, fmt.Sprintf("%s_duration_histogram", role.String()), labels, url,
+		traffic, u.durationHistogram)
+	if durationOnly {
+		return len(collections)
+	}
+
+	collections = u.buildMetrics(collections, prefix, "request_counter", labels, url, traffic, u.RequestCounter)
+	for status, counter := range u.StatusCounter {
+		// Copy the basic labels for every status code: appending to the shared slice could
+		// reuse its backing array, and a later status would overwrite the "code" label of
+		// the meters built before.
+		statusLabels := make([]*v3.Label, 0, len(labels)+1)
+		statusLabels = append(statusLabels, labels...)
+		statusLabels = append(statusLabels, &v3.Label{Name: "code", Value: fmt.Sprintf("%d", status)})
+		collections = u.buildMetrics(collections, prefix, "response_status_counter", statusLabels, url, traffic, counter)
+	}
+
+	collections = u.buildMetrics(collections, prefix, "request_package_size_avg", labels, url, traffic, u.AvgRequestPackageSize)
+	collections = u.buildMetrics(collections, prefix, "response_package_size_avg", labels, url, traffic, u.AvgResponsePackageSize)
+	collections = u.buildMetrics(collections, prefix, "request_package_size_histogram", labels, url, traffic, u.ReqPackageSizeHistogram)
+	collections = u.buildMetrics(collections, prefix, "response_package_size_histogram", labels, url, traffic, u.RespPackageSizeHistogram)
+
+	metricsBuilder.AppendMetrics(local.Entity().ServiceName, local.Entity().InstanceName, collections)
+	logsCount := u.slowTraces.AppendData(local, traffic, metricsBuilder)
+	return len(collections) + logsCount
+}
+
+// buildMetrics appends the meter of data to the collection and returns the collection.
+//
+// The meter is named "<prefix>http1_<name>". When url is not empty the name becomes
+// "<prefix>http1_url_<name>" and a "url" label is added to a copy of the basic labels.
+func (u *URIMetrics) buildMetrics(collection []*v3.MeterData, prefix, name string, basicLabels []*v3.Label,
+	url string, _ *base.ProcessTraffic, data metrics.Metrics) []*v3.MeterData {
+	labels := basicLabels
+	var meterName string
+	if url != "" {
+		// Never append to the slice of the caller: it is shared between multiple meters
+		// and appending in place could overwrite the labels of another meter.
+		labels = make([]*v3.Label, 0, len(basicLabels)+1)
+		labels = append(labels, basicLabels...)
+		labels = append(labels, &v3.Label{Name: "url", Value: url})
+		meterName = fmt.Sprintf("%shttp1_%s_%s", prefix, "url", name)
+	} else {
+		meterName = fmt.Sprintf("%shttp1_%s", prefix, name)
+	}
+	return data.AppendMeter(collection, meterName, labels)
+}
+
+// MergeAndClean merges every metric of other into u through the MergeAndClean of each
+// metric. Status code counters missing in u are created before merging.
+func (u *URIMetrics) MergeAndClean(other *URIMetrics) {
+	// counters
+	u.RequestCounter.MergeAndClean(other.RequestCounter)
+	for code, incoming := range other.StatusCounter {
+		target := u.StatusCounter[code]
+		if target == nil {
+			target = metrics.NewCounter()
+			u.StatusCounter[code] = target
+		}
+		target.MergeAndClean(incoming)
+	}
+
+	// package sizes
+	u.AvgRequestPackageSize.MergeAndClean(other.AvgRequestPackageSize)
+	u.AvgResponsePackageSize.MergeAndClean(other.AvgResponsePackageSize)
+	u.ReqPackageSizeHistogram.MergeAndClean(other.ReqPackageSizeHistogram)
+	u.RespPackageSizeHistogram.MergeAndClean(other.RespPackageSizeHistogram)
+
+	// durations and slow traces
+	u.avgDuration.MergeAndClean(other.avgDuration)
+	u.durationHistogram.MergeAndClean(other.durationHistogram)
+	u.slowTraces.MergeAndClean(other.slowTraces)
+}
+
+// String renders a one line summary of the metrics, used by the debug logs.
+func (u *URIMetrics) String() string {
+	requestCount := u.RequestCounter.Get()
+	avgRequestSize := u.AvgRequestPackageSize.Calculate()
+	avgResponseSize := u.AvgResponsePackageSize.Calculate()
+	avgDuration := u.avgDuration.Calculate()
+	slowTraceCount := u.slowTraces.List.Len()
+
+	return fmt.Sprintf("request count: %d, avg request size: %f, avg response size: %f, avg duration: %f, slow trace count: %d, response counters: %v",
+		requestCount, avgRequestSize, avgResponseSize, avgDuration, slowTraceCount, u.StatusCounter)
+}
+
+// increaseSlowTraceTopN adds the request/response pair to slowTraceTopN, ranked by the
+// duration in milliseconds. Pairs whose request headers carry no tracing context are
+// ignored; a failure of analyzing the tracing context is logged as a warning.
+func (u *URIMetrics) increaseSlowTraceTopN(slowTraceTopN *metrics.TopN, duration time.Duration,
+	request *http.Request, response *http.Response, reqBuffer, respBuffer protocol.SocketDataBuffer) {
+	readHeader := func(key string) string {
+		return request.Header.Get(key)
+	}
+	tracingContext, err := protocol.AnalyzeTracingContext(readHeader)
+	switch {
+	case err != nil:
+		log.Warnf("analyze tracing context error: %v", err)
+		return
+	case tracingContext == nil:
+		return
+	}
+
+	// keep the path only, the query parameters are removed
+	path := request.RequestURI
+	if queryStart := strings.Index(path, "?"); queryStart > 0 {
+		path = path[:queryStart]
+	}
+
+	slowTraceTopN.AddRecord(&Trace{
+		Trace:          tracingContext,
+		RequestURI:     path,
+		RequestBuffer:  reqBuffer,
+		ResponseBuffer: respBuffer,
+		Request:        request,
+		Response:       response,
+	}, duration.Milliseconds())
+}
+
+// Trace is one slow trace candidate: a request/response pair whose request carries a
+// tracing context. It is created by increaseSlowTraceTopN and reported by Flush.
+type Trace struct {
+	// Trace is the tracing context analyzed from the request headers.
+	Trace          protocol.TracingContext
+	// RequestURI is the request URI with the query parameters removed.
+	RequestURI     string
+	// RequestBuffer and Request are the raw socket data and the parsed form of the request.
+	RequestBuffer  protocol.SocketDataBuffer
+	Request        *http.Request
+	// ResponseBuffer and Response are the raw socket data and the parsed form of the response.
+	ResponseBuffer protocol.SocketDataBuffer
+	Response       *http.Response
+}
+
+// Flush reports the trace as a "NET_PROFILING_SAMPLED_TRACE" log with a JSON body
+// (SlowTraceLogBody) and then appends the full HTTP content as span attached events.
+// When the body cannot be encoded, a warning is logged and nothing is reported.
+func (h *Trace) Flush(duration int64, process api.ProcessInterface, traffic *base.ProcessTraffic, metricsBuilder *base.MetricsBuilder) {
+	// log skeleton: entity, kind tag and trace context
+	logData := &logv3.LogData{
+		Service:         process.Entity().ServiceName,
+		ServiceInstance: process.Entity().InstanceName,
+		Layer:           process.Entity().Layer,
+		Tags: &logv3.LogTags{Data: []*commonv3.KeyStringValuePair{
+			{Key: "LOG_KIND", Value: "NET_PROFILING_SAMPLED_TRACE"},
+		}},
+		TraceContext: &logv3.TraceContext{TraceId: h.Trace.TraceID()},
+	}
+
+	// body
+	body := &SlowTraceLogBody{
+		URI:           h.RequestURI,
+		Reason:        "slow",
+		Latency:       duration,
+		TraceProvider: h.Trace.Provider().Name,
+		DetectPoint:   traffic.Role.String(),
+		Component:     traffic.Protocol.String(),
+		SSL:           traffic.IsSSL,
+	}
+	// the local process takes the side of the traffic role, the remote one the other side
+	switch traffic.Role {
+	case base.ConnectionRoleClient:
+		body.ClientProcess = &SlowTraceLogProcess{ProcessID: process.ID()}
+		body.ServerProcess = NewHTTP1SlowTRaceLogRemoteProcess(traffic, process)
+	default:
+		body.ServerProcess = &SlowTraceLogProcess{ProcessID: process.ID()}
+		body.ClientProcess = NewHTTP1SlowTRaceLogRemoteProcess(traffic, process)
+	}
+
+	encoded, err := json.Marshal(body)
+	if err != nil {
+		log.Warnf("format the slow trace log body failure: %v", err)
+		return
+	}
+	logData.Body = &logv3.LogDataBody{
+		Type:    "json",
+		Content: &logv3.LogDataBody_Json{Json: &logv3.JSONLog{Json: string(encoded)}},
+	}
+
+	metricsBuilder.AppendLogs(process.Entity().ServiceName, logData)
+
+	// append full http content
+	h.AppendHTTPEvents(process, traffic, metricsBuilder)
+}
+
+// AppendHTTPEvents builds the span attached events of the request and the response, in
+// this order, and appends them to the metricsBuilder.
+func (h *Trace) AppendHTTPEvents(process api.ProcessInterface, traffic *base.ProcessTraffic, metricsBuilder *base.MetricsBuilder) {
+	sampled := h.appendHTTPEvent(make([]*v3.SpanAttachedEvent, 0), process, traffic,
+		transportRequest, h.Request.Header, h.Request.Body, h.RequestBuffer)
+	sampled = h.appendHTTPEvent(sampled, process, traffic,
+		transportResponse, h.Response.Header, h.Response.Body, h.ResponseBuffer)
+
+	metricsBuilder.AppendSpanAttachedEvents(sampled)
+}
+
+// appendHTTPEvent builds the span attached event of one transport (tp is transportRequest
+// or transportResponse) and returns events with the new event appended.
+//
+// When the content cannot be transformed, a warning is logged and events is returned
+// unchanged. The event is tagged with "bind to upstream span" for an ingress request or
+// an egress response.
+func (h *Trace) appendHTTPEvent(events []*v3.SpanAttachedEvent, process api.ProcessInterface, traffic *base.ProcessTraffic,
+	tp string, header http.Header, body io.Reader, buffer protocol.SocketDataBuffer) []*v3.SpanAttachedEvent {
+	content, err := h.transformHTTPRequest(header, body, buffer)
+	if err != nil {
+		log.Warnf("transform http %s erorr: %v", tp, err)
+		return events
+	}
+
+	sampled := &v3.SpanAttachedEvent{
+		StartTime: host.TimeToInstant(buffer.StartTime()),
+		EndTime:   host.TimeToInstant(buffer.EndTime()),
+		Event:     fmt.Sprintf("HTTP %s Sampling", tp),
+		Tags: []*commonv3.KeyStringValuePair{
+			// content data
+			{Key: "data size", Value: units.BytesSize(float64(buffer.TotalSize()))},
+			{Key: "data content", Value: content},
+			{Key: "data direction", Value: buffer.Direction().String()},
+			{Key: "data type", Value: tp},
+			// connection
+			{Key: "connection role", Value: traffic.Role.String()},
+			// entity
+			{Key: "service name", Value: process.Entity().ServiceName},
+			{Key: "service instance name", Value: process.Entity().InstanceName},
+			{Key: "process name", Value: process.Entity().ProcessName},
+		},
+	}
+
+	// a received request or a sent response needs to be bound to the upstream
+	bindToUpstream := (buffer.Direction() == base.SocketDataDirectionIngress && tp == transportRequest) ||
+		(buffer.Direction() == base.SocketDataDirectionEgress && tp == transportResponse)
+	if bindToUpstream {
+		sampled.Tags = append(sampled.Tags, &commonv3.KeyStringValuePair{Key: "bind to upstream span", Value: "true"})
+	}
+
+	sampled.Summary = make([]*commonv3.KeyIntValuePair, 0)
+	sampled.TraceContext = &v3.SpanAttachedEvent_SpanReference{
+		TraceId:        h.Trace.TraceID(),
+		TraceSegmentId: h.Trace.TraceSegmentID(),
+		SpanId:         h.Trace.SpanID(),
+		Type:           h.Trace.Provider().Type,
+	}
+	return append(events, sampled)
+}
+
+// transformHTTPRequest renders the HTTP message held by buffer as a string.
+//
+// The raw buffer is returned as-is when the message is not gzip encoded, its content type
+// is plain (any "text/" type or "application/json", or no Content-Type at all) and its
+// charset is UTF-8 or not declared. Otherwise the buffer is parsed again: for a non-plain
+// content type only the header part plus a placeholder is returned; for plain content the
+// body is gunzipped and/or converted from its charset and appended to the header part.
+// When reading the body stops with io.ErrUnexpectedEOF, the data read so far is kept and
+// "[chunked]" is appended.
+//
+// The media type and the charset are taken from the parsed Content-Type, so parameters
+// (e.g. "application/json; charset=UTF-8") and the letter case of the charset do not push
+// a plain UTF-8 message onto the re-parsing path.
+//
+// NOTE(review): the buffer is always re-parsed with http.ReadResponse, which presumably
+// fails for request buffers; and without gzip the data is read from body (parsed by the
+// caller) rather than from the re-parsed message — confirm both are intended.
+// nolint
+func (h *Trace) transformHTTPRequest(header http.Header, body io.Reader, buffer protocol.SocketDataBuffer) (string, error) {
+	var needGzip, isPlain, isUtf8 = header.Get("Content-Encoding") == "gzip", true, true
+	contentType := header.Get("Content-Type")
+	if contentType != "" {
+		mediaType, params, err := mime.ParseMediaType(contentType)
+		if err != nil && mediaType == "" {
+			// unparsable header: fall back to checking the raw value
+			mediaType = contentType
+		}
+		isPlain = strings.HasPrefix(mediaType, "text/") || mediaType == "application/json"
+		// params is nil when the parameters could not be parsed, the lookup is still safe
+		if cs, ok := params["charset"]; ok {
+			isUtf8 = strings.EqualFold(cs, "utf-8")
+		}
+	}
+
+	if !needGzip && isPlain && isUtf8 {
+		return string(buffer.BufferData()), nil
+	}
+
+	// re-read the buffer and skip to the body position
+	buf := bufio.NewReaderSize(bytes.NewBuffer(buffer.BufferData()), len(buffer.BufferData()))
+	response, err := http.ReadResponse(buf, nil)
+	if err != nil {
+		return "", err
+	}
+	defer response.Body.Close()
+
+	// everything the parser has consumed so far is the header part
+	headerString := string(buffer.BufferData()[:len(buffer.BufferData())-buf.Buffered()])
+	// no text plain, no need to print the data
+	if !isPlain {
+		return fmt.Sprintf("%s[not plain, current content type: %s]", headerString, contentType), nil
+	}
+
+	data := body
+	if needGzip {
+		data, err = gzip.NewReader(response.Body)
+		if err != nil {
+			return "", err
+		}
+	}
+	if !isUtf8 {
+		data, err = charset.NewReader(data, contentType)
+		if err != nil {
+			return "", err
+		}
+	}
+
+	realData, err := io.ReadAll(data)
+	if err != nil {
+		if err != io.ErrUnexpectedEOF {
+			return "", err
+		}
+		// the body is incomplete, keep what has been read and mark it
+		realData = append(realData, []byte("[chunked]")...)
+	}
+	return fmt.Sprintf("%s%s", headerString, string(realData)), nil
+}
+
+// SlowTraceLogBody is the JSON body of the sampled trace log produced by Trace.Flush.
+type SlowTraceLogBody struct {
+	// URI is the request URI without the query parameters.
+	URI           string               `json:"uri"`
+	// Reason is set to "slow" by Trace.Flush.
+	Reason        string               `json:"reason"`
+	// Latency is the duration value handed to Trace.Flush.
+	Latency       int64                `json:"latency"`
+	// TraceProvider is the name of the provider of the tracing context.
+	TraceProvider string               `json:"trace_provider"`
+	// ClientProcess and ServerProcess describe both ends of the connection.
+	ClientProcess *SlowTraceLogProcess `json:"client_process"`
+	ServerProcess *SlowTraceLogProcess `json:"server_process"`
+	// DetectPoint is the role of the traffic, Component its protocol.
+	DetectPoint   string               `json:"detect_point"`
+	Component     string               `json:"component"`
+	// SSL is taken from the IsSSL flag of the traffic.
+	SSL           bool                 `json:"ssl"`
+}
+
+// SlowTraceLogProcess describes one end of the connection in a SlowTraceLogBody.
+// NewHTTP1SlowTRaceLogRemoteProcess fills exactly one of the fields for the remote end.
+type SlowTraceLogProcess struct {
+	// ProcessID is the id of the process when the process is known.
+	ProcessID string `json:"process_id"`
+	// Local is true when the remote IP is a local host address or a cached local address.
+	Local     bool   `json:"local"`
+	// Address is the "ip:port" of the remote end when nothing else is known about it.
+	Address   string `json:"address"`
+}
+
+// NewHTTP1SlowTRaceLogRemoteProcess describes the remote end of the traffic.
+//
+// The first remote process with the same service and instance name as the local process
+// is reported by its process id. Otherwise the remote end is reported as local when its
+// IP is a local host address or a cached local address, and by "ip:port" in all other cases.
+func NewHTTP1SlowTRaceLogRemoteProcess(traffic *base.ProcessTraffic, local api.ProcessInterface) *SlowTraceLogProcess {
+	for _, remote := range traffic.RemoteProcesses {
+		// only match with same service instance
+		if local.Entity().ServiceName != remote.Entity().ServiceName {
+			continue
+		}
+		if local.Entity().InstanceName != remote.Entity().InstanceName {
+			continue
+		}
+		return &SlowTraceLogProcess{ProcessID: remote.ID()}
+	}
+
+	remoteIsLocal := tools.IsLocalHostAddress(traffic.RemoteIP) || traffic.Analyzer.IsLocalAddressInCache(traffic.RemoteIP)
+	if remoteIsLocal {
+		return &SlowTraceLogProcess{Local: true}
+	}
+
+	return &SlowTraceLogProcess{Address: fmt.Sprintf("%s:%d", traffic.RemoteIP, traffic.RemotePort)}
+}
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/metrics/counter.go b/pkg/profiling/task/network/analyze/layer7/protocols/metrics/counter.go
index 23d193e..2a01e66 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/metrics/counter.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/metrics/counter.go
@@ -17,7 +17,11 @@
 
 package metrics
 
-import v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+import (
+	"fmt"
+
+	v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+)
 
 type Counter struct {
 	Value int
@@ -29,6 +33,10 @@ func NewCounter() *Counter {
 	}
 }
 
+// String renders the current counter value as a decimal string, which also
+// makes *Counter satisfy fmt.Stringer.
+func (c *Counter) String() string { return fmt.Sprint(c.Value) }
+
 func (c *Counter) Increase() {
 	c.Value++
 }
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/protocols.go b/pkg/profiling/task/network/analyze/layer7/protocols/protocols.go
index 0fb1487..22735fc 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/protocols.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/protocols.go
@@ -20,51 +20,33 @@ package protocols
 import (
 	"github.com/apache/skywalking-rover/pkg/logger"
 	"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
+	protocol "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/base"
+	"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/http1"
 )
 
 var log = logger.GetLogger("profiling", "task", "network", "layer7", "protocols")
 
-var ListenerName = "layer7"
-
-var registerProtocols []func() Protocol
-var defaultInstances []Protocol
+var registerProtocols []func() protocol.Protocol
+var defaultInstances []protocol.Protocol
 
 func init() {
 	// register all protocol analyzers
-	registerProtocols = make([]func() Protocol, 0)
-	registerProtocols = append(registerProtocols, NewHTTP1Analyzer)
+	registerProtocols = make([]func() protocol.Protocol, 0)
+	registerProtocols = append(registerProtocols, http1.NewHTTP1Analyzer)
 
-	defaultInstances = make([]Protocol, 0)
+	defaultInstances = make([]protocol.Protocol, 0)
 	for _, p := range registerProtocols {
 		defaultInstances = append(defaultInstances, p())
 	}
 }
 
-type Protocol interface {
-	Name() string
-	GenerateMetrics() Metrics
-
-	ReceiveData(context Context, event *SocketDataUploadEvent) bool
-}
-
-type Context interface {
-	QueryConnection(connectionID, randomID uint64) *base.ConnectionContext
-}
-
-type Metrics interface {
-	base.ConnectionMetrics
-
-	// FlushMetrics flush all metrics from traffic to the metricsBuilder
-	FlushMetrics(traffic *base.ProcessTraffic, metricsBuilder *base.MetricsBuilder)
-}
-
 type Analyzer struct {
-	ctx       Context
-	protocols []Protocol
+	ctx       protocol.Context
+	protocols []protocol.Protocol
 }
 
-func NewAnalyzer(ctx Context) *Analyzer {
-	protocols := make([]Protocol, 0)
+func NewAnalyzer(ctx protocol.Context) *Analyzer {
+	protocols := make([]protocol.Protocol, 0)
 	for _, r := range registerProtocols {
 		protocols = append(protocols, r())
 	}
@@ -74,7 +56,7 @@ func NewAnalyzer(ctx Context) *Analyzer {
 	}
 }
 
-func (a *Analyzer) ReceiveSocketDataEvent(event *SocketDataUploadEvent) {
+func (a *Analyzer) ReceiveSocketDataEvent(event *protocol.SocketDataUploadEvent) {
 	for _, p := range a.protocols {
 		if p.ReceiveData(a.ctx, event) {
 			return
@@ -85,24 +67,25 @@ func (a *Analyzer) ReceiveSocketDataEvent(event *SocketDataUploadEvent) {
 }
 
 type ProtocolMetrics struct {
-	data map[string]Metrics
+	data map[string]protocol.Metrics
 }
 
 func NewProtocolMetrics() *ProtocolMetrics {
-	metrics := make(map[string]Metrics)
+	metrics := make(map[string]protocol.Metrics)
 	for _, p := range defaultInstances {
 		metrics[p.Name()] = p.GenerateMetrics()
 	}
 	return &ProtocolMetrics{data: metrics}
 }
 
-func (m *ProtocolMetrics) GetProtocolMetrics(name string) Metrics {
+func (m *ProtocolMetrics) GetProtocolMetrics(name string) protocol.Metrics {
 	return m.data[name]
 }
 
-func (m *ProtocolMetrics) MergeMetricsFromConnection(connection *base.ConnectionContext) {
-	for _, d := range m.data {
-		d.MergeMetricsFromConnection(connection)
+func (m *ProtocolMetrics) MergeMetricsFromConnection(connection *base.ConnectionContext, data base.ConnectionMetrics) {
+	otherMetrics := data.(*ProtocolMetrics)
+	for p, d := range m.data {
+		d.MergeMetricsFromConnection(connection, otherMetrics.GetProtocolMetrics(p))
 	}
 }
 
@@ -111,8 +94,3 @@ func (m *ProtocolMetrics) FlushMetrics(traffic *base.ProcessTraffic, metricsBuil
 		d.FlushMetrics(traffic, metricsBuilder)
 	}
 }
-
-func QueryProtocolMetrics(metricsContext *base.ConnectionMetricsContext, protocolName string) Metrics {
-	metrics := metricsContext.GetMetrics(ListenerName).(*ProtocolMetrics)
-	return metrics.GetProtocolMetrics(protocolName)
-}
diff --git a/pkg/profiling/task/network/runner.go b/pkg/profiling/task/network/runner.go
index ac4dbe4..6b8063d 100644
--- a/pkg/profiling/task/network/runner.go
+++ b/pkg/profiling/task/network/runner.go
@@ -51,6 +51,7 @@ type Runner struct {
 	stopOnce       sync.Once
 	meterClient    v3.MeterReportServiceClient
 	logClient      logv3.LogReportServiceClient
+	eventClient    v3.SpanAttachedEventReportServiceClient
 	reportInterval time.Duration
 	meterPrefix    string
 
@@ -211,7 +212,15 @@ func (r *Runner) flushData() error {
 	if count, err1 := r.flushLogs(metricsBuilder); err1 != nil {
 		err = multierror.Append(err, err1)
 	} else if count > 0 {
-		log.Infof("total send network topology logs data: %d", count)
+		log.Infof("total send network logs data: %d", count)
+	}
+
+	eventCount, eventError := r.flushEvents(metricsBuilder)
+	if eventError != nil {
+		err = multierror.Append(err, eventError)
+	}
+	if eventCount > 0 {
+		log.Infof("total send network events data: %d", eventCount)
 	}
 	return err
 }
@@ -272,6 +281,33 @@ func (r *Runner) flushLogs(builder *analyzeBase.MetricsBuilder) (int, error) {
 	return count, nil
 }
 
+// flushEvents streams the built span attached events to the backend and returns the
+// number sent; each failed Send is appended to (not substituted for) the returned error.
+func (r *Runner) flushEvents(builder *analyzeBase.MetricsBuilder) (int, error) {
+	events := builder.BuildEvents()
+	if len(events) == 0 {
+		return 0, nil
+	}
+	collector, err := r.eventClient.Collect(r.ctx)
+	if err != nil {
+		return 0, err
+	}
+	defer func() {
+		if _, e := collector.CloseAndRecv(); e != nil {
+			log.Warnf("close the event stream error: %v", e)
+		}
+	}()
+	count := len(events)
+	var sendErrors error
+	for _, m := range events {
+		if err := collector.Send(m); err != nil {
+			count-- // this event did not reach the stream
+			sendErrors = multierror.Append(sendErrors, fmt.Errorf("send error, traceid: %s, event: %s, reason: %v", m.TraceContext.TraceId, m.Event, err))
+		}
+	}
+	return count, sendErrors
+}
+
 func (r *Runner) Stop() error {
 	// if starting, then need to wait start finished
 	r.startLock.Lock()
@@ -301,6 +337,7 @@ func (r *Runner) init0(config *base.TaskConfig, moduleMgr *module.Manager) error
 	connection := coreOperator.BackendOperator().GetConnection()
 	r.meterClient = v3.NewMeterReportServiceClient(connection)
 	r.logClient = logv3.NewLogReportServiceClient(connection)
+	r.eventClient = v3.NewSpanAttachedEventReportServiceClient(connection)
 
 	reportInterval, err := time.ParseDuration(config.Network.ReportInterval)
 	if err != nil {
diff --git a/pkg/tools/host/time.go b/pkg/tools/host/time.go
index 1d03bcf..b1ef253 100644
--- a/pkg/tools/host/time.go
+++ b/pkg/tools/host/time.go
@@ -21,6 +21,8 @@ import (
 	"fmt"
 	"time"
 
+	v3 "skywalking.apache.org/repo/goapi/collect/common/v3"
+
 	"github.com/shirou/gopsutil/host"
 )
 
@@ -34,3 +36,12 @@ func init() {
 	}
 	BootTime = time.Unix(int64(boot), 0)
 }
+
+// TimeToInstant converts bpfTime, taken as nanoseconds elapsed since BootTime, into a
+// v3.Instant carrying the resulting Unix seconds and the nanosecond remainder.
+func TimeToInstant(bpfTime uint64) *v3.Instant {
+	// rebuild the boot moment from its seconds/nanoseconds before applying the offset
+	bootMoment := time.Unix(BootTime.Unix(), int64(BootTime.Nanosecond()))
+	moment := bootMoment.Add(time.Duration(bpfTime))
+	return &v3.Instant{Seconds: moment.Unix(), Nanos: int32(moment.Nanosecond())}
+}
diff --git a/test/e2e/base/env b/test/e2e/base/env
index e414f00..dab2c61 100644
--- a/test/e2e/base/env
+++ b/test/e2e/base/env
@@ -13,8 +13,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-SW_CTL_COMMIT=521843f963917aa806740a9ad09c65aa59aca179
-SW_OAP_COMMIT=4adc05f89c5506541f6c0817db90238c96e1b5d3
+SW_CTL_COMMIT=651d196ee9345b164cc35fc1dbdcaf058920226a
+SW_OAP_COMMIT=a386853bc9ef6221c8d6d1688b607e1d230f5ec4
 SW_KUBERNETES_COMMIT_SHA=0f3ec68e5a7e1608cec8688716b848ed15e971e5
 
 SW_AGENT_GO_COMMIT=216f122d942cb683f48578d3014cc5ea83637582
\ No newline at end of file
diff --git a/test/e2e/cases/profiling/network/expected/skywalking-trace.yml b/test/e2e/cases/profiling/network/expected/skywalking-trace.yml
index ce30c3a..0ee677b 100644
--- a/test/e2e/cases/profiling/network/expected/skywalking-trace.yml
+++ b/test/e2e/cases/profiling/network/expected/skywalking-trace.yml
@@ -14,23 +14,38 @@
 # limitations under the License.
 
 spans:
-  {{- contains .spans}}
-  - traceid: {{ notEmpty .traceid }}
-    segmentid: {{ notEmpty .segmentid }}
-    spanid: 0
-    parentspanid: -1
-    refs: []
-    servicecode: example
-    serviceinstancename: {{ notEmpty .serviceinstancename }}
-    starttime: {{ gt .starttime 0 }}
-    endtime: {{ gt .endtime 0 }}
-    endpointname: /provider
-    type: Exit
-    peer: https://proxy/provider
-    component: Unknown
-    iserror: false
-    layer: Unknown
-    tags: []
-    logs: []
-    attachedevents: []
-  {{- end }}
\ No newline at end of file
+{{- contains .spans}}
+- traceid: {{ notEmpty .traceid }}
+  segmentid: {{ notEmpty .segmentid }}
+  spanid: 0
+  parentspanid: -1
+  refs: []
+  servicecode: example
+  serviceinstancename: {{ notEmpty .serviceinstancename }}
+  starttime: {{ gt .starttime 0 }}
+  endtime: {{ gt .endtime 0 }}
+  endpointname: /provider
+  type: Exit
+  peer: https://proxy/provider
+  component: Unknown
+  iserror: false
+  layer: Unknown
+  tags: []
+  logs: []
+  attachedevents:
+  {{- contains .attachedevents}}
+    - starttime:
+        seconds: {{ gt .starttime.seconds 0 }}
+        nanos: {{ ge .starttime.nanos 0 }}
+      event: HTTP Request Sampling
+      endtime:
+        seconds: {{ gt .endtime.seconds 0 }}
+        nanos: {{ ge .endtime.nanos 0 }}
+      summary: []
+      tags:
+      {{- contains .tags }}
+      - key: data size
+        value: {{ notEmpty .value }}
+      {{- end }}
+  {{- end }}
+{{- end }}
\ No newline at end of file
diff --git a/test/e2e/cases/profiling/network/http1-slow-traces-cases.yaml b/test/e2e/cases/profiling/network/http1-slow-traces-cases.yaml
index 06ba743..6f0e7ca 100644
--- a/test/e2e/cases/profiling/network/http1-slow-traces-cases.yaml
+++ b/test/e2e/cases/profiling/network/http1-slow-traces-cases.yaml
@@ -16,7 +16,7 @@
 # HTTP1 verify
 cases:
   - query: |
-      curl https://${service_host}:${service_10443}/consumer-zipkin > /dev/null;
+      curl -s https://${service_host}:${service_10443}/consumer-zipkin > /dev/null;
       sleep 5;
       swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql records list \
         --name=sampled_slow_trace_record --service-name service --instance-name test --process-name service \
@@ -24,15 +24,17 @@ cases:
     expected: expected/slow-traces.yml
   # zipkin trace
   - query: |
+      curl -s https://${service_host}:${service_10443}/consumer-zipkin > /dev/null;
+      sleep 3;
       traceid=$(swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql records list \
       --name=sampled_slow_trace_record --service-name service --instance-name test --process-name service \
-      --dest-service-name service --dest-instance-name test --dest-process-name UNKNOWN_REMOTE | yq e '. | map(select(.name == "zipkin-/provider-zipkin")).[0].id' -);
+      --dest-service-name service --dest-instance-name test --dest-process-name UNKNOWN_REMOTE 100 | yq e '. | map(select(.name == "zipkin-/provider-zipkin")).[0].id' -);
       curl http://${oap_host}:${oap_9412}/zipkin/api/v2/trace/${traceid} | yq e -| yq e 'del(.[].tags)' -
     expected: expected/zipkin-trace.yml
   # skywalking trace
   - query: |
       traceid=$(swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql records list \
       --name=sampled_slow_trace_record --service-name service --instance-name test --process-name service \
-      --dest-service-name service --dest-instance-name test --dest-process-name UNKNOWN_REMOTE | yq e '. | map(select(.name == "skywalking-/provider")).[0].id' -);
+      --dest-service-name service --dest-instance-name test --dest-process-name UNKNOWN_REMOTE 100 | yq e '. | map(select(.name == "skywalking-/provider")).[0].id' -);
        swctl --display yaml --base-url=http://${oap_host}:${oap_12800}/graphql trace $traceid
     expected: expected/skywalking-trace.yml
\ No newline at end of file