You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@skywalking.apache.org by wu...@apache.org on 2023/01/05 08:50:52 UTC

[skywalking-rover] branch main updated: Enhance the protocol reader for support long socket data (#69)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git


The following commit(s) were added to refs/heads/main by this push:
     new 8550199  Enhance the protocol reader for support long socket data (#69)
8550199 is described below

commit 8550199e98c9f5a4b2058878a0a899ffb73fe461
Author: mrproliu <74...@qq.com>
AuthorDate: Thu Jan 5 16:50:47 2023 +0800

    Enhance the protocol reader for support long socket data (#69)
---
 CHANGES.md                                         |  13 +
 bpf/profiling/network/netmonitor.c                 |  28 +-
 bpf/profiling/network/sock_stats.h                 |   2 +-
 .../task/network/analyze/layer7/events.go          |   4 +
 .../task/network/analyze/layer7/listener.go        |   4 +-
 .../analyze/layer7/protocols/base/analyzer.go      | 229 ++++++++++++
 .../analyze/layer7/protocols/base/buffer.go        | 414 +++++++++++++++++++++
 .../analyze/layer7/protocols/base/buffer_test.go   |  94 +++++
 .../analyze/layer7/protocols/base/events.go        | 118 +++---
 .../analyze/layer7/protocols/base/protocol.go      |  16 +-
 .../analyze/layer7/protocols/http1/analyzer.go     | 318 +++++-----------
 .../layer7/protocols/http1/analyzer_test.go        | 310 ---------------
 .../analyze/layer7/protocols/http1/builder.go      | 290 ---------------
 .../analyze/layer7/protocols/http1/metrics.go      | 174 ++-------
 .../layer7/protocols/http1/reader/reader.go        | 298 +++++++++++++++
 .../layer7/protocols/http1/reader/request.go       | 143 +++++++
 .../layer7/protocols/http1/reader/response.go      | 125 +++++++
 .../analyze/layer7/protocols/http1/sampling.go     |  29 +-
 .../network/analyze/layer7/protocols/protocols.go  |  37 +-
 pkg/profiling/task/network/analyze/layer7/queue.go |   6 +-
 pkg/tools/host/time.go                             |   8 +-
 21 files changed, 1562 insertions(+), 1098 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index b80c9f5..b6155ec 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -2,6 +2,19 @@ Changes by Version
 ==================
 Release Notes.
 
+0.5.0
+------------------
+#### Features
+* Enhance the protocol reader for support long socket data.
+
+#### Bug Fixes
+
+#### Documentation
+
+#### Issues and PR
+- All issues are [here](https://github.com/apache/skywalking/milestone/167?closed=1)
+- All and pull requests are [here](https://github.com/apache/skywalking-rover/milestone/5?closed=1)
+
 0.4.0
 ------------------
 #### Features
diff --git a/bpf/profiling/network/netmonitor.c b/bpf/profiling/network/netmonitor.c
index 05e8803..43577ba 100644
--- a/bpf/profiling/network/netmonitor.c
+++ b/bpf/profiling/network/netmonitor.c
@@ -46,7 +46,7 @@ char __license[] SEC("license") = "Dual MIT/GPL";
 		val;                                                           \
 	})
 
-#define SOCKET_UPLOAD_CHUNK_LIMIT 8
+#define SOCKET_UPLOAD_CHUNK_LIMIT 12
 
 static __inline bool family_should_trace(const __u32 family) {
     return family != AF_UNKNOWN && family != AF_INET && family != AF_INET6 ? false : true;
@@ -272,10 +272,11 @@ static __always_inline void resent_connect_event(struct pt_regs *ctx, __u32 tgid
     }
 }
 
-static __always_inline void __upload_socket_data_with_buffer(void *ctx, __u8 index, char* buf, size_t size, __u32 is_finished, struct socket_data_upload_event *event) {
+static __always_inline void __upload_socket_data_with_buffer(void *ctx, __u8 index, char* buf, size_t size, __u32 is_finished, __u8 have_reduce_after_chunk, struct socket_data_upload_event *event) {
     event->sequence = index;
     event->data_len = size;
     event->finished = is_finished;
+    event->have_reduce_after_chunk = have_reduce_after_chunk;
     if (size <= 0) {
         return;
     }
@@ -292,8 +293,10 @@ static __always_inline void upload_socket_data_buf(void *ctx, char* buf, ssize_t
         // calculate bytes need to send
         ssize_t remaining = size - already_send;
         size_t need_send_in_chunk = 0;
+        __u8 have_reduce_after_chunk = 0;
         if (remaining > MAX_TRANSMIT_SOCKET_READ_LENGTH) {
             need_send_in_chunk = MAX_TRANSMIT_SOCKET_READ_LENGTH;
+            have_reduce_after_chunk = 1;
         } else {
             need_send_in_chunk = remaining;
         }
@@ -304,7 +307,7 @@ static __always_inline void upload_socket_data_buf(void *ctx, char* buf, ssize_t
             is_finished = 0;
             sequence = generate_socket_sequence(event->conid, event->data_id);
         }
-        __upload_socket_data_with_buffer(ctx, sequence, buf + already_send, need_send_in_chunk, is_finished, event);
+        __upload_socket_data_with_buffer(ctx, sequence, buf + already_send, need_send_in_chunk, is_finished, have_reduce_after_chunk, event);
         already_send += need_send_in_chunk;
 
     }
@@ -316,19 +319,22 @@ if (iov_index < iovlen) {                                   \
     BPF_PROBE_READ_VAR(cur_iov, &iov[iov_index]);           \
     ssize_t remaining = size - already_send;                \
     size_t need_send_in_chunk = remaining - cur_iov_sended; \
+    __u8 have_reduce_after_chunk = 0;                       \
     if (cur_iov_sended + need_send_in_chunk > cur_iov.iov_len) {            \
         need_send_in_chunk = cur_iov.iov_len - cur_iov_sended;              \
         if (need_send_in_chunk > MAX_TRANSMIT_SOCKET_READ_LENGTH) {         \
             need_send_in_chunk = MAX_TRANSMIT_SOCKET_READ_LENGTH;           \
+            have_reduce_after_chunk = 1;                                    \
         } else {                                                            \
             iov_index++;                                                    \
             cur_iov_sended = 0;                                             \
         }                                                                   \
     } else if (need_send_in_chunk > MAX_TRANSMIT_SOCKET_READ_LENGTH) {      \
         need_send_in_chunk = MAX_TRANSMIT_SOCKET_READ_LENGTH;               \
+        have_reduce_after_chunk = 1;                                        \
     }                                                                       \
-    __u32 is_finished = (need_send_in_chunk + already_send) >= size || loop_count == (SOCKET_UPLOAD_CHUNK_LIMIT - 1) ? true : false; \
-    __upload_socket_data_with_buffer(ctx, loop_count, cur_iov.iov_base + cur_iov_sended, need_send_in_chunk, is_finished, event);    \
+    __u32 is_finished = (need_send_in_chunk + already_send) >= size || loop_count == (SOCKET_UPLOAD_CHUNK_LIMIT - 1) ? true : false;                            \
+    __upload_socket_data_with_buffer(ctx, loop_count, cur_iov.iov_base + cur_iov_sended, need_send_in_chunk, is_finished, have_reduce_after_chunk, event);      \
     already_send += need_send_in_chunk;                                                                                              \
     loop_count++;                                                                                                                    \
 }
@@ -365,23 +371,11 @@ static __inline void upload_socket_data(void *ctx, __u64 start_time, __u64 end_t
     if (connection->ssl != ssl) {
         return;
     }
-    // if the msg type is unknown, then try to re-analysis
-    if (existing_msg_type == CONNECTION_MESSAGE_TYPE_UNKNOWN) {
-        struct socket_buffer_reader_t *buf_reader = read_socket_data(args, bytes_count);
-        if (buf_reader == NULL) {
-            return;
-        }
-        existing_msg_type = analyze_protocol(buf_reader->buffer, buf_reader->data_len, connection);
-        if (existing_msg_type == CONNECTION_MESSAGE_TYPE_UNKNOWN && args->ssl_buffer_force_unfinished == 0) {
-            return;
-        }
-    }
 
     // basic data
     event->start_time = start_time;
     event->end_time = end_time;
     event->protocol = connection->protocol;
-    event->msg_type = existing_msg_type;
     event->direction = data_direction;
     event->conid = conid;
     event->randomid = connection->random_id;
diff --git a/bpf/profiling/network/sock_stats.h b/bpf/profiling/network/sock_stats.h
index 84a46eb..8312af3 100644
--- a/bpf/profiling/network/sock_stats.h
+++ b/bpf/profiling/network/sock_stats.h
@@ -258,7 +258,7 @@ struct {
 
 struct socket_data_upload_event {
     __u8 protocol;
-    __u8 msg_type;
+    __u8 have_reduce_after_chunk;
     __u8 direction;
     __u8 finished;
     __u16 sequence;
diff --git a/pkg/profiling/task/network/analyze/layer7/events.go b/pkg/profiling/task/network/analyze/layer7/events.go
index 9096913..5590dce 100644
--- a/pkg/profiling/task/network/analyze/layer7/events.go
+++ b/pkg/profiling/task/network/analyze/layer7/events.go
@@ -61,6 +61,10 @@ func NewSocketDataPartitionContext(l base.Context, config *profiling.TaskConfig)
 	}
 }
 
+func (p *SocketDataPartitionContext) Start(ctx context.Context) {
+	p.analyzer.Start(ctx)
+}
+
 func (p *SocketDataPartitionContext) Consume(data interface{}) {
 	event := data.(*base.SocketDataUploadEvent)
 	p.analyzer.ReceiveSocketDataEvent(event)
diff --git a/pkg/profiling/task/network/analyze/layer7/listener.go b/pkg/profiling/task/network/analyze/layer7/listener.go
index 8edb532..61f32bd 100644
--- a/pkg/profiling/task/network/analyze/layer7/listener.go
+++ b/pkg/profiling/task/network/analyze/layer7/listener.go
@@ -132,9 +132,9 @@ func (l *Listener) QueryConnection(conID, randomID uint64) *base.ConnectionConte
 	return nil
 }
 
-func (l *Listener) QueryProtocolMetrics(conMetrics *base.ConnectionMetricsContext, protocolName string) protocol.Metrics {
+func (l *Listener) QueryProtocolMetrics(conMetrics *base.ConnectionMetricsContext, p base.ConnectionProtocol) protocol.Metrics {
 	metrics := conMetrics.GetMetrics(ListenerName).(*protocols.ProtocolMetrics)
-	return metrics.GetProtocolMetrics(protocolName)
+	return metrics.GetProtocolMetrics(p)
 }
 
 func (l *Listener) generateCachedConnectionKey(conID, randomID uint64) string {
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/base/analyzer.go b/pkg/profiling/task/network/analyze/layer7/protocols/base/analyzer.go
new file mode 100644
index 0000000..0abc173
--- /dev/null
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/base/analyzer.go
@@ -0,0 +1,229 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package base
+
+import (
+	"context"
+	"sync"
+	"time"
+
+	"github.com/apache/skywalking-rover/pkg/logger"
+	profiling "github.com/apache/skywalking-rover/pkg/profiling/task/base"
+	"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
+)
+
+const (
+	batchReadMinCount = 1000
+)
+
+var log = logger.GetLogger("profiling", "task", "network", "layer7", "protocols", "base")
+
+// ProtocolAnalyzer handler all socket data for each protocol
+type ProtocolAnalyzer struct {
+	protocolContext Context
+	protocol        Protocol
+	config          *profiling.TaskConfig
+
+	connections       map[connectionKey]*connectionInfo
+	analyzeLocker     sync.Mutex
+	receiveEventCount int
+}
+
+func NewProtocolAnalyzer(protocolContext Context, p Protocol, config *profiling.TaskConfig) *ProtocolAnalyzer {
+	return &ProtocolAnalyzer{
+		protocolContext: protocolContext,
+		protocol:        p,
+		config:          config,
+		connections:     make(map[connectionKey]*connectionInfo),
+	}
+}
+
+func (a *ProtocolAnalyzer) Start(ctx context.Context) {
+	duration, _ := time.ParseDuration(a.config.Network.ReportInterval)
+	timeTicker := time.NewTicker(duration)
+	go func() {
+		for {
+			select {
+			case <-timeTicker.C:
+				// process event with interval
+				a.processEvents()
+			case <-ctx.Done():
+				timeTicker.Stop()
+				return
+			}
+		}
+	}()
+
+	// if the protocol defined the events expire time, then check events interval
+	expireDuration := a.protocol.PackageMaxExpireDuration()
+	if expireDuration.Milliseconds() > 0 {
+		expireTicker := time.NewTicker(expireDuration)
+		go func() {
+			for {
+				select {
+				case <-expireTicker.C:
+					a.processExpireEvents(expireDuration)
+				case <-ctx.Done():
+					expireTicker.Stop()
+					return
+				}
+			}
+		}()
+	}
+}
+
+func (a *ProtocolAnalyzer) ReceiveSocketData(ctx Context, event *SocketDataUploadEvent) {
+	connectionID := event.GenerateConnectionID()
+	key := connectionKey{connectionID: event.ConnectionID, randomID: event.RandomID}
+	connection := a.connections[key]
+	if connection == nil {
+		connection = newConnectionInfo(a.protocol, ctx, key.connectionID, key.randomID)
+		a.connections[key] = connection
+	}
+	connection.checkConnectionMetrics(ctx)
+
+	log.Debugf("receive connection: %s, dataid: %d, sequence: %d, finished: %d, have reduce after chunk: %t, direction: %s, size: %d, total size: %d",
+		connectionID, event.DataID(), event.DataSequence(), event.Finished, event.HaveReduceDataAfterChunk(),
+		event.Direction().String(), event.DataLen, event.TotalSize0)
+
+	// insert to the event list
+	connection.buffer.appendEvent(event)
+
+	// process the events if reach the receiver counter
+	a.receiveEventCount++
+	if a.receiveEventCount >= batchReadMinCount {
+		a.processEvents()
+	}
+	a.receiveEventCount = 0
+}
+
+// processEvents means analyze the protocol in each connection
+func (a *ProtocolAnalyzer) processEvents() {
+	// it could be triggered by interval or reach counter
+	// if any trigger bean locked, the other one just ignore process
+	if !a.analyzeLocker.TryLock() {
+		return
+	}
+	defer a.analyzeLocker.Unlock()
+
+	for _, connection := range a.connections {
+		a.processConnectionEvents(connection)
+	}
+}
+
+// processExpireEvents delete the expired events
+func (a *ProtocolAnalyzer) processExpireEvents(expireDuration time.Duration) {
+	// the expiry must be mutual exclusion with events processor
+	a.analyzeLocker.Lock()
+	defer a.analyzeLocker.Unlock()
+
+	for _, connection := range a.connections {
+		a.processConnectionExpireEvents(connection, expireDuration)
+	}
+}
+
+func (a *ProtocolAnalyzer) processConnectionEvents(connection *connectionInfo) {
+	// reset the status for prepare reading
+	buffer := connection.buffer
+	metrics := connection.metrics
+	connectionID := connection.connectionID
+	buffer.resetForLoopReading()
+	// loop to read the protocol data
+	for {
+		// reset the status of reading
+		if !buffer.prepareForReading() {
+			log.Debugf("prepare finsihed: event size: %d", buffer.events.Len())
+			return
+		}
+
+		result := a.protocol.ParseProtocol(connectionID, metrics, buffer)
+		finishReading := false
+		switch result {
+		case ParseResultSuccess:
+			finishReading = buffer.removeReadElements()
+		case ParseResultSkipPackage:
+			finishReading = buffer.skipCurrentElement()
+		}
+
+		if finishReading {
+			log.Debugf("reading finsihed: event size: %d", buffer.events.Len())
+			break
+		}
+	}
+}
+
+func (a *ProtocolAnalyzer) processConnectionExpireEvents(connection *connectionInfo, expireDuration time.Duration) {
+	if c := connection.buffer.deleteExpireEvents(expireDuration); c > 0 {
+		log.Debugf("total removed %d expired events for %s protocol", c, a.protocol.Protocol().String())
+	}
+}
+
+func (a *ProtocolAnalyzer) UpdateExtensionConfig(config *profiling.ExtensionConfig) {
+	a.protocol.UpdateExtensionConfig(config)
+}
+
+type connectionKey struct {
+	connectionID uint64
+	randomID     uint64
+}
+
+type connectionInfo struct {
+	connectionID, randomID uint64
+	connectionProtocol     base.ConnectionProtocol
+	buffer                 *Buffer
+	metrics                Metrics
+	metricsFromConnection  bool
+}
+
+func newConnectionInfo(p Protocol, connectionContext Context, connectionID, randomID uint64) *connectionInfo {
+	fromConnection := false
+	var connectionMetrics Metrics
+	con := connectionContext.QueryConnection(connectionID, randomID)
+	// if connection not exists, then cached it into the analyzer context
+	if con == nil {
+		connectionMetrics = p.GenerateMetrics()
+	} else {
+		connectionMetrics = connectionContext.QueryProtocolMetrics(con.Metrics, p.Protocol())
+		fromConnection = true
+	}
+
+	return &connectionInfo{
+		connectionID:          connectionID,
+		randomID:              randomID,
+		connectionProtocol:    p.Protocol(),
+		buffer:                newBuffer(),
+		metrics:               connectionMetrics,
+		metricsFromConnection: fromConnection,
+	}
+}
+
+func (c *connectionInfo) checkConnectionMetrics(protocolContext Context) {
+	if c.metricsFromConnection {
+		return
+	}
+	connection := protocolContext.QueryConnection(c.connectionID, c.randomID)
+	if connection == nil {
+		return
+	}
+
+	// merge the temporary metrics into the connection metrics
+	connectionMetrics := protocolContext.QueryProtocolMetrics(connection.Metrics, c.connectionProtocol)
+	connectionMetrics.MergeMetricsFromConnection(connection, c.metrics)
+	c.metrics = connectionMetrics
+	c.metricsFromConnection = true
+}
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/base/buffer.go b/pkg/profiling/task/network/analyze/layer7/protocols/base/buffer.go
new file mode 100644
index 0000000..c7b2647
--- /dev/null
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/base/buffer.go
@@ -0,0 +1,414 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package base
+
+import (
+	"container/list"
+	"errors"
+	"fmt"
+	"io"
+	"sync"
+	"time"
+
+	"github.com/apache/skywalking-rover/pkg/tools/host"
+)
+
+var (
+	ErrNotComplete = errors.New("socket: not complete event")
+)
+
+type Buffer struct {
+	events    *list.List
+	validated bool // the events list is validated or not
+
+	eventLocker sync.RWMutex
+
+	head    *BufferPosition
+	current *BufferPosition
+}
+
+type BufferPosition struct {
+	// element of the event list
+	element *list.Element
+	// bufIndex the buffer index of the element
+	bufIndex int
+}
+
+func (p *BufferPosition) String() string {
+	buffer := p.element.Value.(SocketDataBuffer)
+	return fmt.Sprintf("data id: %d, sequence: %d, buffer index: %d",
+		buffer.DataID(), buffer.DataSequence(), p.bufIndex)
+}
+
+func newBuffer() *Buffer {
+	return &Buffer{
+		events:    list.New(),
+		validated: false,
+	}
+}
+
+func (r *Buffer) Position() *BufferPosition {
+	return r.current.Clone()
+}
+
+func (r *Buffer) Slice(validated bool, start, end *BufferPosition) *Buffer {
+	events := list.New()
+	for nextElement := start.element; nextElement != end.element; nextElement = nextElement.Next() {
+		events.PushBack(nextElement.Value)
+	}
+	events.PushBack(&SocketDataEventLimited{end.element.Value.(SocketDataBuffer), 0, end.bufIndex})
+
+	return &Buffer{
+		events:    events,
+		validated: validated,
+		head:      &BufferPosition{element: events.Front(), bufIndex: start.bufIndex},
+		current:   &BufferPosition{element: events.Front(), bufIndex: start.bufIndex},
+	}
+}
+
+func (r *Buffer) Len() int {
+	if r.head == nil {
+		return 0
+	}
+	var result int
+	var startIndex = r.head.bufIndex
+	for e := r.head.element; e != nil; e = e.Next() {
+		result += r.head.element.Value.(SocketDataBuffer).BufferLen() - startIndex
+		startIndex = 0
+	}
+	return result
+}
+
+func (r *Buffer) FirstSocketBuffer() SocketDataBuffer {
+	if r.events.Len() == 0 {
+		return nil
+	}
+	return r.events.Front().Value.(SocketDataBuffer)
+}
+
+func (r *Buffer) LastSocketBuffer() SocketDataBuffer {
+	if r.events.Len() == 0 {
+		return nil
+	}
+	return r.events.Back().Value.(SocketDataBuffer)
+}
+
+// DetectNotSendingLastPosition detect the buffer contains not sending data: the BPF limited socket data count
+func (r *Buffer) DetectNotSendingLastPosition() *BufferPosition {
+	if r.events.Len() == 0 {
+		return nil
+	}
+
+	for e := r.events.Front(); e != nil; e = e.Next() {
+		buf := e.Value.(SocketDataBuffer)
+		// the buffer is sent finished but still have reduced data not send
+		if buf.IsFinished() && buf.HaveReduceDataAfterChunk() {
+			return &BufferPosition{element: e, bufIndex: buf.BufferLen()}
+		}
+	}
+	return nil
+}
+
+func CombineSlices(validated bool, buffers ...*Buffer) *Buffer {
+	if len(buffers) == 0 {
+		return nil
+	}
+	if len(buffers) == 1 {
+		return buffers[0]
+	}
+	events := list.New()
+	for _, b := range buffers {
+		if b.head.bufIndex > 0 {
+			headBuffer := b.events.Front().Value.(SocketDataBuffer)
+			events.PushBack(&SocketDataEventLimited{headBuffer, b.head.bufIndex, headBuffer.BufferLen()})
+			for next := b.events.Front().Next(); next != nil; next = next.Next() {
+				events.PushBack(next.Value)
+			}
+		} else {
+			events.PushBackList(b.events)
+		}
+	}
+	return &Buffer{
+		events:    events,
+		validated: validated,
+		head:      &BufferPosition{element: events.Front(), bufIndex: 0},
+		current:   &BufferPosition{element: events.Front(), bufIndex: 0},
+	}
+}
+
+func (r *Buffer) Peek(p []byte) (n int, err error) {
+	// save the index temporary
+	tmpPosition := r.current.Clone()
+	// restore the index
+	defer func() {
+		r.current = tmpPosition
+	}()
+	readIndex := 0
+	for readIndex < len(p) {
+		count, err := r.Read(p[readIndex:])
+		if err != nil {
+			return 0, err
+		}
+		readIndex += count
+	}
+	return readIndex, nil
+}
+
+func (r *Buffer) OffsetPosition(offset int) *BufferPosition {
+	var nextElement func(e *list.Element) *list.Element
+	if offset == 0 {
+		return r.current.Clone()
+	} else if offset > 0 {
+		nextElement = func(e *list.Element) *list.Element {
+			return e.Next()
+		}
+	} else {
+		nextElement = func(e *list.Element) *list.Element {
+			return e.Prev()
+		}
+	}
+
+	var curEle = r.current.element
+	var curIndex = r.current.bufIndex
+	for ; curEle != nil; curEle = nextElement(curEle) {
+		nextOffset := curIndex + offset
+		bufferLen := curEle.Value.(SocketDataBuffer).BufferLen()
+		if nextOffset >= 0 && nextOffset < bufferLen {
+			curIndex += offset
+			break
+		}
+
+		if offset > 0 {
+			offset -= bufferLen - curIndex
+			curIndex = 0
+		} else {
+			offset += curIndex
+			next := nextElement(curEle)
+			if next == nil {
+				curEle = next
+				break
+			}
+			curIndex = curEle.Value.(SocketDataBuffer).BufferLen()
+		}
+	}
+
+	if curEle == nil {
+		return nil
+	}
+	return &BufferPosition{element: curEle, bufIndex: curIndex}
+}
+
+func (r *Buffer) Read(p []byte) (n int, err error) {
+	if len(p) == 0 {
+		return 0, nil
+	}
+	if r.current == nil || r.current.element == nil {
+		return 0, io.EOF
+	}
+	element, n := r.readFromCurrent(p)
+	if n > 0 {
+		return n, nil
+	}
+
+	curEvent := element.Value.(SocketDataBuffer)
+	next := r.nextElement(element)
+	if next == nil {
+		return 0, io.EOF
+	}
+	nextEvent := next.Value.(SocketDataBuffer)
+
+	var shouldRead = false
+	if r.validated {
+		shouldRead = true
+		// same data id and sequence orders
+	} else if (curEvent.DataID() == nextEvent.DataID() && curEvent.DataSequence()+1 == nextEvent.DataSequence()) ||
+		// cur event is finished and next event is start
+		(nextEvent.IsStart() && curEvent.IsFinished()) ||
+		// same data id and sequence but have difference buffer index
+		(curEvent.DataID() == nextEvent.DataID() && curEvent.DataSequence() == nextEvent.DataSequence() &&
+			r.current.bufIndex <= nextEvent.BufferStartPosition()) {
+		shouldRead = true
+	}
+
+	if !shouldRead {
+		return 0, ErrNotComplete
+	}
+
+	return r.read0(next, nextEvent, p)
+}
+
+func (r *Buffer) readFromCurrent(p []byte) (element *list.Element, n int) {
+	element = r.current.element
+	curEvent := element.Value.(SocketDataBuffer)
+	residueSize := curEvent.BufferLen() - r.current.bufIndex
+	if residueSize > 0 {
+		readLen := len(p)
+		if residueSize < readLen {
+			readLen = residueSize
+		}
+
+		n = copy(p, curEvent.BufferData()[r.current.bufIndex:r.current.bufIndex+readLen])
+		r.current.bufIndex += n
+		return element, n
+	}
+	return element, 0
+}
+
+func (r *Buffer) read0(currentElement *list.Element, currentBuffer SocketDataBuffer, p []byte) (n int, err error) {
+	readLen := len(p)
+	if currentBuffer.BufferLen() < readLen {
+		readLen = currentBuffer.BufferLen()
+	}
+
+	copy(p, currentBuffer.BufferData()[:readLen])
+	r.current.element = currentElement
+	r.current.bufIndex = readLen
+	return readLen, nil
+}
+
+// IsCurrentPacketReadFinished means to validate the current reading package is reading finished
+func (r *Buffer) IsCurrentPacketReadFinished() bool {
+	return r.current.bufIndex == r.current.element.Value.(SocketDataBuffer).BufferLen()
+}
+
+func (r *Buffer) resetForLoopReading() {
+	r.head = nil
+	r.current = nil
+}
+
+func (r *Buffer) prepareForReading() bool {
+	if r.events.Len() == 0 {
+		return false
+	}
+	if r.head == nil || r.head.element == nil {
+		// read in the first element
+		r.eventLocker.RLock()
+		defer r.eventLocker.RUnlock()
+		r.head = &BufferPosition{element: r.events.Front(), bufIndex: 0}
+		r.current = r.head.Clone()
+	} else {
+		// make sure we can read from head
+		r.current = r.head.Clone()
+	}
+
+	return true
+}
+
+func (r *Buffer) removeReadElements() bool {
+	r.eventLocker.Lock()
+	defer r.eventLocker.Unlock()
+
+	// delete until to current position
+	next := r.head.element
+	for ; next != nil && next != r.current.element; next = r.removeElement0(next) {
+	}
+	if next != nil && next.Value.(SocketDataBuffer).BufferLen() == r.current.bufIndex {
+		// the last event already read finished, then delete it
+		r.head.element = r.removeElement0(next)
+		r.head.bufIndex = 0
+	} else if next != nil {
+		// keep using the latest element
+		r.head.element = next
+	} else {
+		return true
+	}
+	return false
+}
+
+// skipCurrentElement skip current element in reader, if return true means have read finished
+func (r *Buffer) skipCurrentElement() bool {
+	r.head.element = r.nextElement(r.current.element)
+	r.current.bufIndex = 0
+
+	return r.head.element == nil
+}
+
+func (r *Buffer) removeElement0(element *list.Element) *list.Element {
+	if element == nil {
+		return nil
+	}
+	result := element.Next()
+	r.events.Remove(element)
+	return result
+}
+
+// appendEvent insert the event to the event list following the order
+func (r *Buffer) appendEvent(event *SocketDataUploadEvent) {
+	r.eventLocker.Lock()
+	defer r.eventLocker.Unlock()
+
+	if r.events.Len() == 0 {
+		r.events.PushFront(event)
+		return
+	}
+	if r.events.Back().Value.(SocketDataBuffer).DataID() < event.DataID() {
+		r.events.PushBack(event)
+		return
+	}
+	beenAdded := false
+	for element := r.events.Front(); element != nil; element = element.Next() {
+		existEvent := element.Value.(SocketDataBuffer)
+		if existEvent.DataID() > event.DataID() {
+			// data id needs order
+			beenAdded = true
+		} else if existEvent.DataID() == event.DataID() && existEvent.DataSequence() > event.DataSequence() {
+			// following the sequence order
+			beenAdded = true
+		}
+		if beenAdded {
+			r.events.InsertBefore(event, element)
+			break
+		}
+	}
+	if !beenAdded {
+		r.events.PushBack(event)
+	}
+}
+
+func (r *Buffer) deleteExpireEvents(expireDuration time.Duration) int {
+	r.eventLocker.Lock()
+	defer r.eventLocker.Unlock()
+
+	expireTime := time.Now().Add(-expireDuration)
+	count := 0
+	for e := r.events.Front(); e != nil; {
+		startTime := host.Time(e.Value.(SocketDataBuffer).StartTime())
+		if expireTime.After(startTime) {
+			count++
+			cur := e
+			e = e.Next()
+			r.events.Remove(cur)
+		} else {
+			break
+		}
+	}
+	return count
+}
+
+func (r *Buffer) nextElement(e *list.Element) *list.Element {
+	if e == nil {
+		return nil
+	}
+	r.eventLocker.RLock()
+	defer r.eventLocker.RUnlock()
+	return e.Next()
+}
+
+func (p *BufferPosition) Clone() *BufferPosition {
+	return &BufferPosition{element: p.element, bufIndex: p.bufIndex}
+}
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/base/buffer_test.go b/pkg/profiling/task/network/analyze/layer7/protocols/base/buffer_test.go
new file mode 100644
index 0000000..87984b2
--- /dev/null
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/base/buffer_test.go
@@ -0,0 +1,94 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package base
+
+import (
+	"container/list"
+	"testing"
+)
+
+func TestOffsetPosition(t *testing.T) {
+	type position struct {
+		eventIndex  int
+		bufferIndex int
+	}
+	var tests = []struct {
+		events  []int
+		current position
+		offset  int
+		result  *position
+	}{
+		{
+			events:  []int{10, 10, 10},
+			current: position{0, 0},
+			offset:  10,
+			result:  &position{1, 0},
+		},
+		{
+			events:  []int{10, 10, 10},
+			current: position{1, 0},
+			offset:  -10,
+			result:  &position{0, 0},
+		},
+		{
+			events:  []int{10, 10, 10},
+			current: position{2, 5},
+			offset:  -10,
+			result:  &position{1, 5},
+		},
+		{
+			events:  []int{10, 10, 10},
+			current: position{2, 5},
+			offset:  -20,
+			result:  &position{0, 5},
+		},
+		{
+			events:  []int{10, 10, 10},
+			current: position{2, 5},
+			offset:  10,
+			result:  nil,
+		},
+	}
+
+	for _, test := range tests {
+		events := list.New()
+		buffer := Buffer{events: events}
+		var curElement *list.Element
+		for i, e := range test.events {
+			element := events.PushBack(&SocketDataUploadEvent{
+				DataID0: uint64(i),
+				DataLen: uint16(e),
+			})
+			if i == test.current.eventIndex {
+				curElement = element
+			}
+		}
+
+		buffer.prepareForReading()
+		buffer.current = &BufferPosition{element: curElement, bufIndex: test.current.bufferIndex}
+		offsetPosition := buffer.OffsetPosition(test.offset)
+		if offsetPosition == nil && test.result == nil {
+			continue
+		}
+		if int(offsetPosition.element.Value.(*SocketDataUploadEvent).DataID()) != test.result.eventIndex ||
+			offsetPosition.bufIndex != test.result.bufferIndex {
+			t.Fatalf("excepted: %d,%d, actual: %d,%d", test.result.eventIndex, test.result.bufferIndex,
+				offsetPosition.element.Value.(*SocketDataUploadEvent).DataID(), offsetPosition.bufIndex)
+		}
+	}
+}
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/base/events.go b/pkg/profiling/task/network/analyze/layer7/protocols/base/events.go
index e3f09f0..52f886d 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/base/events.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/base/events.go
@@ -24,35 +24,45 @@ import (
 )
 
 type SocketDataBuffer interface {
-	// Combine from other buffer
-	Combine(buffered SocketDataBuffer) SocketDataBuffer
 	// BufferData of the buffer
 	BufferData() []byte
 	// TotalSize of socket data, the data may exceed the size of the BufferData()
 	TotalSize() uint64
 	// Direction of the data, send or receive
 	Direction() base.SocketDataDirection
-	FirstEvent() *SocketDataUploadEvent
-
+	// BufferStartPosition the buffer start index
+	BufferStartPosition() int
+	// BufferLen the buffer data length
+	BufferLen() int
+	// DataID data id of the buffer
+	DataID() uint64
+	// DataSequence the data sequence under same data id
+	DataSequence() int
+	// IsStart this buffer is start of the same data id
+	IsStart() bool
+	// IsFinished this buffer is finish of the same data id
+	IsFinished() bool
+	// HaveReduceDataAfterChunk check have reduced data after current buffer
+	HaveReduceDataAfterChunk() bool
+
+	// StartTime the data start timestamp
 	StartTime() uint64
+	// EndTime the data end timestamp
 	EndTime() uint64
-
-	MinDataID() int
-	MaxDataID() int
 }
 
 type SocketDataUploadEvent struct {
 	Protocol     base.ConnectionProtocol
-	MsgType      base.SocketMessageType
+	HaveReduce   uint8
 	Direction0   base.SocketDataDirection
 	Finished     uint8
-	Sequence     uint16
+	Sequence0    uint16
 	DataLen      uint16
 	StartTime0   uint64
 	EndTime0     uint64
 	ConnectionID uint64
 	RandomID     uint64
-	DataID       uint64
+	DataID0      uint64
 	TotalSize0   uint64
 	Buffer       [2048]byte
 }
@@ -65,6 +75,10 @@ func (s *SocketDataUploadEvent) BufferData() []byte {
 	return s.Buffer[:s.DataLen]
 }
 
+func (s *SocketDataUploadEvent) BufferLen() int {
+	return int(s.DataLen)
+}
+
 func (s *SocketDataUploadEvent) StartTime() uint64 {
 	return s.StartTime0
 }
@@ -77,90 +91,48 @@ func (s *SocketDataUploadEvent) Direction() base.SocketDataDirection {
 	return s.Direction0
 }
 
-func (s *SocketDataUploadEvent) FirstEvent() *SocketDataUploadEvent {
-	return s
-}
-
-func (s *SocketDataUploadEvent) MinDataID() int {
-	return int(s.DataID)
-}
-
-func (s *SocketDataUploadEvent) MaxDataID() int {
-	return int(s.DataID)
-}
-
 func (s *SocketDataUploadEvent) IsStart() bool {
-	return s.Sequence == 0
+	return s.Sequence0 == 0
 }
 
 func (s *SocketDataUploadEvent) IsFinished() bool {
 	return s.Finished == 1
 }
 
-func (s *SocketDataUploadEvent) Combine(other SocketDataBuffer) SocketDataBuffer {
-	combined := &SocketDataUploadCombinedEvent{first: s}
-	combined.realBuffer = append(s.Buffer[:s.DataLen], other.BufferData()...)
-	combined.minDataID = int(s.DataID)
-	if other.MinDataID() < combined.minDataID {
-		combined.minDataID = other.MinDataID()
-	}
-	combined.maxDataID = int(s.DataID)
-	if other.MaxDataID() > combined.maxDataID {
-		combined.maxDataID = other.MaxDataID()
-	}
-	return combined
+func (s *SocketDataUploadEvent) DataID() uint64 {
+	return s.DataID0
 }
 
-func (s *SocketDataUploadEvent) TotalSize() uint64 {
-	return s.TotalSize0
-}
-
-type SocketDataUploadCombinedEvent struct {
-	first      *SocketDataUploadEvent
-	realBuffer []byte
-	minDataID  int
-	maxDataID  int
-}
-
-func (s *SocketDataUploadCombinedEvent) BufferData() []byte {
-	return s.realBuffer
-}
-
-func (s *SocketDataUploadCombinedEvent) TotalSize() uint64 {
-	return s.first.TotalSize0
+func (s *SocketDataUploadEvent) DataSequence() int {
+	return int(s.Sequence0)
 }
 
-func (s *SocketDataUploadCombinedEvent) StartTime() uint64 {
-	return s.first.StartTime0
+func (s *SocketDataUploadEvent) BufferStartPosition() int {
+	return 0
 }
 
-func (s *SocketDataUploadCombinedEvent) EndTime() uint64 {
-	return s.first.EndTime0
+func (s *SocketDataUploadEvent) TotalSize() uint64 {
+	return s.TotalSize0
 }
 
-func (s *SocketDataUploadCombinedEvent) MinDataID() int {
-	return s.minDataID
+func (s *SocketDataUploadEvent) HaveReduceDataAfterChunk() bool {
+	return s.HaveReduce == 1
 }
 
-func (s *SocketDataUploadCombinedEvent) MaxDataID() int {
-	return s.maxDataID
+type SocketDataEventLimited struct {
+	SocketDataBuffer
+	from int
+	size int
 }
 
-func (s *SocketDataUploadCombinedEvent) Direction() base.SocketDataDirection {
-	return s.first.Direction0
+func (s *SocketDataEventLimited) BufferData() []byte {
+	return s.SocketDataBuffer.BufferData()[s.from:s.size]
 }
 
-func (s *SocketDataUploadCombinedEvent) FirstEvent() *SocketDataUploadEvent {
-	return s.first
+func (s *SocketDataEventLimited) BufferLen() int {
+	return s.size - s.from
 }
 
-func (s *SocketDataUploadCombinedEvent) Combine(other SocketDataBuffer) SocketDataBuffer {
-	s.realBuffer = append(s.realBuffer, other.BufferData()...)
-	if other.MinDataID() < s.minDataID {
-		s.minDataID = other.MinDataID()
-	}
-	if other.MaxDataID() > s.maxDataID {
-		s.maxDataID = other.MaxDataID()
-	}
-	return s
+func (s *SocketDataEventLimited) BufferStartPosition() int {
+	return s.from
 }
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/base/protocol.go b/pkg/profiling/task/network/analyze/layer7/protocols/base/protocol.go
index 8afb92e..af401e1 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/base/protocol.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/base/protocol.go
@@ -18,22 +18,32 @@
 package base
 
 import (
+	"time"
+
 	profiling "github.com/apache/skywalking-rover/pkg/profiling/task/base"
 	"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
 )
 
+type ParseResult int
+
+const (
+	ParseResultSuccess ParseResult = iota
+	ParseResultSkipPackage
+)
+
 type Protocol interface {
-	Name() string
+	Protocol() base.ConnectionProtocol
 	GenerateMetrics() Metrics
 	Init(config *profiling.TaskConfig)
 
-	ReceiveData(context Context, event *SocketDataUploadEvent) bool
+	ParseProtocol(connectionID uint64, metrics Metrics, reader *Buffer) ParseResult
+	PackageMaxExpireDuration() time.Duration
 	UpdateExtensionConfig(config *profiling.ExtensionConfig)
 }
 
 type Context interface {
 	QueryConnection(connectionID, randomID uint64) *base.ConnectionContext
-	QueryProtocolMetrics(conMetrics *base.ConnectionMetricsContext, protocolName string) Metrics
+	QueryProtocolMetrics(conMetrics *base.ConnectionMetricsContext, protocol base.ConnectionProtocol) Metrics
 }
 
 type Metrics interface {
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/http1/analyzer.go b/pkg/profiling/task/network/analyze/layer7/protocols/http1/analyzer.go
index 499f803..c9b45fb 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/http1/analyzer.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/http1/analyzer.go
@@ -18,30 +18,22 @@
 package http1
 
 import (
-	"bufio"
-	"bytes"
 	"container/list"
 	"encoding/json"
-	"fmt"
-	"io"
-	"net/http"
-	"net/textproto"
-	"strconv"
-	"strings"
 	"sync"
+	"time"
 
 	"github.com/apache/skywalking-rover/pkg/logger"
 	profiling "github.com/apache/skywalking-rover/pkg/profiling/task/base"
 	"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
 	protocol "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/base"
+	"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader"
 
 	"github.com/sirupsen/logrus"
 )
 
 var log = logger.GetLogger("profiling", "task", "network", "layer7", "protocols", "http1")
 
-var ProtocolName = "http1"
-
 var PackageSizeHistogramBuckets = []float64{
 	// 0.25KB, 0.5KB, 1KB, 1.5KB, 2KB, 3KB, 5KB, 8KB, 10KB, 15KB, 20KB, 35KB, 50KB, 75KB, 100KB, 200KB, 500KB
 	256, 512, 1048, 1536, 2048, 3072, 5120, 8192, 10240, 15360, 20480, 35840, 51200, 76800, 102400, 204800, 512000,
@@ -77,8 +69,8 @@ func NewHTTP1Analyzer() protocol.Protocol {
 	}
 }
 
-func (h *Analyzer) Name() string {
-	return ProtocolName
+func (h *Analyzer) Protocol() base.ConnectionProtocol {
+	return base.ConnectionProtocolHTTP
 }
 
 func (h *Analyzer) GenerateMetrics() protocol.Metrics {
@@ -94,260 +86,146 @@ func (h *Analyzer) Init(config *profiling.TaskConfig) {
 	h.sampleConfig = NewSamplingConfig(config)
 }
 
-func (h *Analyzer) ReceiveData(context protocol.Context, event *protocol.SocketDataUploadEvent) bool {
-	// only handle the HTTP1 protocol
-	if event.Protocol != base.ConnectionProtocolHTTP {
-		return false
-	}
-
-	connectionID := event.GenerateConnectionID()
-	fromAnalyzerCache := false
-	var connectionMetrics *ConnectionMetrics
-	connection := context.QueryConnection(event.ConnectionID, event.RandomID)
-	// if connection not exists, then cached it into the analyzer context
-	if connection == nil {
-		connectionMetrics = h.cache[connectionID]
-		fromAnalyzerCache = true
-		if connectionMetrics == nil {
-			connectionMetrics = h.GenerateMetrics().(*ConnectionMetrics)
-			h.cache[connectionID] = connectionMetrics
-		}
-	} else {
-		connectionMetrics = context.QueryProtocolMetrics(connection.Metrics, ProtocolName).(*ConnectionMetrics)
-	}
-
-	log.Debugf("receive connection: %s, dataid: %d, sequence: %d, finished: %d, message type: %s, direction: %s, size: %d, total size: %d",
-		connectionID, event.DataID, event.Sequence, event.Finished, event.MsgType.String(), event.Direction().String(), event.DataLen, event.TotalSize0)
-	// if the cache is existing in the analyzer context, then delete it
-	if !fromAnalyzerCache {
-		if tmp := h.cache[connectionID]; tmp != nil {
-			connectionMetrics.MergeFrom(h, tmp)
-			delete(h.cache, connectionID)
-		}
+func (h *Analyzer) ParseProtocol(connectionID uint64, metrics protocol.Metrics, buf *protocol.Buffer) protocol.ParseResult {
+	connectionMetrics := metrics.(*ConnectionMetrics)
+	messageType, err := reader.IdentityMessageType(buf)
+	if err != nil {
+		return protocol.ParseResultSkipPackage
 	}
 
-	req, resp := h.buildHTTP1(connectionMetrics.halfData, event)
-	if req != nil && resp != nil {
-		if err := h.analyze(context, connectionID, connectionMetrics, req, resp); err != nil {
-			log.Errorf("HTTP1 analyze failure: %v", err)
-			return false
-		}
-	} else {
-		log.Debugf("connnection: %s, remaining half data list size: %d", connectionID, connectionMetrics.halfData.Len())
+	var result protocol.ParseResult
+	switch messageType {
+	case reader.MessageTypeRequest:
+		result, err = h.handleRequest(connectionMetrics, buf)
+	case reader.MessageTypeResponse:
+		result, err = h.handleResponse(connectionID, connectionMetrics, buf)
+	case reader.MessageTypeUnknown:
+		return protocol.ParseResultSkipPackage
 	}
-	return true
-}
 
-func (h *Analyzer) UpdateExtensionConfig(config *profiling.ExtensionConfig) {
-	if config == nil {
-		return
+	if err != nil {
+		log.Warnf("reading %v error: %v", messageType, err)
+		return protocol.ParseResultSkipPackage
+	} else if result != protocol.ParseResultSuccess {
+		return result
 	}
-	h.sampleConfig.UpdateRules(config.NetworkSamplings)
+	return protocol.ParseResultSuccess
 }
 
-func (h *Analyzer) combineAndRemoveEvent(halfConnections *list.List, firstElement *list.Element,
-	lastAppender protocol.SocketDataBuffer) protocol.SocketDataBuffer {
-	firstEvent := firstElement.Value.(*protocol.SocketDataUploadEvent)
-	if firstEvent.Sequence == 0 && firstEvent.Finished == 1 {
-		halfConnections.Remove(firstElement)
-		return h.combineEventIfNeed(firstEvent, lastAppender)
+func (h *Analyzer) handleRequest(metrics *ConnectionMetrics, buf *protocol.Buffer) (protocol.ParseResult, error) {
+	// parsing request
+	req, r, err := reader.ReadRequest(buf)
+	if err != nil {
+		return protocol.ParseResultSkipPackage, err
 	}
-	next := firstElement.Next()
-	halfConnections.Remove(firstElement)
-	var buffer protocol.SocketDataBuffer = firstEvent
-	// for-each the events until buffer finished
-	for next != nil {
-		event := next.Value.(*protocol.SocketDataUploadEvent)
-
-		buffer = buffer.Combine(event)
-
-		tmp := next.Next()
-		halfConnections.Remove(next)
-		next = tmp
-		// combine event
-		if event.Finished == 1 {
-			return h.combineEventIfNeed(buffer, lastAppender)
-		}
+	if r != protocol.ParseResultSuccess {
+		return r, nil
 	}
-	return h.combineEventIfNeed(buffer, lastAppender)
-}
 
-func (h *Analyzer) combineEventIfNeed(data, appender protocol.SocketDataBuffer) protocol.SocketDataBuffer {
-	if appender != nil {
-		return data.Combine(appender)
-	}
-	return data
+	metrics.AppendRequestToList(req)
+	return protocol.ParseResultSuccess, nil
 }
 
-func (h *Analyzer) buildHTTP1(halfConnections *list.List, event *protocol.SocketDataUploadEvent) (request, response protocol.SocketDataBuffer) {
-	// no connections, then just add the response to the half connections to wait the request
-	if halfConnections.Len() == 0 {
-		halfConnections.PushBack(event)
-		return nil, nil
+func (h *Analyzer) handleResponse(connectionID uint64, metrics *ConnectionMetrics, buf *protocol.Buffer) (protocol.ParseResult, error) {
+	// find the first request
+	firstElement := metrics.halfData.Front()
+	if firstElement == nil {
+		return protocol.ParseResultSkipPackage, nil
 	}
+	request := metrics.halfData.Remove(firstElement).(*reader.Request)
 
-	// quick handler(only one element, and is request)
-	if halfConnections.Len() == 1 {
-		firstElement := halfConnections.Front()
-		firstEvent := firstElement.Value.(*protocol.SocketDataUploadEvent)
-		if firstEvent.IsStart() && firstEvent.Finished == 1 && event.IsStart() && event.Finished == 1 &&
-			firstEvent.DataID+1 == event.DataID && firstEvent.MsgType == base.SocketMessageTypeRequest &&
-			event.MsgType == base.SocketMessageTypeResponse {
-			return h.combineAndRemoveEvent(halfConnections, firstElement, nil), event
-		}
-	}
-
-	// push to the queue
-	h.insertToList(halfConnections, event)
-
-	// trying to find completed request and response
-	return NewHTTP1BufferAnalyzer(h).Analyze(halfConnections)
-}
-
-func (h *Analyzer) insertToList(halfConnections *list.List, event *protocol.SocketDataUploadEvent) {
-	if halfConnections.Len() == 0 {
-		halfConnections.PushFront(event)
-		return
-	}
-	if halfConnections.Back().Value.(*protocol.SocketDataUploadEvent).DataID < event.DataID {
-		halfConnections.PushBack(event)
-		return
-	}
-	beenAdded := false
-	for element := halfConnections.Front(); element != nil; element = element.Next() {
-		existEvent := element.Value.(*protocol.SocketDataUploadEvent)
-		if existEvent.DataID > event.DataID {
-			// data id needs order
-			beenAdded = true
-		} else if existEvent.DataID == event.DataID {
-			if existEvent.MsgType == event.MsgType && existEvent.Sequence > event.Sequence {
-				// same message type and following the sequence order
-				beenAdded = true
-			} else if existEvent.MsgType > event.MsgType {
-				// request needs before response
-				beenAdded = true
-			}
-		}
-		if beenAdded {
-			halfConnections.InsertBefore(event, element)
-			break
-		}
-	}
-	if !beenAdded {
-		halfConnections.PushBack(event)
-	}
-}
-
-func (h *Analyzer) analyze(_ protocol.Context, connectionID string, connectionMetrics *ConnectionMetrics,
-	requestBuffer, responseBuffer protocol.SocketDataBuffer) error {
-	request, err := http.ReadRequest(bufio.NewReader(bytes.NewBuffer(requestBuffer.BufferData())))
-	if err != nil {
-		return fmt.Errorf("parse request failure: data length: %d, total data length: %d, %v",
-			len(requestBuffer.BufferData()), requestBuffer.TotalSize(), err)
-	}
-
-	response, err := http.ReadResponse(bufio.NewReader(bytes.NewBuffer(responseBuffer.BufferData())), request)
-	if response != nil {
-		defer response.Body.Close()
-	}
+	// parsing request
+	response, r, err := reader.ReadResponse(request, buf)
 	if err != nil {
-		if err == io.ErrUnexpectedEOF || err == io.EOF {
-			response, err = h.tryingToReadResponseWithoutHeaders(bufio.NewReader(bytes.NewBuffer(responseBuffer.BufferData())), request)
-			if err != nil {
-				return fmt.Errorf("parsing simple data error: %v", err)
-			}
-			if response != nil && response.Body != nil {
-				defer response.Body.Close()
-			}
-		}
-		if err != nil {
-			return fmt.Errorf("parse response failure, data length: %d, total data length: %d, %v",
-				len(requestBuffer.BufferData()), requestBuffer.TotalSize(), err)
-		}
+		return protocol.ParseResultSkipPackage, err
+	} else if r != protocol.ParseResultSuccess {
+		return r, nil
 	}
 
 	// lock append metrics with read locker
-	connectionMetrics.metricsLocker.RLock()
-	defer connectionMetrics.metricsLocker.RUnlock()
+	metrics.metricsLocker.RLock()
+	defer metrics.metricsLocker.RUnlock()
 
 	// append metrics
-	data := connectionMetrics.clientMetrics
+	data := metrics.clientMetrics
 	side := base.ConnectionRoleClient
-	if requestBuffer.Direction() == base.SocketDataDirectionIngress {
+	if request.Direction() == base.SocketDataDirectionIngress {
 		// if receive the request, that's mean is server side
-		data = connectionMetrics.serverMetrics
+		data = metrics.serverMetrics
 		side = base.ConnectionRoleServer
 	}
-	data.Append(h.sampleConfig, request, requestBuffer, response, responseBuffer)
+	data.Append(h.sampleConfig, request, response)
 
 	if log.Enable(logrus.DebugLevel) {
 		metricsJSON, _ := json.Marshal(data)
-		log.Debugf("generated metrics, connection id: %s, side: %s, metrisc: %s", connectionID, side.String(), string(metricsJSON))
+		log.Debugf("generated metrics, connection id: %d, side: %s, metrisc: %s", connectionID, side.String(), string(metricsJSON))
 	}
-	return nil
+	return protocol.ParseResultSuccess, nil
 }
 
-func (h *Analyzer) tryingToReadResponseWithoutHeaders(reader *bufio.Reader, request *http.Request) (*http.Response, error) {
-	if reader.Size() < 16 {
-		return nil, fmt.Errorf("the header length not enough")
-	}
-	tp := textproto.NewReader(reader)
-	resp := &http.Response{
-		Request: request,
+func (h *Analyzer) UpdateExtensionConfig(config *profiling.ExtensionConfig) {
+	if config == nil {
+		return
 	}
+	h.sampleConfig.UpdateRules(config.NetworkSamplings)
+}
 
-	line, err := tp.ReadLine()
-	if err != nil {
-		return nil, fmt.Errorf("read response first line failure: %v", err)
-	}
-	indexByte := strings.IndexByte(line, ' ')
-	if indexByte == -1 {
-		return nil, fmt.Errorf("parsing response error: %s", line)
-	}
-	resp.Proto = line[:indexByte]
-	resp.Status = strings.TrimLeft(line[indexByte+1:], " ")
-	statusCode := resp.Status
-	if i := strings.IndexByte(resp.Status, ' '); i != -1 {
-		statusCode = resp.Status[:i]
+func (h *Analyzer) PackageMaxExpireDuration() time.Duration {
+	return time.Minute
+}
+
+func (m *ConnectionMetrics) AppendRequestToList(req *reader.Request) {
+	if m.halfData.Len() == 0 {
+		m.halfData.PushFront(req)
+		return
 	}
-	if len(statusCode) != 3 {
-		return nil, fmt.Errorf("parsing response status code failure: %v", statusCode)
+	if m.halfData.Back().Value.(*reader.Request).MinDataID() < req.MinDataID() {
+		m.halfData.PushBack(req)
+		return
 	}
-	resp.StatusCode, err = strconv.Atoi(statusCode)
-	if err != nil || resp.StatusCode < 0 {
-		return nil, fmt.Errorf("status code not correct: %s", statusCode)
+	beenAdded := false
+	for element := m.halfData.Front(); element != nil; element = element.Next() {
+		existEvent := element.Value.(*reader.Request)
+		if existEvent.MinDataID() > req.MinDataID() {
+			m.halfData.InsertBefore(req, element)
+			beenAdded = true
+			break
+		}
 	}
-	var ok bool
-	if resp.ProtoMajor, resp.ProtoMinor, ok = http.ParseHTTPVersion(resp.Proto); !ok {
-		return nil, fmt.Errorf("parsing http version failure: %s", resp.Proto)
+	if !beenAdded {
+		m.halfData.PushBack(req)
 	}
-
-	return resp, nil
 }
 
-func (h *ConnectionMetrics) MergeMetricsFromConnection(connection *base.ConnectionContext, data base.ConnectionMetrics) {
+func (m *ConnectionMetrics) MergeMetricsFromConnection(connection *base.ConnectionContext, data base.ConnectionMetrics) {
 	other := data.(*ConnectionMetrics)
 	other.metricsLocker.Lock()
 	defer other.metricsLocker.Unlock()
 
-	h.clientMetrics.MergeAndClean(other.clientMetrics)
-	h.serverMetrics.MergeAndClean(other.serverMetrics)
+	if other.halfData != nil {
+		for element := other.halfData.Front(); element != nil; element = element.Next() {
+			m.AppendRequestToList(element.Value.(*reader.Request))
+		}
+	}
+
+	m.clientMetrics.MergeAndClean(other.clientMetrics)
+	m.serverMetrics.MergeAndClean(other.serverMetrics)
 	if log.Enable(logrus.DebugLevel) {
-		clientMetrics, _ := json.Marshal(h.clientMetrics)
-		serverMetrics, _ := json.Marshal(h.serverMetrics)
+		clientMetrics, _ := json.Marshal(m.clientMetrics)
+		serverMetrics, _ := json.Marshal(m.serverMetrics)
 		log.Debugf("combine metrics: conid: %d_%d, client side metrics: %s, server side metrics: %s",
 			connection.ConnectionID, connection.RandomID, clientMetrics, serverMetrics)
 	}
 }
 
-func (h *ConnectionMetrics) FlushMetrics(traffic *base.ProcessTraffic, metricsBuilder *base.MetricsBuilder) {
+func (m *ConnectionMetrics) FlushMetrics(traffic *base.ProcessTraffic, metricsBuilder *base.MetricsBuilder) {
 	for _, p := range traffic.LocalProcesses {
 		// if the remote process is profiling, then used the client side
-		localMetrics := h.clientMetrics
-		remoteMetrics := h.serverMetrics
+		localMetrics := m.clientMetrics
+		remoteMetrics := m.serverMetrics
 		if traffic.Role == base.ConnectionRoleServer {
-			localMetrics = h.serverMetrics
-			remoteMetrics = h.clientMetrics
+			localMetrics = m.serverMetrics
+			remoteMetrics = m.clientMetrics
 		}
 
 		metricsCount := localMetrics.appendMetrics(traffic, p, "", metricsBuilder, false)
@@ -362,15 +240,7 @@ func (h *ConnectionMetrics) FlushMetrics(traffic *base.ProcessTraffic, metricsBu
 			// if remote process is profiling, then the metrics data need to be cut half
 			log.Debugf("flush HTTP1 metrics(%s): %s, remote process is profiling: %t, client(%s), server(%s)",
 				traffic.Role.String(), traffic.GenerateConnectionInfo(), traffic.RemoteProcessIsProfiling(),
-				h.clientMetrics.String(), h.serverMetrics.String())
-		}
-	}
-}
-
-func (h *ConnectionMetrics) MergeFrom(analyzer *Analyzer, other *ConnectionMetrics) {
-	if other.halfData != nil {
-		for element := other.halfData.Front(); element != nil; element = element.Next() {
-			analyzer.insertToList(h.halfData, element.Value.(*protocol.SocketDataUploadEvent))
+				m.clientMetrics.String(), m.serverMetrics.String())
 		}
 	}
 }
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/http1/analyzer_test.go b/pkg/profiling/task/network/analyze/layer7/protocols/http1/analyzer_test.go
deleted file mode 100644
index a663a6e..0000000
--- a/pkg/profiling/task/network/analyze/layer7/protocols/http1/analyzer_test.go
+++ /dev/null
@@ -1,310 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package http1
-
-import (
-	"bufio"
-	"container/list"
-	"net/http"
-	"reflect"
-	"strings"
-	"testing"
-
-	base2 "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/base"
-
-	"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
-)
-
-var unknown, request, response = 0, 1, 2
-var finished, notFinished = 1, 0
-
-// nolint
-func TestBuildHTTP1(t *testing.T) {
-	tests := []struct {
-		name   string
-		events []struct {
-			dataID   int
-			dataType int
-			sequence int
-			finished int
-			data     string
-		}
-		http []struct {
-			start int
-			end   int
-		}
-		residueID []int
-	}{
-		{
-			name: "simple",
-			events: []struct {
-				dataID   int
-				dataType int
-				sequence int
-				finished int
-				data     string
-			}{
-				{1, request, 0, notFinished, ""},
-				{1, request, 1, notFinished, ""},
-				{1, request, 2, finished, ""},
-				{2, response, 0, notFinished, ""},
-				{2, response, 1, finished, ""},
-
-				{3, request, 0, finished, ""},
-				{4, response, 0, finished, ""},
-			},
-			http: []struct {
-				start int
-				end   int
-			}{
-				{1, 2},
-				{3, 4},
-			},
-			residueID: []int{},
-		},
-		{
-			name: "response before request",
-			events: []struct {
-				dataID   int
-				dataType int
-				sequence int
-				finished int
-				data     string
-			}{
-				{2, response, 0, finished, ""},
-				{1, request, 1, finished, ""},
-				{1, request, 0, notFinished, ""},
-
-				{3, request, 0, notFinished, ""},
-				{3, request, 1, notFinished, ""},
-				{4, response, 1, finished, ""},
-				{4, response, 0, notFinished, ""},
-				{3, request, 2, finished, ""},
-
-				{5, request, 0, notFinished, ""},
-				{5, request, 1, notFinished, ""},
-				{6, response, 1, finished, ""},
-				{6, response, 0, notFinished, ""},
-				{5, request, 2, finished, ""},
-			},
-			http: []struct {
-				start int
-				end   int
-			}{
-				{1, 2},
-				{3, 4},
-				{5, 6},
-			},
-			residueID: []int{},
-		},
-		{
-			name: "residue requests",
-			events: []struct {
-				dataID   int
-				dataType int
-				sequence int
-				finished int
-				data     string
-			}{
-				{1, request, 0, finished, ""},
-				{2, response, 1, finished, ""},
-				{2, response, 0, notFinished, ""},
-
-				{3, request, 0, finished, ""},
-				{4, response, 0, notFinished, ""},
-
-				{5, request, 1, finished, ""},
-				{6, response, 0, finished, ""},
-			},
-			http: []struct {
-				start int
-				end   int
-			}{
-				{1, 2},
-			},
-			residueID: []int{3, 4, 5, 6},
-		},
-		{
-			name: "multiple request",
-			events: []struct {
-				dataID   int
-				dataType int
-				sequence int
-				finished int
-				data     string
-			}{
-				{1, request, 0, finished, ""},
-				{2, request, 0, finished, ""},
-			},
-			http: []struct {
-				start int
-				end   int
-			}{},
-			residueID: []int{1, 2},
-		},
-		{
-			name: "multiple response",
-			events: []struct {
-				dataID   int
-				dataType int
-				sequence int
-				finished int
-				data     string
-			}{
-				{1, request, 0, finished, ""},
-				{3, response, 1, finished, ""},
-				{4, response, 0, finished, ""},
-			},
-			http: []struct {
-				start int
-				end   int
-			}{},
-			residueID: []int{1, 3, 4},
-		},
-		{
-			name: "unfinished response",
-			events: []struct {
-				dataID   int
-				dataType int
-				sequence int
-				finished int
-				data     string
-			}{
-				{42, response, 0, notFinished, ""},
-				{42, response, 1, notFinished, ""},
-				{42, response, 2, finished, ""},
-
-				{48, request, 0, finished, ""},
-				{50, response, 0, notFinished, ""},
-				{50, response, 1, notFinished, ""},
-				{50, response, 2, finished, ""},
-			},
-			http: []struct {
-				start int
-				end   int
-			}{
-				{48, 50},
-			},
-			residueID: []int{42, 42, 42},
-		},
-		{
-			name: "unknown data",
-			events: []struct {
-				dataID   int
-				dataType int
-				sequence int
-				finished int
-				data     string
-			}{
-				{1, unknown, 0, notFinished, "GET / HTTP/1.1\r\n"},
-				{1, unknown, 1, notFinished, "Host: test.com\n\r\n"},
-				{2, response, 0, finished, ""},
-
-				{3, unknown, 1, notFinished, "Host: test.com\n\r\n"},
-				{4, response, 0, finished, ""},
-				{3, unknown, 0, notFinished, "GET / HTTP/1.1\r\n"},
-
-				{6, unknown, 1, notFinished, "Host: test.com\n\r\n"},
-				{5, request, 0, finished, ""},
-				{6, unknown, 0, notFinished, "HTTP/1.1 200 OK\r\n"},
-
-				// request not finished
-				{7, unknown, 1, notFinished, "Host: test.com\n"},
-				{8, response, 0, finished, ""},
-				{7, unknown, 0, notFinished, "GET / HTTP/1.1\r\n"},
-				{9, request, 0, finished, ""},
-				{10, response, 0, finished, ""},
-			},
-			http: []struct {
-				start int
-				end   int
-			}{
-				{1, 2},
-				{3, 4},
-				{5, 6},
-				{9, 10},
-			},
-			residueID: []int{7, 7, 8},
-		},
-	}
-
-	for _, testCase := range tests {
-		//t.Run(testCase.name, func(t *testing.T) {
-		analyzer := NewHTTP1Analyzer().(*Analyzer)
-		l := list.New()
-		var events = make([]struct {
-			start, end int
-		}, 0)
-		for _, event := range testCase.events {
-			req, resp := analyzer.buildHTTP1(l, &base2.SocketDataUploadEvent{
-				DataID:   uint64(event.dataID),
-				MsgType:  base.SocketMessageType(event.dataType),
-				Sequence: uint16(event.sequence),
-				Finished: uint8(event.finished),
-				Buffer:   bufferConvert(event.data),
-				DataLen:  uint16(len(event.data)),
-			})
-			if req != nil && resp != nil {
-				events = append(events, struct{ start, end int }{start: req.MinDataID(), end: resp.MaxDataID()})
-			}
-		}
-
-		if !reflect.DeepEqual(testCase.http, events) {
-			t.Fatalf("excepted http: %v, actual: %v", testCase.http, events)
-		}
-
-		exceptedList := testCase.residueID
-		if exceptedList == nil {
-			exceptedList = make([]int, 0)
-		}
-		actualList := make([]int, 0)
-		for element := l.Front(); element != nil; element = element.Next() {
-			actualList = append(actualList, int(element.Value.(*base2.SocketDataUploadEvent).DataID))
-		}
-		if !reflect.DeepEqual(exceptedList, actualList) {
-			t.Fatalf("excepted residue data list: %v, actual: %v", exceptedList, actualList)
-		}
-		//})
-	}
-}
-
-var defaultBuffer [2048]byte
-
-func bufferConvert(data string) [2048]byte {
-	if data == "" {
-		return defaultBuffer
-	}
-	var buffer [2048]byte
-	for inx, d := range []byte(data) {
-		buffer[inx] = d
-	}
-	return buffer
-}
-
-func TestParseSimpleHTTP1Response(t *testing.T) {
-	s := `HTTP/1.0 200 OK\r\n`
-	h := &http.Request{}
-	analyzer := NewHTTP1Analyzer().(*Analyzer)
-	resp, err := analyzer.tryingToReadResponseWithoutHeaders(bufio.NewReader(strings.NewReader(s)), h)
-	if err != nil {
-		t.Fatalf("reading simple response error: %v", err)
-	}
-	if resp.Body != nil {
-		defer resp.Body.Close()
-	}
-}
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/http1/builder.go b/pkg/profiling/task/network/analyze/layer7/protocols/http1/builder.go
deleted file mode 100644
index a55ae5f..0000000
--- a/pkg/profiling/task/network/analyze/layer7/protocols/http1/builder.go
+++ /dev/null
@@ -1,290 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package http1
-
-import (
-	"bufio"
-	"bytes"
-	"container/list"
-	"net/http"
-
-	base2 "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/base"
-
-	"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
-)
-
-type BufferAnalyzer struct {
-	http1Analyzer *Analyzer
-
-	unknownEventBuffer base2.SocketDataBuffer
-	unknownElement     *list.Element
-	unknownSize        int
-	request            *base2.SocketDataUploadEvent
-	requestElement     *list.Element
-	response           *base2.SocketDataUploadEvent
-	responseElement    *list.Element
-
-	unknownDataID      uint64
-	unknownMaxSequence uint16
-	reqDataID          uint64
-	reqMaxSequence     uint16
-	reqFinished        bool
-	respDataID         uint64
-	respMaxSequence    uint16
-	respFinished       bool
-}
-
-func NewHTTP1BufferAnalyzer(http1 *Analyzer) *BufferAnalyzer {
-	return &BufferAnalyzer{http1Analyzer: http1}
-}
-
-func (h *BufferAnalyzer) Analyze(events *list.List) (request, response base2.SocketDataBuffer) {
-	for element := events.Front(); element != nil; element = element.Next() {
-		curEvent := element.Value.(*base2.SocketDataUploadEvent)
-		// transform the unknown to the request or response
-		if continueReading, req, resp := h.handleUnknown(events, element, curEvent); req != nil && resp != nil {
-			return req, resp
-		} else if continueReading {
-			continue
-		}
-
-		if continueReading, req, resp := h.handleRequest(events, element, curEvent); req != nil && resp != nil {
-			return req, resp
-		} else if continueReading {
-			continue
-		}
-
-		if req, resp := h.handleResponse(events, element, curEvent); req != nil && resp != nil {
-			return req, resp
-		}
-	}
-	return nil, nil
-}
-
-func (h *BufferAnalyzer) handleUnknown(event *list.List, element *list.Element,
-	curEvent *base2.SocketDataUploadEvent) (continueReading bool, req, resp base2.SocketDataBuffer) {
-	if curEvent.MsgType != base.SocketMessageTypeUnknown {
-		return false, nil, nil
-	}
-	if h.unknownEventBuffer == base2.SocketDataBuffer(nil) {
-		// maybe the unknown type is response, so clean the context
-		if !curEvent.IsStart() {
-			h.cleanContext()
-			return true, nil, nil
-		}
-		h.resetStartUnknown(element, curEvent)
-		req, resp = h.tryingToAnalyzeTheUnknown(event, curEvent)
-		if req != nil && resp != nil {
-			return false, req, resp
-		}
-		return true, nil, nil
-	}
-	if curEvent.MsgType == base.SocketMessageTypeUnknown {
-		if h.unknownDataID == curEvent.DataID && h.unknownMaxSequence+1 == curEvent.Sequence {
-			h.unknownEventBuffer = h.unknownEventBuffer.Combine(curEvent)
-			h.unknownMaxSequence++
-		} else if curEvent.IsStart() {
-			h.resetStartUnknown(element, curEvent)
-		} else {
-			h.cleanContext()
-		}
-
-		req, resp = h.tryingToAnalyzeTheUnknown(event, curEvent)
-		if req != nil && resp != nil {
-			return false, req, resp
-		}
-		return true, nil, nil
-	}
-	return false, nil, nil
-}
-
-func (h *BufferAnalyzer) handleRequest(events *list.List, element *list.Element,
-	curEvent *base2.SocketDataUploadEvent) (continueReading bool, req, resp base2.SocketDataBuffer) {
-	if h.request == nil {
-		// find the first request package event
-		if curEvent.MsgType == base.SocketMessageTypeRequest && curEvent.IsStart() {
-			h.resetStartRequest(element, curEvent)
-		}
-		return true, nil, nil
-	}
-	if curEvent.MsgType == base.SocketMessageTypeRequest {
-		// if the request not finished and latest request sequence match with current event
-		// then keep the request tracing
-		if !h.reqFinished && h.reqDataID == curEvent.DataID && h.reqMaxSequence+1 == curEvent.Sequence {
-			h.reqMaxSequence++
-			h.reqFinished = curEvent.IsFinished()
-		} else if curEvent.IsStart() {
-			// if current event is new one, then update to current request
-			h.resetStartRequest(element, curEvent)
-		} else {
-			// Otherwise, clean the request and response context
-			h.cleanContext()
-		}
-
-		// if request and response all finished, then return
-		if h.reqFinished && h.respFinished {
-			req, resp = h.buildHTTP(events)
-			return false, req, resp
-		}
-
-		return true, nil, nil
-	}
-	return false, nil, nil
-}
-
-func (h *BufferAnalyzer) handleResponse(events *list.List, element *list.Element,
-	curEvent *base2.SocketDataUploadEvent) (req, resp base2.SocketDataBuffer) {
-	if h.response == nil {
-		// if current response is not start, then clean to re-find new one
-		if !curEvent.IsStart() {
-			h.cleanContext()
-			return nil, nil
-		}
-		h.resetStartResponse(element, curEvent)
-		if h.reqFinished && h.respFinished {
-			return h.buildHTTP(events)
-		}
-		return nil, nil
-	}
-
-	// if a new response, then clean the re-find new one, wait the previous data
-	if curEvent.IsStart() {
-		h.cleanContext()
-		return nil, nil
-	}
-
-	// if response sequence is broken, then clean the context
-	if h.respDataID != curEvent.DataID || h.respMaxSequence+1 != curEvent.Sequence {
-		h.cleanContext()
-		return nil, nil
-	}
-	h.respDataID = curEvent.DataID
-	h.respMaxSequence = curEvent.Sequence
-
-	if h.reqFinished && curEvent.IsFinished() {
-		return h.buildHTTP(events)
-	}
-	return nil, nil
-}
-
-func (h *BufferAnalyzer) resetStartUnknown(element *list.Element, curEvent *base2.SocketDataUploadEvent) {
-	h.unknownEventBuffer = curEvent
-	h.unknownElement = element
-	h.unknownDataID = curEvent.DataID
-	h.unknownMaxSequence = curEvent.Sequence
-}
-
-func (h *BufferAnalyzer) resetStartRequest(element *list.Element, curEvent *base2.SocketDataUploadEvent) {
-	h.request = curEvent
-	h.reqDataID = curEvent.DataID
-	h.reqMaxSequence = curEvent.Sequence
-	h.reqFinished = curEvent.IsFinished()
-	h.requestElement = element
-}
-
-func (h *BufferAnalyzer) resetStartResponse(element *list.Element, curEvent *base2.SocketDataUploadEvent) {
-	h.response = curEvent
-	h.respDataID = curEvent.DataID
-	h.respMaxSequence = curEvent.Sequence
-	h.responseElement = element
-	h.respFinished = curEvent.IsFinished()
-}
-
-func (h *BufferAnalyzer) tryingToAnalyzeTheUnknown(events *list.List, curEvent *base2.SocketDataUploadEvent) (req, resp base2.SocketDataBuffer) {
-	if h.unknownEventBuffer == base2.SocketDataBuffer(nil) {
-		return nil, nil
-	}
-	// length not enough
-	if len(h.unknownEventBuffer.BufferData()) < 16 {
-		return nil, nil
-	}
-	_, err := http.ReadRequest(bufio.NewReader(bytes.NewBuffer(h.unknownEventBuffer.BufferData())))
-	if err == nil {
-		// update the event as request
-		curEvent.Finished = 1
-		h.transformUnknown(h.unknownElement, base.SocketMessageTypeRequest)
-		// update the current data is request
-		h.resetStartRequest(h.unknownElement, h.unknownEventBuffer.FirstEvent())
-		h.reqFinished = true
-		h.cleanResponseContext()
-		h.cleanUnknownContext()
-		return nil, nil
-	}
-	tmpResponse, err := http.ReadResponse(bufio.NewReader(bytes.NewBuffer(h.unknownEventBuffer.BufferData())), &http.Request{})
-	if err == nil {
-		defer tmpResponse.Body.Close()
-		curEvent.Finished = 1
-		h.transformUnknown(h.unknownElement, base.SocketMessageTypeResponse)
-		// if request already finished, then remove the request
-		if h.reqFinished {
-			h.resetStartResponse(h.unknownElement, h.unknownEventBuffer.FirstEvent())
-			return h.buildHTTP(events)
-		}
-		// otherwise, clean the context and wait request
-		h.cleanContext()
-	}
-	return nil, nil
-}
-
-func (h *BufferAnalyzer) transformUnknown(element *list.Element, msgType base.SocketMessageType) {
-	// update message type and total size
-	firstEvent := element.Value.(*base2.SocketDataUploadEvent)
-	firstEvent.MsgType = msgType
-	dataLen := int(firstEvent.DataLen)
-	for e := element.Next(); e != nil; e = e.Next() {
-		curEvent := e.Value.(*base2.SocketDataUploadEvent)
-		if curEvent.Finished == 1 {
-			curEvent.MsgType = msgType
-			dataLen += int(curEvent.DataLen)
-			firstEvent.TotalSize0 = uint64(dataLen)
-			return
-		}
-		curEvent.MsgType = msgType
-		dataLen += int(curEvent.DataLen)
-	}
-}
-
-func (h *BufferAnalyzer) cleanContext() {
-	h.cleanUnknownContext()
-	h.cleanRequestContext()
-	h.cleanResponseContext()
-}
-
-func (h *BufferAnalyzer) cleanResponseContext() {
-	h.response = nil
-	h.respDataID = 0
-	h.respMaxSequence = 0
-	h.respFinished = false
-}
-
-func (h *BufferAnalyzer) cleanRequestContext() {
-	h.request = nil
-	h.reqDataID = 0
-	h.reqMaxSequence = 0
-	h.reqFinished = false
-}
-
-func (h *BufferAnalyzer) cleanUnknownContext() {
-	h.unknownEventBuffer, h.unknownElement = nil, nil
-	h.unknownSize, h.unknownDataID, h.unknownMaxSequence = 0, 0, 0
-}
-
-func (h *BufferAnalyzer) buildHTTP(events *list.List) (req, resp base2.SocketDataBuffer) {
-	return h.http1Analyzer.combineAndRemoveEvent(events, h.requestElement, nil),
-		h.http1Analyzer.combineAndRemoveEvent(events, h.responseElement, nil)
-}
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/http1/metrics.go b/pkg/profiling/task/network/analyze/layer7/protocols/http1/metrics.go
index 5756206..9290d34 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/http1/metrics.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/http1/metrics.go
@@ -18,23 +18,16 @@
 package http1
 
 import (
-	"bufio"
-	"bytes"
-	"compress/gzip"
 	"encoding/json"
 	"fmt"
-	"io"
-	"mime"
-	"net/http"
 	"strings"
 	"time"
 
-	"golang.org/x/net/html/charset"
-
 	"github.com/apache/skywalking-rover/pkg/process/api"
 	profiling "github.com/apache/skywalking-rover/pkg/profiling/task/base"
 	"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
 	protocol "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/base"
+	"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader"
 	"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/metrics"
 	"github.com/apache/skywalking-rover/pkg/tools"
 	"github.com/apache/skywalking-rover/pkg/tools/host"
@@ -80,27 +73,28 @@ func NewHTTP1URIMetrics() *URIMetrics {
 	}
 }
 
-func (u *URIMetrics) Append(sampleConfig *SamplingConfig,
-	req *http.Request, reqBuffer protocol.SocketDataBuffer, resp *http.Response, respBuffer protocol.SocketDataBuffer) {
+func (u *URIMetrics) Append(sampleConfig *SamplingConfig, req *reader.Request, resp *reader.Response) {
 	u.RequestCounter.Increase()
-	statusCounter := u.StatusCounter[resp.StatusCode]
+	statusCounter := u.StatusCounter[resp.StatusCode()]
 	if statusCounter == nil {
 		statusCounter = metrics.NewCounter()
-		u.StatusCounter[resp.StatusCode] = statusCounter
+		u.StatusCounter[resp.StatusCode()] = statusCounter
 	}
 	statusCounter.Increase()
 
-	u.AvgRequestPackageSize.Increase(float64(reqBuffer.TotalSize()))
-	u.AvgResponsePackageSize.Increase(float64(respBuffer.TotalSize()))
-	u.ReqPackageSizeHistogram.Increase(float64(reqBuffer.TotalSize()))
-	u.RespPackageSizeHistogram.Increase(float64(respBuffer.TotalSize()))
+	requestTotalSize := req.ContentTotalSize()
+	responseTotalSize := resp.ContentTotalSize()
+	u.AvgRequestPackageSize.Increase(float64(requestTotalSize))
+	u.AvgResponsePackageSize.Increase(float64(responseTotalSize))
+	u.ReqPackageSizeHistogram.Increase(float64(requestTotalSize))
+	u.RespPackageSizeHistogram.Increase(float64(responseTotalSize))
 
-	duration := time.Duration(respBuffer.EndTime() - reqBuffer.StartTime())
+	duration := time.Duration(resp.EndTime() - req.StartTime())
 	durationInMS := float64(duration.Milliseconds())
 	u.avgDuration.Increase(durationInMS)
 	u.durationHistogram.Increase(durationInMS)
 
-	u.sampler.AppendMetrics(sampleConfig, duration, req, resp, reqBuffer, respBuffer)
+	u.sampler.AppendMetrics(sampleConfig, duration, req, resp)
 }
 
 func (u *URIMetrics) appendMetrics(traffic *base.ProcessTraffic,
@@ -177,15 +171,13 @@ func (u *URIMetrics) String() string {
 }
 
 type Trace struct {
-	Trace          protocol.TracingContext
-	RequestURI     string
-	RequestBuffer  protocol.SocketDataBuffer
-	Request        *http.Request
-	ResponseBuffer protocol.SocketDataBuffer
-	Response       *http.Response
-	Type           string
-	Settings       *profiling.NetworkDataCollectingSettings
-	TaskConfig     *profiling.HTTPSamplingConfig
+	Trace      protocol.TracingContext
+	RequestURI string
+	Request    *reader.Request
+	Response   *reader.Response
+	Type       string
+	Settings   *profiling.NetworkDataCollectingSettings
+	TaskConfig *profiling.HTTPSamplingConfig
 }
 
 func (h *Trace) Flush(duration int64, process api.ProcessInterface, traffic *base.ProcessTraffic, metricsBuilder *base.MetricsBuilder) {
@@ -212,7 +204,7 @@ func (h *Trace) Flush(duration int64, process api.ProcessInterface, traffic *bas
 		SSL:           traffic.IsSSL,
 		URI:           h.RequestURI,
 		Reason:        h.Type,
-		Status:        h.Response.StatusCode,
+		Status:        h.Response.StatusCode(),
 	}
 	if traffic.Role == base.ConnectionRoleClient {
 		body.ClientProcess = &SamplingTraceLogProcess{ProcessID: process.ID()}
@@ -238,35 +230,35 @@ func (h *Trace) Flush(duration int64, process api.ProcessInterface, traffic *bas
 func (h *Trace) AppendHTTPEvents(process api.ProcessInterface, traffic *base.ProcessTraffic, metricsBuilder *base.MetricsBuilder) {
 	events := make([]*v3.SpanAttachedEvent, 0)
 	if h.Settings != nil && h.Settings.RequireCompleteRequest {
-		events = h.appendHTTPEvent(events, process, traffic, transportRequest, h.Request.Header,
-			h.Request.Body, h.RequestBuffer, h.Settings.MaxRequestSize)
+		events = h.appendHTTPEvent(events, process, traffic, transportRequest, h.Request.MessageOpt, h.TaskConfig.DefaultRequestEncoding,
+			h.Settings.MaxRequestSize)
 	}
 	if h.Settings != nil && h.Settings.RequireCompleteResponse {
-		events = h.appendHTTPEvent(events, process, traffic, transportResponse, h.Response.Header,
-			h.Response.Body, h.ResponseBuffer, h.Settings.MaxResponseSize)
+		events = h.appendHTTPEvent(events, process, traffic, transportResponse, h.Response.MessageOpt, h.TaskConfig.DefaultResponseEncoding,
+			h.Settings.MaxResponseSize)
 	}
 
 	metricsBuilder.AppendSpanAttachedEvents(events)
 }
 
 func (h *Trace) appendHTTPEvent(events []*v3.SpanAttachedEvent, process api.ProcessInterface, traffic *base.ProcessTraffic,
-	tp string, header http.Header, body io.Reader, buffer protocol.SocketDataBuffer, maxSize int32) []*v3.SpanAttachedEvent {
-	content, err := h.transformHTTPBody(tp, header, body, buffer, maxSize)
+	tp string, message *reader.MessageOpt, defaultBodyEncoding string, maxSize int32) []*v3.SpanAttachedEvent {
+	content, err := message.TransformReadableContent(defaultBodyEncoding, int(maxSize))
 	if err != nil {
 		log.Warnf("transform http %s erorr: %v", tp, err)
 		return events
 	}
 
 	event := &v3.SpanAttachedEvent{}
-	event.StartTime = host.TimeToInstant(buffer.StartTime())
-	event.EndTime = host.TimeToInstant(buffer.EndTime())
+	event.StartTime = host.TimeToInstant(message.StartTime())
+	event.EndTime = host.TimeToInstant(message.EndTime())
 	event.Event = fmt.Sprintf("HTTP %s Sampling", tp)
 	event.Tags = make([]*commonv3.KeyStringValuePair, 0)
 	event.Tags = append(event.Tags,
 		// content data
-		&commonv3.KeyStringValuePair{Key: "data_size", Value: units.BytesSize(float64(buffer.TotalSize()))},
+		&commonv3.KeyStringValuePair{Key: "data_size", Value: units.BytesSize(float64(message.ContentTotalSize()))},
 		&commonv3.KeyStringValuePair{Key: "data_content", Value: content},
-		&commonv3.KeyStringValuePair{Key: "data_direction", Value: buffer.Direction().String()},
+		&commonv3.KeyStringValuePair{Key: "data_direction", Value: message.Direction().String()},
 		&commonv3.KeyStringValuePair{Key: "data_type", Value: strings.ToLower(tp)},
 		// connection
 		&commonv3.KeyStringValuePair{Key: "connection_role", Value: traffic.Role.String()},
@@ -286,112 +278,6 @@ func (h *Trace) appendHTTPEvent(events []*v3.SpanAttachedEvent, process api.Proc
 	return append(events, event)
 }
 
-// nolint
-func (h *Trace) transformHTTPBody(tp string, header http.Header, _ io.Reader, buffer protocol.SocketDataBuffer, maxSize int32) (string, error) {
-	var needGzip, isPlain, isUtf8 = header.Get("Content-Encoding") == "gzip", true, true
-	contentType := header.Get("Content-Type")
-	if contentType == "" {
-		if tp == transportRequest {
-			contentType = h.TaskConfig.DefaultRequestEncoding
-		} else {
-			contentType = h.TaskConfig.DefaultResponseEncoding
-		}
-		contentType = fmt.Sprintf("text/html; charset=%s", contentType)
-	}
-
-	isPlain = strings.HasPrefix(contentType, "text/") || contentType == "application/json"
-	if _, params, err := mime.ParseMediaType(contentType); err == nil {
-		if cs, ok := params["charset"]; ok {
-			isUtf8 = strings.ToLower(cs) == "utf-8"
-		}
-	}
-
-	if !needGzip && isPlain && isUtf8 {
-		resultSize := len(buffer.BufferData())
-		if maxSize > 0 && resultSize > int(maxSize) {
-			resultSize = int(maxSize)
-		}
-		return string(buffer.BufferData()[0:resultSize]), nil
-	}
-
-	// re-read the buffer and skip to the body position
-	buf := bufio.NewReaderSize(bytes.NewBuffer(buffer.BufferData()), len(buffer.BufferData()))
-	var httpBody io.ReadCloser
-	if tp == transportRequest {
-		req, err := http.ReadRequest(buf)
-		if err != nil {
-			return "", err
-		}
-		httpBody = req.Body
-	} else {
-		response, err := http.ReadResponse(buf, nil)
-		if err != nil {
-			return "", err
-		}
-		httpBody = response.Body
-	}
-	defer httpBody.Close()
-
-	// no text plain, no need to print the data
-	headerLen := len(buffer.BufferData()) - buf.Buffered()
-	if maxSize > 0 && int(maxSize) < headerLen {
-		return string(buffer.BufferData()[:maxSize]), nil
-	}
-	headerString := string(buffer.BufferData()[:headerLen])
-	if !isPlain {
-		return fmt.Sprintf("%s[not plain, current content type: %s]", headerString, contentType), nil
-	}
-	// nobody
-	if buf.Buffered() == 0 {
-		return headerString, nil
-	}
-
-	data := httpBody
-	var err error
-	if needGzip {
-		data, err = gzip.NewReader(httpBody)
-		if err != nil {
-			return "", err
-		}
-	}
-	if !isUtf8 {
-		data, err = newCharsetReader(data, contentType)
-		if err != nil {
-			return "", err
-		}
-	}
-
-	realData, err := io.ReadAll(data)
-	if err != nil && err != io.ErrUnexpectedEOF {
-		return "", err
-	}
-	resultSize := len(realData)
-	if maxSize > 0 && (resultSize+headerLen) > int(maxSize) {
-		resultSize = int(maxSize) - headerLen
-	}
-	return fmt.Sprintf("%s%s", headerString, string(realData[0:resultSize])), nil
-}
-
-type charsetReadWrapper struct {
-	reader io.Reader
-}
-
-func newCharsetReader(r io.Reader, contentType string) (*charsetReadWrapper, error) {
-	reader, err := charset.NewReader(r, contentType)
-	if err != nil {
-		return nil, err
-	}
-	return &charsetReadWrapper{reader: reader}, nil
-}
-
-func (c *charsetReadWrapper) Read(p []byte) (n int, err error) {
-	return c.reader.Read(p)
-}
-
-func (c *charsetReadWrapper) Close() error {
-	return nil
-}
-
 type SamplingTraceLogBody struct {
 	URI           string                   `json:"uri"`
 	Reason        string                   `json:"reason"`
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/reader.go b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/reader.go
new file mode 100644
index 0000000..90863e8
--- /dev/null
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/reader.go
@@ -0,0 +1,298 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package reader
+
+import (
+	"bufio"
+	"compress/gzip"
+	"fmt"
+	"io"
+	"mime"
+	"net/http"
+	"strconv"
+	"strings"
+
+	"github.com/apache/skywalking-rover/pkg/logger"
+
+	"golang.org/x/net/html/charset"
+
+	"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
+	protocol "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/base"
+)
+
+var (
+	headBuffer = make([]byte, 16)
+	bodyBuffer = make([]byte, 4096)
+
+	requestMethods = []string{
+		"GET", "POST", "OPTION", "HEAD", "PUT", "DELETE", "CONNECT", "TRACE", "PATCH",
+	}
+)
+
+var log = logger.GetLogger("profiling", "task", "network", "layer7", "protocols", "http1", "reader")
+
+type MessageType int
+
+const (
+	MessageTypeRequest MessageType = iota
+	MessageTypeResponse
+	MessageTypeUnknown
+)
+
+func IdentityMessageType(reader *protocol.Buffer) (MessageType, error) {
+	n, err := reader.Peek(headBuffer)
+	if err != nil {
+		return MessageTypeUnknown, err
+	} else if n != len(headBuffer) {
+		return MessageTypeUnknown, fmt.Errorf("need more content for header")
+	}
+
+	headerString := string(headBuffer)
+	isRequest := false
+	for _, method := range requestMethods {
+		if strings.HasPrefix(headerString, method) {
+			isRequest = true
+			break
+		}
+	}
+	if isRequest {
+		return MessageTypeRequest, nil
+	}
+
+	if strings.HasPrefix(headerString, "HTTP") {
+		return MessageTypeResponse, nil
+	}
+	return MessageTypeUnknown, nil
+}
+
+type Message interface {
+	Headers() http.Header
+	HeaderBuffer() *protocol.Buffer
+	BodyBuffer() *protocol.Buffer
+}
+
+type MessageOpt struct {
+	Message
+}
+
+func (m *MessageOpt) ContentTotalSize() int {
+	return m.HeaderBuffer().Len() + m.BodyBuffer().Len()
+}
+
+func (m *MessageOpt) StartTime() uint64 {
+	return m.HeaderBuffer().FirstSocketBuffer().StartTime()
+}
+
+func (m *MessageOpt) EndTime() uint64 {
+	return m.HeaderBuffer().LastSocketBuffer().EndTime()
+}
+
+func (m *MessageOpt) Direction() base.SocketDataDirection {
+	return m.HeaderBuffer().FirstSocketBuffer().Direction()
+}
+
+// nolint
+func (m *MessageOpt) TransformReadableContent(defaultEncoding string, maxSize int) (string, error) {
+	contentType := m.Headers().Get("Content-Type")
+	if contentType == "" {
+		contentType = fmt.Sprintf("text/html; charset=%s", defaultEncoding)
+	}
+	isPlain := strings.HasPrefix(contentType, "text/") || contentType == "application/json"
+
+	// header to string
+	headerBuf, err := io.ReadAll(m.HeaderBuffer())
+	if err != nil {
+		return "", err
+	}
+	if maxSize > 0 && len(headerBuf) >= maxSize {
+		return string(headerBuf[:maxSize]), nil
+	}
+	headerString := string(headerBuf)
+	if !isPlain {
+		return fmt.Sprintf("%s[not plain, current content type: %s]", headerString, contentType), nil
+	}
+
+	// body to string
+	bodyLength := m.BodyBuffer().Len()
+	if bodyLength == 0 {
+		return headerString, nil
+	}
+	bodyReader, err := m.buildBodyReader(defaultEncoding)
+	if err != nil {
+		return "", err
+	}
+
+	bodyData, err := io.ReadAll(bodyReader)
+	if err != nil && err != io.ErrUnexpectedEOF {
+		return "", err
+	}
+	resultSize := len(bodyData)
+	if maxSize > 0 && (resultSize+len(headerString)) > maxSize {
+		resultSize = maxSize - len(headerString)
+	}
+	return fmt.Sprintf("%s%s", headerString, string(bodyData[0:resultSize])), nil
+}
+
+func (m *MessageOpt) buildBodyReader(contentType string) (io.Reader, error) {
+	var needGzip = m.Headers().Get("Content-Encoding") == "gzip"
+	var isUtf8 = true
+	if _, params, err := mime.ParseMediaType(contentType); err == nil {
+		if cs, ok := params["charset"]; ok {
+			isUtf8 = strings.EqualFold(cs, "utf-8")
+		}
+	}
+
+	var data io.Reader = m.BodyBuffer()
+	var err error
+	if needGzip {
+		data, err = gzip.NewReader(m.BodyBuffer())
+		if err != nil {
+			return nil, err
+		}
+	}
+	if !isUtf8 {
+		data, err = newCharsetReader(data, contentType)
+		if err != nil {
+			return nil, err
+		}
+	}
+	return data, nil
+}
+
+func (m *MessageOpt) appointedLength() (int, error) {
+	contentLengthStr := m.Headers().Get("Content-Length")
+	if contentLengthStr == "" {
+		return -1, nil
+	}
+	contentLength, err := strconv.ParseInt(contentLengthStr, 10, 64)
+	if err != nil {
+		return 0, fmt.Errorf("the request has not correct content length header value: %s", contentLengthStr)
+	}
+	return int(contentLength), nil
+}
+
+func (m *MessageOpt) isChunked() bool {
+	return m.Headers().Get("Transfer-Encoding") == "chunked"
+}
+
+func (m *MessageOpt) readBodyUntilCurrentPackageFinished(buf *protocol.Buffer, reader *bufio.Reader) (*protocol.Buffer, protocol.ParseResult, error) {
+	startPosition := buf.OffsetPosition(-reader.Buffered())
+	for !buf.IsCurrentPacketReadFinished() {
+		_, err := buf.Read(bodyBuffer)
+		if err != nil {
+			return nil, protocol.ParseResultSkipPackage, err
+		}
+	}
+	endPosition := buf.Position()
+	return buf.Slice(true, startPosition, endPosition), protocol.ParseResultSuccess, nil
+}
+
+func (m *MessageOpt) checkChunkedBody(buf *protocol.Buffer, bodyReader *bufio.Reader) (*protocol.Buffer, protocol.ParseResult, error) {
+	buffers := make([]*protocol.Buffer, 0)
+	for {
+		line, _, err := bodyReader.ReadLine()
+		if err != nil {
+			return nil, protocol.ParseResultSkipPackage, err
+		}
+		needBytesStr := string(line)
+		needBytes, err := strconv.ParseInt(needBytesStr, 16, 64)
+		if err != nil {
+			return nil, protocol.ParseResultSkipPackage, fmt.Errorf("read chunked size error: %s", needBytesStr)
+		}
+		if needBytes == 0 {
+			break
+		}
+		if b, r, err1 := m.checkBodyWithSize(buf, bodyReader, int(needBytes), false); err1 != nil {
+			return nil, protocol.ParseResultSkipPackage, err1
+		} else if r != protocol.ParseResultSuccess {
+			return nil, r, nil
+		} else {
+			if pos := b.DetectNotSendingLastPosition(); pos != nil {
+				log.Debugf("found the socket data not sending finished in BPF, so update the body to the latest data, %v", pos)
+				successSlice := b.Slice(true, b.Position(), pos)
+				buffers = append(buffers, successSlice)
+				break
+			}
+			buffers = append(buffers, b)
+		}
+		d, _, err := bodyReader.ReadLine()
+		if err != nil {
+			return nil, protocol.ParseResultSkipPackage, err
+		}
+		if len(d) != 0 {
+			return nil, protocol.ParseResultSkipPackage, fmt.Errorf("the chunk data parding error, should be empty: %s", d)
+		}
+	}
+	return protocol.CombineSlices(true, buffers...), protocol.ParseResultSuccess, nil
+}
+
+func (m *MessageOpt) checkBodyWithSize(buf *protocol.Buffer, reader *bufio.Reader, size int,
+	detectedNotSending bool) (*protocol.Buffer, protocol.ParseResult, error) {
+	reduceSize := size
+	var readSize, lastReadSize int
+	var err error
+	startPosition := buf.OffsetPosition(-reader.Buffered())
+	for reduceSize > 0 {
+		readSize = reduceSize
+		if readSize > len(bodyBuffer) {
+			readSize = len(bodyBuffer)
+		}
+		lastReadSize, err = reader.Read(bodyBuffer[0:readSize])
+		if err != nil {
+			if err == protocol.ErrNotComplete {
+				return nil, protocol.ParseResultSkipPackage, nil
+			}
+			if err == io.EOF && reduceSize-lastReadSize <= 0 {
+				return nil, protocol.ParseResultSuccess, nil
+			}
+			return nil, protocol.ParseResultSkipPackage, err
+		}
+		reduceSize -= lastReadSize
+	}
+	endPosition := buf.OffsetPosition(-reader.Buffered())
+	slice := buf.Slice(true, startPosition, endPosition)
+	if detectedNotSending {
+		if pos := slice.DetectNotSendingLastPosition(); pos != nil {
+			log.Debugf("found the socket data not sending finished in BPF, so update the body to the latest data, %v", pos)
+			endPosition = pos
+			slice = buf.Slice(true, startPosition, endPosition)
+		}
+	}
+
+	return slice, protocol.ParseResultSuccess, nil
+}
+
+type charsetReadWrapper struct {
+	reader io.Reader
+}
+
+func newCharsetReader(r io.Reader, contentType string) (*charsetReadWrapper, error) {
+	reader, err := charset.NewReader(r, contentType)
+	if err != nil {
+		return nil, err
+	}
+	return &charsetReadWrapper{reader: reader}, nil
+}
+
+func (c *charsetReadWrapper) Read(p []byte) (n int, err error) {
+	return c.reader.Read(p)
+}
+
+func (c *charsetReadWrapper) Close() error {
+	return nil
+}
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/request.go b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/request.go
new file mode 100644
index 0000000..1e2445e
--- /dev/null
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/request.go
@@ -0,0 +1,143 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package reader
+
+import (
+	"bufio"
+	"fmt"
+	"net/http"
+	"net/textproto"
+	"net/url"
+	"strings"
+
+	protocol "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/base"
+)
+
+type Request struct {
+	*MessageOpt
+	original     *http.Request
+	headerBuffer *protocol.Buffer
+	bodyBuffer   *protocol.Buffer
+}
+
+func (r *Request) Headers() http.Header {
+	return r.original.Header
+}
+
+func (r *Request) HeaderBuffer() *protocol.Buffer {
+	return r.headerBuffer
+}
+
+func (r *Request) BodyBuffer() *protocol.Buffer {
+	return r.bodyBuffer
+}
+
+func (r *Request) MinDataID() int {
+	return int(r.headerBuffer.FirstSocketBuffer().DataID())
+}
+
+func (r *Request) RequestURI() string {
+	return r.original.RequestURI
+}
+
+func ReadRequest(buf *protocol.Buffer) (*Request, protocol.ParseResult, error) {
+	bufReader := bufio.NewReader(buf)
+	tp := textproto.NewReader(bufReader)
+	req := &http.Request{}
+	result := &Request{original: req}
+	result.MessageOpt = &MessageOpt{result}
+
+	headerStartPosition := buf.Position()
+	line, err := tp.ReadLine()
+	if err != nil {
+		return nil, protocol.ParseResultSkipPackage, fmt.Errorf("read request first lint failure: %v", err)
+	}
+	method, rest, ok1 := strings.Cut(line, " ")
+	requestURI, proto, ok2 := strings.Cut(rest, " ")
+	if !ok1 || !ok2 {
+		return nil, protocol.ParseResultSkipPackage, fmt.Errorf("the first line is not request: %s", line)
+	}
+
+	isRequest := false
+	for _, m := range requestMethods {
+		if method == m {
+			isRequest = true
+			break
+		}
+	}
+	if !isRequest {
+		return nil, protocol.ParseResultSkipPackage, fmt.Errorf("is not request: %s", method)
+	}
+	major, minor, ok := http.ParseHTTPVersion(proto)
+	if !ok {
+		return nil, protocol.ParseResultSkipPackage, fmt.Errorf("the protocol version cannot be identity: %s", proto)
+	}
+	justAuthority := req.Method == "CONNECT" && !strings.HasPrefix(requestURI, "/")
+	if justAuthority {
+		requestURI = "http://" + requestURI
+	}
+	uri, err := url.ParseRequestURI(requestURI)
+	if err != nil {
+		return nil, protocol.ParseResultSkipPackage, err
+	}
+	req.Method, req.URL, req.RequestURI = method, uri, requestURI
+	req.Proto, req.ProtoMajor, req.ProtoMinor = proto, major, minor
+
+	// header reader
+	mimeHeader, err := tp.ReadMIMEHeader()
+	if err != nil {
+		return nil, protocol.ParseResultSkipPackage, err
+	}
+	req.Header = http.Header(mimeHeader)
+
+	req.Host = req.URL.Host
+	if req.Host == "" {
+		req.Host = req.Header.Get("Host")
+	}
+
+	result.buildHeaderBuffer(headerStartPosition, buf, bufReader)
+	if b, r, err := result.readFullBody(bufReader, buf); err != nil {
+		return nil, protocol.ParseResultSkipPackage, err
+	} else if r != protocol.ParseResultSuccess {
+		return nil, r, nil
+	} else {
+		result.bodyBuffer = b
+	}
+
+	return result, protocol.ParseResultSuccess, nil
+}
+
+func (r *Request) buildHeaderBuffer(start *protocol.BufferPosition, buf *protocol.Buffer, bufReader *bufio.Reader) {
+	endPosition := buf.OffsetPosition(-bufReader.Buffered())
+	r.headerBuffer = buf.Slice(true, start, endPosition)
+}
+
+func (r *Request) readFullBody(bodyReader *bufio.Reader, original *protocol.Buffer) (*protocol.Buffer, protocol.ParseResult, error) {
+	length, err := r.appointedLength()
+	if err != nil {
+		return nil, protocol.ParseResultSkipPackage, err
+	} else if length > 0 {
+		return r.checkBodyWithSize(original, bodyReader, length, true)
+	}
+
+	if r.isChunked() {
+		return r.checkChunkedBody(original, bodyReader)
+	}
+
+	return r.readBodyUntilCurrentPackageFinished(original, bodyReader)
+}
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/response.go b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/response.go
new file mode 100644
index 0000000..e1b04ad
--- /dev/null
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/response.go
@@ -0,0 +1,125 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package reader
+
+import (
+	"bufio"
+	"fmt"
+	"net/http"
+	"net/textproto"
+	"strconv"
+	"strings"
+
+	protocol "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/base"
+)
+
+type Response struct {
+	*MessageOpt
+	req          *Request
+	original     *http.Response
+	headerBuffer *protocol.Buffer
+	bodyBuffer   *protocol.Buffer
+}
+
+func (r *Response) Headers() http.Header {
+	return r.original.Header
+}
+
+func (r *Response) HeaderBuffer() *protocol.Buffer {
+	return r.headerBuffer
+}
+
+func (r *Response) BodyBuffer() *protocol.Buffer {
+	return r.bodyBuffer
+}
+
+func (r *Response) StatusCode() int {
+	return r.original.StatusCode
+}
+
+func ReadResponse(req *Request, buf *protocol.Buffer) (*Response, protocol.ParseResult, error) {
+	bufReader := bufio.NewReader(buf)
+	tp := textproto.NewReader(bufReader)
+	resp := &http.Response{}
+	result := &Response{original: resp, req: req}
+	result.MessageOpt = &MessageOpt{result}
+
+	headerStartPosition := buf.Position()
+	line, err := tp.ReadLine()
+	if err != nil {
+		return nil, protocol.ParseResultSkipPackage, fmt.Errorf("read response first line failure: %v", err)
+	}
+	indexByte := strings.IndexByte(line, ' ')
+	if indexByte == -1 {
+		return nil, protocol.ParseResultSkipPackage, fmt.Errorf("parsing response error: %s", line)
+	}
+	resp.Proto = line[:indexByte]
+	resp.Status = strings.TrimLeft(line[indexByte+1:], " ")
+	statusCode := resp.Status
+	if i := strings.IndexByte(resp.Status, ' '); i != -1 {
+		statusCode = resp.Status[:i]
+	}
+	if len(statusCode) != 3 {
+		return nil, protocol.ParseResultSkipPackage, fmt.Errorf("parsing response status code failure: %v", statusCode)
+	}
+	resp.StatusCode, err = strconv.Atoi(statusCode)
+	if err != nil || resp.StatusCode < 0 {
+		return nil, protocol.ParseResultSkipPackage, fmt.Errorf("status code not correct: %s", statusCode)
+	}
+	var ok bool
+	if resp.ProtoMajor, resp.ProtoMinor, ok = http.ParseHTTPVersion(resp.Proto); !ok {
+		return nil, protocol.ParseResultSkipPackage, fmt.Errorf("parsing http version failure: %s", resp.Proto)
+	}
+
+	mimeHeader, err := tp.ReadMIMEHeader()
+	if err != nil {
+		result.buildHeaderBuffer(headerStartPosition, buf, bufReader)
+		return result, protocol.ParseResultSuccess, nil
+	}
+	resp.Header = http.Header(mimeHeader)
+
+	result.buildHeaderBuffer(headerStartPosition, buf, bufReader)
+	if b, r, err := result.readFullResponseBody(bufReader, buf); err != nil {
+		return nil, protocol.ParseResultSkipPackage, err
+	} else if r != protocol.ParseResultSuccess {
+		return nil, r, nil
+	} else {
+		result.bodyBuffer = b
+	}
+	return result, protocol.ParseResultSuccess, nil
+}
+
+func (r *Response) buildHeaderBuffer(start *protocol.BufferPosition, buf *protocol.Buffer, bufReader *bufio.Reader) {
+	endPosition := buf.OffsetPosition(-bufReader.Buffered())
+	r.headerBuffer = buf.Slice(true, start, endPosition)
+}
+
+func (r *Response) readFullResponseBody(bodyReader *bufio.Reader, original *protocol.Buffer) (*protocol.Buffer, protocol.ParseResult, error) {
+	length, err := r.appointedLength()
+	if err != nil {
+		return nil, protocol.ParseResultSkipPackage, err
+	} else if length > 0 {
+		return r.checkBodyWithSize(original, bodyReader, length, true)
+	}
+
+	if r.isChunked() {
+		return r.checkChunkedBody(original, bodyReader)
+	}
+
+	return r.readBodyUntilCurrentPackageFinished(original, bodyReader)
+}
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/http1/sampling.go b/pkg/profiling/task/network/analyze/layer7/protocols/http1/sampling.go
index 75f602a..6a98c12 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/http1/sampling.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/http1/sampling.go
@@ -19,7 +19,6 @@ package http1
 
 import (
 	"fmt"
-	"net/http"
 	"regexp"
 	"strings"
 	"time"
@@ -30,6 +29,7 @@ import (
 	profiling "github.com/apache/skywalking-rover/pkg/profiling/task/base"
 	"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
 	protocol "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/base"
+	"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader"
 	"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/metrics"
 )
 
@@ -52,13 +52,12 @@ func NewSampler() *Sampler {
 	}
 }
 
-func (s *Sampler) AppendMetrics(config *SamplingConfig, duration time.Duration,
-	request *http.Request, response *http.Response, reqBuffer, respBuffer protocol.SocketDataBuffer) {
+func (s *Sampler) AppendMetrics(config *SamplingConfig, duration time.Duration, request *reader.Request, response *reader.Response) {
 	if config == nil {
 		return
 	}
 	tracingContext, err := protocol.AnalyzeTracingContext(func(key string) string {
-		return request.Header.Get(key)
+		return request.Headers().Get(key)
 	})
 	if err != nil {
 		log.Warnf("analyze tracing context error: %v", err)
@@ -68,7 +67,7 @@ func (s *Sampler) AppendMetrics(config *SamplingConfig, duration time.Duration,
 		return
 	}
 
-	uri := request.RequestURI
+	uri := request.RequestURI()
 	// remove the query parameters
 	if i := strings.Index(uri, "?"); i > 0 {
 		uri = uri[0:i]
@@ -82,10 +81,10 @@ func (s *Sampler) AppendMetrics(config *SamplingConfig, duration time.Duration,
 
 	var traceType string
 	var topN *metrics.TopN
-	if rule.When5XX && response.StatusCode >= 500 && response.StatusCode < 600 {
+	if rule.When5XX && response.StatusCode() >= 500 && response.StatusCode() < 600 {
 		traceType = "status_5xx"
 		topN = s.Error5xxTraces
-	} else if rule.When4XX && response.StatusCode >= 400 && response.StatusCode < 500 {
+	} else if rule.When4XX && response.StatusCode() >= 400 && response.StatusCode() < 500 {
 		traceType = "status_4xx"
 		topN = s.Error4xxTraces
 	} else if rule.MinDuration != nil && int64(*rule.MinDuration) <= duration.Milliseconds() {
@@ -96,15 +95,13 @@ func (s *Sampler) AppendMetrics(config *SamplingConfig, duration time.Duration,
 	}
 
 	trace := &Trace{
-		Trace:          tracingContext,
-		RequestURI:     uri,
-		RequestBuffer:  reqBuffer,
-		ResponseBuffer: respBuffer,
-		Request:        request,
-		Response:       response,
-		Type:           traceType,
-		Settings:       rule.Settings,
-		TaskConfig:     config.ProfilingSampling,
+		Trace:      tracingContext,
+		RequestURI: uri,
+		Request:    request,
+		Response:   response,
+		Type:       traceType,
+		Settings:   rule.Settings,
+		TaskConfig: config.ProfilingSampling,
 	}
 	topN.AddRecord(trace, duration.Milliseconds())
 }
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/protocols.go b/pkg/profiling/task/network/analyze/layer7/protocols/protocols.go
index 4028039..d3db212 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/protocols.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/protocols.go
@@ -23,6 +23,8 @@ import (
 	"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
 	protocol "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/base"
 	"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/http1"
+
+	"golang.org/x/net/context"
 )
 
 var log = logger.GetLogger("profiling", "task", "network", "layer7", "protocols")
@@ -43,15 +45,16 @@ func init() {
 
 type Analyzer struct {
 	ctx       protocol.Context
-	protocols []protocol.Protocol
+	protocols map[base.ConnectionProtocol]*protocol.ProtocolAnalyzer
 }
 
 func NewAnalyzer(ctx protocol.Context, config *profiling.TaskConfig) *Analyzer {
-	protocols := make([]protocol.Protocol, 0)
+	protocols := make(map[base.ConnectionProtocol]*protocol.ProtocolAnalyzer)
 	for _, r := range registerProtocols {
 		p := r()
 		p.Init(config)
-		protocols = append(protocols, p)
+		analyzer := protocol.NewProtocolAnalyzer(ctx, p, config)
+		protocols[p.Protocol()] = analyzer
 	}
 	return &Analyzer{
 		ctx:       ctx,
@@ -59,14 +62,20 @@ func NewAnalyzer(ctx protocol.Context, config *profiling.TaskConfig) *Analyzer {
 	}
 }
 
-func (a *Analyzer) ReceiveSocketDataEvent(event *protocol.SocketDataUploadEvent) {
+func (a *Analyzer) Start(ctx context.Context) {
 	for _, p := range a.protocols {
-		if p.ReceiveData(a.ctx, event) {
-			return
-		}
+		p.Start(ctx)
+	}
+}
+
+func (a *Analyzer) ReceiveSocketDataEvent(event *protocol.SocketDataUploadEvent) {
+	analyzer := a.protocols[event.Protocol]
+	if analyzer == nil {
+		log.Warnf("could not found any protocol to handle socket data, connection id: %s, protocol: %s(%d)",
+			event.GenerateConnectionID(), event.Protocol.String(), event.Protocol)
+		return
 	}
-	log.Warnf("could not found any protocol to handle socket data, connection id: %s, protocol: %s(%d), type: %s",
-		event.GenerateConnectionID(), event.Protocol.String(), event.Protocol, event.MsgType.String())
+	analyzer.ReceiveSocketData(a.ctx, event)
 }
 
 func (a *Analyzer) UpdateExtensionConfig(config *profiling.ExtensionConfig) {
@@ -76,19 +85,19 @@ func (a *Analyzer) UpdateExtensionConfig(config *profiling.ExtensionConfig) {
 }
 
 type ProtocolMetrics struct {
-	data map[string]protocol.Metrics
+	data map[base.ConnectionProtocol]protocol.Metrics
 }
 
 func NewProtocolMetrics() *ProtocolMetrics {
-	metrics := make(map[string]protocol.Metrics)
+	metrics := make(map[base.ConnectionProtocol]protocol.Metrics)
 	for _, p := range defaultInstances {
-		metrics[p.Name()] = p.GenerateMetrics()
+		metrics[p.Protocol()] = p.GenerateMetrics()
 	}
 	return &ProtocolMetrics{data: metrics}
 }
 
-func (m *ProtocolMetrics) GetProtocolMetrics(name string) protocol.Metrics {
-	return m.data[name]
+func (m *ProtocolMetrics) GetProtocolMetrics(p base.ConnectionProtocol) protocol.Metrics {
+	return m.data[p]
 }
 
 func (m *ProtocolMetrics) MergeMetricsFromConnection(connection *base.ConnectionContext, data base.ConnectionMetrics) {
diff --git a/pkg/profiling/task/network/analyze/layer7/queue.go b/pkg/profiling/task/network/analyze/layer7/queue.go
index 352059d..4c6ffa9 100644
--- a/pkg/profiling/task/network/analyze/layer7/queue.go
+++ b/pkg/profiling/task/network/analyze/layer7/queue.go
@@ -28,6 +28,7 @@ import (
 )
 
 type PartitionContext interface {
+	Start(ctx context.Context)
 	Consume(data interface{})
 }
 
@@ -72,8 +73,9 @@ func (e *EventQueue) start0(ctx context.Context, bpfLoader *bpf.Loader, emap *eb
 	}
 
 	for i := 0; i < len(e.partitions); i++ {
-		go func(inx int) {
+		go func(ctx context.Context, inx int) {
 			p := e.partitions[inx]
+			p.ctx.Start(ctx)
 			for {
 				select {
 				// consume the data
@@ -84,7 +86,7 @@ func (e *EventQueue) start0(ctx context.Context, bpfLoader *bpf.Loader, emap *eb
 					return
 				}
 			}
-		}(i)
+		}(ctx, i)
 	}
 }
 
diff --git a/pkg/tools/host/time.go b/pkg/tools/host/time.go
index 7cc8a78..15e22bd 100644
--- a/pkg/tools/host/time.go
+++ b/pkg/tools/host/time.go
@@ -41,10 +41,14 @@ func init() {
 }
 
 func TimeToInstant(bpfTime uint64) *v3.Instant {
-	timeCopy := time.Unix(BootTime.Unix(), int64(BootTime.Nanosecond()))
-	result := timeCopy.Add(time.Duration(bpfTime))
+	result := Time(bpfTime)
 	return &v3.Instant{
 		Seconds: result.Unix(),
 		Nanos:   int32(result.Nanosecond()),
 	}
 }
+
+func Time(bpfTime uint64) time.Time {
+	timeCopy := time.Unix(BootTime.Unix(), int64(BootTime.Nanosecond()))
+	return timeCopy.Add(time.Duration(bpfTime))
+}