Posted to commits@skywalking.apache.org by wu...@apache.org on 2022/10/13 08:25:44 UTC

[skywalking-rover] branch main updated: Refactor the network profiling analysis (#55)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git


The following commit(s) were added to refs/heads/main by this push:
     new f1c8d49  Refactor the network profiling analysis (#55)
f1c8d49 is described below

commit f1c8d493486dff54043fe7a6cd4353593f2ba6bb
Author: mrproliu <74...@qq.com>
AuthorDate: Thu Oct 13 16:25:39 2022 +0800

    Refactor the network profiling analysis (#55)
---
 pkg/profiling/task/network/analyze/analyze.go      |  33 +
 .../task/network/analyze/base/connection.go        |  93 +++
 pkg/profiling/task/network/analyze/base/context.go | 402 ++++++++++++
 .../task/network/{ => analyze/base}/enums.go       |   2 +-
 pkg/profiling/task/network/analyze/base/events.go  |  70 ++
 .../task/network/analyze/base/listener.go          |  51 ++
 pkg/profiling/task/network/analyze/base/metrics.go |  98 +++
 .../task/network/{ => analyze/base}/tcpresolver.go |   2 +-
 .../{analyzer.go => analyze/base/traffic.go}       | 106 +--
 .../task/network/analyze/layer4/events.go          |  88 +++
 .../task/network/analyze/layer4/listener.go        | 402 ++++++++++++
 .../task/network/analyze/layer4/metrics.go         | 276 ++++++++
 pkg/profiling/task/network/bpf/bpf.go              |  56 ++
 pkg/profiling/task/network/{ => bpf}/linker.go     |   5 +-
 pkg/profiling/task/network/context.go              | 726 ---------------------
 pkg/profiling/task/network/metrics.go              | 396 -----------
 pkg/profiling/task/network/runner.go               | 223 ++++---
 pkg/profiling/task/network/ssl.go                  |  86 +--
 18 files changed, 1799 insertions(+), 1316 deletions(-)

diff --git a/pkg/profiling/task/network/analyze/analyze.go b/pkg/profiling/task/network/analyze/analyze.go
new file mode 100644
index 0000000..28cda1e
--- /dev/null
+++ b/pkg/profiling/task/network/analyze/analyze.go
@@ -0,0 +1,33 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package analyze
+
+import (
+	"github.com/apache/skywalking-rover/pkg/process/api"
+	"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
+	"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer4"
+)
+
+// NewContext wraps the analyzer builder
+func NewContext(monitorProcesses map[int32][]api.ProcessInterface) *base.AnalyzerContext {
+	context := base.NewAnalyzerContext(monitorProcesses)
+	// register all listeners
+	context.AddListener(layer4.NewListener())
+
+	return context
+}
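
For illustration, a minimal sketch of how this entry point can be wired together. Only NewContext, RegisterAllHandlers, StartSocketAddressParser, FlushAllMetrics and Build come from this commit; the surrounding function, the loader construction and the metrics prefix are hypothetical.

    // Hypothetical wiring of the refactored analyzer (sketch only, not part of this commit).
    package example

    import (
    	"context"

    	"github.com/apache/skywalking-rover/pkg/process/api"
    	"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze"
    	"github.com/apache/skywalking-rover/pkg/profiling/task/network/bpf"
    )

    func runAnalyzer(ctx context.Context, loader *bpf.Loader, processes map[int32][]api.ProcessInterface) error {
    	// build the analyzer context; the layer4 listener is registered inside NewContext
    	analyzerContext := analyze.NewContext(processes)
    	// subscribe to the socket connect/close event queues and each listener's own queues
    	analyzerContext.RegisterAllHandlers(loader)
    	// start the background goroutines that resolve socket addresses by pid and fd
    	analyzerContext.StartSocketAddressParser(ctx)

    	// on each report interval, flush all connection metrics into a builder
    	builder, err := analyzerContext.FlushAllMetrics(loader, "rover_net_p_") // prefix is illustrative
    	if err != nil {
    		return err
    	}
    	_ = builder.Build() // []*v3.MeterDataCollection, ready to be reported to the OAP
    	return nil
    }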
diff --git a/pkg/profiling/task/network/analyze/base/connection.go b/pkg/profiling/task/network/analyze/base/connection.go
new file mode 100644
index 0000000..54f3940
--- /dev/null
+++ b/pkg/profiling/task/network/analyze/base/connection.go
@@ -0,0 +1,93 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package base
+
+import "github.com/apache/skywalking-rover/pkg/process/api"
+
+type ConnectionContext struct {
+	// basic metadata
+	ConnectionID     uint64
+	RandomID         uint64
+	LocalPid         uint32
+	SocketFD         uint32
+	LocalProcesses   []api.ProcessInterface
+	ConnectionClosed bool
+	Protocol         ConnectionProtocol
+	IsSSL            bool
+
+	// socket metadata
+	Role       ConnectionRole
+	LocalIP    string
+	LocalPort  uint16
+	RemoteIP   string
+	RemotePort uint16
+
+	// metrics
+	Metrics *ConnectionMetrics
+
+	// Number of times the connection data has been flushed to the OAP
+	FlushDataCount int
+}
+
+func (c *AnalyzerContext) NewConnectionContext(conID, randomID uint64, pid, fd uint32, processes []api.ProcessInterface,
+	conClosed bool) *ConnectionContext {
+	connection := &ConnectionContext{
+		// metadata
+		ConnectionID:     conID,
+		RandomID:         randomID,
+		LocalPid:         pid,
+		SocketFD:         fd,
+		LocalProcesses:   processes,
+		ConnectionClosed: conClosed,
+
+		Metrics: c.NewConnectionMetrics(),
+	}
+	return connection
+}
+
+type ActiveConnectionInBPF struct {
+	RandomID     uint64
+	Pid          uint32
+	SocketFD     uint32
+	Role         ConnectionRole
+	SocketFamily uint32
+
+	RemoteAddrV4   uint32
+	RemoteAddrV6   [16]uint8
+	RemoteAddrPort uint32
+	LocalAddrV4    uint32
+	LocalAddrV6    [16]uint8
+	LocalAddrPort  uint32
+
+	WriteBytes   uint64
+	WriteCount   uint64
+	WriteExeTime uint64
+	ReadBytes    uint64
+	ReadCount    uint64
+	ReadExeTime  uint64
+
+	WriteRTTCount   uint64
+	WriteRTTExeTime uint64
+
+	// Protocol analyze context
+	Protocol ConnectionProtocol
+	IsSSL    uint32
+
+	// whether the connect event has already been sent
+	ConnectEventIsSent uint32
+}
diff --git a/pkg/profiling/task/network/analyze/base/context.go b/pkg/profiling/task/network/analyze/base/context.go
new file mode 100644
index 0000000..fcda589
--- /dev/null
+++ b/pkg/profiling/task/network/analyze/base/context.go
@@ -0,0 +1,402 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package base
+
+import (
+	"context"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"net"
+	"sync"
+	"unsafe"
+
+	"github.com/apache/skywalking-rover/pkg/process/api"
+	"github.com/apache/skywalking-rover/pkg/profiling/task/network/bpf"
+
+	"github.com/cilium/ebpf"
+
+	"github.com/sirupsen/logrus"
+
+	cmap "github.com/orcaman/concurrent-map"
+
+	"golang.org/x/sys/unix"
+)
+
+type AnalyzerContext struct {
+	// listening process map
+	processes map[int32][]api.ProcessInterface
+
+	// connection handler
+	activeConnections cmap.ConcurrentMap      // currently active connections
+	closedConnections []*ConnectionContext    // closed connections waiting to be flushed
+	flushClosedEvents chan *SocketCloseEvent  // close events for connections not found in the active map; cached and retried at the next flush
+	sockParseQueue    chan *ConnectionContext // socket address parse queue
+
+	// analyze listener list
+	listeners []AnalyzeListener
+
+	// close connection modify locker
+	closedConnectionLocker sync.RWMutex
+}
+
+func NewAnalyzerContext(processes map[int32][]api.ProcessInterface) *AnalyzerContext {
+	return &AnalyzerContext{
+		processes:         processes,
+		activeConnections: cmap.New(),
+		closedConnections: make([]*ConnectionContext, 0),
+		flushClosedEvents: make(chan *SocketCloseEvent, 5000),
+		sockParseQueue:    make(chan *ConnectionContext, 5000),
+		listeners:         make([]AnalyzeListener, 0),
+	}
+}
+
+func (c *AnalyzerContext) AddListener(l AnalyzeListener) {
+	c.listeners = append(c.listeners, l)
+}
+
+func (c *AnalyzerContext) GetAllConnectionWithContext() []*ConnectionContext {
+	result := make([]*ConnectionContext, 0)
+	result = append(result, c.flushClosedConnection()...)
+	for _, con := range c.activeConnections.Items() {
+		result = append(result, con.(*ConnectionContext))
+	}
+	return result
+}
+
+func (c *AnalyzerContext) RegisterAllHandlers(bpfLoader *bpf.Loader) {
+	// socket connect
+	bpfLoader.ReadEventAsync(bpfLoader.SocketConnectionEventQueue, c.handleSocketConnectEvent, func() interface{} {
+		return &SocketConnectEvent{}
+	})
+	// socket close
+	bpfLoader.ReadEventAsync(bpfLoader.SocketCloseEventQueue, c.handleSocketCloseEvent, func() interface{} {
+		return &SocketCloseEvent{}
+	})
+	for _, l := range c.listeners {
+		l.RegisterBPFEvents(bpfLoader)
+	}
+}
+
+func (c *AnalyzerContext) RegisterBPFEvents(bpfLoader *bpf.Loader) {
+	for _, l := range c.listeners {
+		l.RegisterBPFEvents(bpfLoader)
+	}
+}
+
+func (c *AnalyzerContext) StartSocketAddressParser(ctx context.Context) {
+	for i := 0; i < 2; i++ {
+		go c.handleSocketParseQueue(ctx)
+	}
+}
+
+func (c *AnalyzerContext) handleSocketParseQueue(ctx context.Context) {
+	for {
+		select {
+		case cc := <-c.sockParseQueue:
+			socket, err := ParseSocket(cc.LocalPid, cc.SocketFD)
+			if err != nil {
+				// if the remote port of the connection is empty, the connection is basically unusable, so report the parse failure
+				if cc.RemotePort == 0 {
+					log.Warnf("failed to complete the socket address, pid: %d, fd: %d, error: %v", cc.LocalPid, cc.SocketFD, err)
+				}
+				continue
+			}
+			cc.LocalIP = socket.SrcIP
+			cc.LocalPort = socket.SrcPort
+			cc.RemoteIP = socket.DestIP
+			cc.RemotePort = socket.DestPort
+		case <-ctx.Done():
+			return
+		}
+	}
+}
+
+func (c *AnalyzerContext) handleSocketConnectEvent(data interface{}) {
+	event := data.(*SocketConnectEvent)
+
+	if log.Enable(logrus.DebugLevel) {
+		marshal, _ := json.Marshal(event)
+		log.Debugf("found connect event, json: %s", string(marshal))
+	}
+
+	processes := c.processes[int32(event.Pid)]
+	if len(processes) == 0 {
+		log.Warnf("received a connect event, but the process does not need to be monitored, pid: %d", event.Pid)
+		return
+	}
+
+	// build active connection information
+	con := c.NewConnectionContext(event.ConID, event.RandomID, event.Pid, event.FD, processes, false)
+	con.Role = event.Role
+	if event.NeedComplete == 0 {
+		con.RemotePort = uint16(event.RemoteAddrPort)
+		con.LocalPort = uint16(event.LocalAddrPort)
+		if event.SocketFamily == unix.AF_INET {
+			con.LocalIP = parseAddressV4(event.LocalAddrV4)
+			con.RemoteIP = parseAddressV4(event.RemoteAddrV4)
+		} else {
+			con.LocalIP = parseAddressV6(event.LocalAddrV6)
+			con.RemoteIP = parseAddressV6(event.RemoteAddrV6)
+		}
+	} else {
+		// if the remote address exists, set it
+		if event.RemoteAddrPort != 0 {
+			con.RemotePort = uint16(event.RemoteAddrPort)
+			if event.SocketFamily == unix.AF_INET {
+				con.RemoteIP = parseAddressV4(event.RemoteAddrV4)
+			} else {
+				con.RemoteIP = parseAddressV6(event.RemoteAddrV6)
+			}
+		}
+		c.sockParseQueue <- con
+	}
+
+	// notify the listeners
+	for _, l := range c.listeners {
+		l.ReceiveNewConnection(con, event)
+	}
+
+	// add to the context
+	c.saveActiveConnection(con)
+}
+
+func (c *AnalyzerContext) handleSocketCloseEvent(data interface{}) {
+	event := data.(*SocketCloseEvent)
+
+	if log.Enable(logrus.DebugLevel) {
+		marshal, _ := json.Marshal(event)
+		log.Debugf("found close event: %s", string(marshal))
+	}
+
+	// try to handle the socket close event
+	if !c.socketClosedEvent0(event) {
+		// not found in the active connections, maybe it has not been added to the active map yet,
+		// so just add it to the close-event queue and wait for the next flush interval
+		c.flushClosedEvents <- event
+		return
+	}
+}
+
+func (c *AnalyzerContext) FlushAllMetrics(bpfLoader *bpf.Loader, metricsPrefix string) (*MetricsBuilder, error) {
+	metricsBuilder := NewMetricsBuilder(metricsPrefix)
+	err := c.flushMetrics0(bpfLoader, metricsBuilder)
+	if err != nil {
+		return nil, err
+	}
+	return metricsBuilder, nil
+}
+
+func (c *AnalyzerContext) flushMetrics0(bpfLoader *bpf.Loader, builder *MetricsBuilder) error {
+	// handling the unfinished close event
+	c.processCachedCloseEvents()
+
+	// get all connections
+	ccs := c.GetAllConnectionWithContext()
+	if len(ccs) == 0 {
+		return nil
+	}
+
+	// prepare to flush metrics
+	err := c.prepareToFlushMetrics(ccs, bpfLoader)
+	if err != nil {
+		return fmt.Errorf("prepare to flush the connection metrics failure: %v", err)
+	}
+
+	// combine all connections
+	analyzer := c.NewTrafficAnalyzer()
+	traffics := analyzer.CombineConnectionToTraffics(ccs)
+
+	// generate connections
+	for _, l := range c.listeners {
+		l.FlushMetrics(traffics, builder)
+	}
+
+	// after flush metrics
+	for _, l := range c.listeners {
+		l.PostFlushConnectionMetrics(ccs)
+	}
+	return nil
+}
+
+func (c *AnalyzerContext) prepareToFlushMetrics(ccs []*ConnectionContext, bpfLoader *bpf.Loader) error {
+	var active *ActiveConnectionInBPF
+	closedConnections := make([]string, 0)
+	connectionWithBPFList := make([]*ConnectionWithBPF, 0)
+
+	for _, cc := range ccs {
+		active, closedConnections = c.lookupTheActiveConnectionInBPf(cc, bpfLoader, closedConnections)
+		connectionWithBPFList = append(connectionWithBPFList, &ConnectionWithBPF{
+			Connection:  cc,
+			ActiveInBPF: active,
+		})
+	}
+
+	// delete closed connections
+	if len(closedConnections) > 0 {
+		c.deleteConnectionOnly(closedConnections)
+	}
+
+	// call the listeners
+	for _, l := range c.listeners {
+		err := l.PreFlushConnectionMetrics(connectionWithBPFList, bpfLoader)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func (c *AnalyzerContext) lookupTheActiveConnectionInBPf(connection *ConnectionContext, bpfLoader *bpf.Loader,
+	closedConnections []string) (active *ActiveConnectionInBPF, closedRef []string) {
+	var activeConnection ActiveConnectionInBPF
+	// if the connection is not closed, load the basic stats from the BPF map
+	if !connection.ConnectionClosed {
+		err := bpfLoader.ActiveConnectionMap.Lookup(connection.ConnectionID, &activeConnection)
+		if err != nil {
+			if errors.Is(err, ebpf.ErrKeyNotExist) {
+				closedConnections = append(closedConnections, c.generateConnectionKey(connection.ConnectionID, connection.RandomID))
+				connection.ConnectionClosed = true
+			} else {
+				log.Warnf("lookup the active connection error, connection id: %d, error: %v", connection.ConnectionID, err)
+			}
+			return nil, closedConnections
+		}
+
+		if log.Enable(logrus.DebugLevel) {
+			marshal, _ := json.Marshal(activeConnection)
+			log.Debugf("found the active connection, conid: %d, data: %s", connection.ConnectionID, string(marshal))
+		}
+
+		if connection.Role == ConnectionRoleUnknown && activeConnection.Role != ConnectionRoleUnknown {
+			connection.Role = activeConnection.Role
+		}
+		if connection.Protocol == ConnectionProtocolUnknown && activeConnection.Protocol != ConnectionProtocolUnknown {
+			connection.Protocol = activeConnection.Protocol
+		}
+		if !connection.IsSSL && activeConnection.IsSSL == 1 {
+			connection.IsSSL = true
+		}
+		return &activeConnection, closedConnections
+	}
+	return nil, closedConnections
+}
+
+func (c *AnalyzerContext) deleteConnectionOnly(ccs []string) {
+	for _, cc := range ccs {
+		c.activeConnections.Remove(cc)
+	}
+}
+
+func (c *AnalyzerContext) processCachedCloseEvents() {
+	for len(c.flushClosedEvents) > 0 {
+		event := <-c.flushClosedEvents
+		if !c.socketClosedEvent0(event) {
+			// if the active connection cannot be found, just create a new closed connection context
+			processes := c.processes[int32(event.Pid)]
+			if len(processes) == 0 {
+				continue
+			}
+			cc := c.NewConnectionContext(event.ConID, event.RandomID, event.Pid, event.SocketFD, processes, true)
+			if event.SocketFamily == unix.AF_INET {
+				cc.RemoteIP = parseAddressV4(event.RemoteAddrV4)
+				cc.LocalIP = parseAddressV4(event.LocalAddrV4)
+			} else if event.SocketFamily == unix.AF_INET6 {
+				cc.RemoteIP = parseAddressV6(event.RemoteAddrV6)
+				cc.LocalIP = parseAddressV6(event.LocalAddrV6)
+			} else {
+				continue
+			}
+
+			// append to the closed connection
+			c.appendClosedConnection(c.combineClosedConnection(cc, event))
+		}
+	}
+}
+
+func (c *AnalyzerContext) generateConnectionKey(conID, randomID uint64) string {
+	return fmt.Sprintf("%d_%d", conID, randomID)
+}
+
+func (c *AnalyzerContext) socketClosedEvent0(event *SocketCloseEvent) bool {
+	activeCon := c.foundAndDeleteConnection(event)
+	if activeCon == nil {
+		return false
+	}
+
+	// combine the connection data
+	c.appendClosedConnection(c.combineClosedConnection(activeCon, event))
+	return true
+}
+
+func (c *AnalyzerContext) foundAndDeleteConnection(event *SocketCloseEvent) *ConnectionContext {
+	conKey := c.generateConnectionKey(event.ConID, event.RandomID)
+	val, exists := c.activeConnections.Pop(conKey)
+	if !exists {
+		return nil
+	}
+	return val.(*ConnectionContext)
+}
+
+func (c *AnalyzerContext) combineClosedConnection(active *ConnectionContext, closed *SocketCloseEvent) *ConnectionContext {
+	active.ConnectionClosed = true
+
+	if active.Role == ConnectionRoleUnknown && closed.Role != ConnectionRoleUnknown {
+		active.Role = closed.Role
+	}
+	if active.Protocol == ConnectionProtocolUnknown && closed.Protocol != ConnectionProtocolUnknown {
+		active.Protocol = closed.Protocol
+	}
+	if !active.IsSSL && closed.IsSSL == 1 {
+		active.IsSSL = true
+	}
+
+	// notify listeners
+	for _, l := range c.listeners {
+		l.ReceiveCloseConnection(active, closed)
+	}
+	return active
+}
+
+func (c *AnalyzerContext) saveActiveConnection(con *ConnectionContext) {
+	c.activeConnections.Set(c.generateConnectionKey(con.ConnectionID, con.RandomID), con)
+}
+
+func (c *AnalyzerContext) flushClosedConnection() []*ConnectionContext {
+	c.closedConnectionLocker.Lock()
+	defer c.closedConnectionLocker.Unlock()
+
+	connections := c.closedConnections
+	c.closedConnections = make([]*ConnectionContext, 0)
+	return connections
+}
+
+func (c *AnalyzerContext) appendClosedConnection(con *ConnectionContext) {
+	c.closedConnectionLocker.Lock()
+	defer c.closedConnectionLocker.Unlock()
+
+	c.closedConnections = append(c.closedConnections, con)
+}
+
+func parseAddressV4(val uint32) string {
+	return net.IP((*(*[net.IPv4len]byte)(unsafe.Pointer(&val)))[:]).String()
+}
+
+func parseAddressV6(val [16]uint8) string {
+	return net.IP((*(*[net.IPv6len]byte)(unsafe.Pointer(&val)))[:]).String()
+}
diff --git a/pkg/profiling/task/network/enums.go b/pkg/profiling/task/network/analyze/base/enums.go
similarity index 99%
rename from pkg/profiling/task/network/enums.go
rename to pkg/profiling/task/network/analyze/base/enums.go
index db43f06..d1f37e7 100644
--- a/pkg/profiling/task/network/enums.go
+++ b/pkg/profiling/task/network/analyze/base/enums.go
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package network
+package base
 
 const (
 	unknown = "unknown"
diff --git a/pkg/profiling/task/network/analyze/base/events.go b/pkg/profiling/task/network/analyze/base/events.go
new file mode 100644
index 0000000..6fbf4b4
--- /dev/null
+++ b/pkg/profiling/task/network/analyze/base/events.go
@@ -0,0 +1,70 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package base
+
+// SocketConnectEvent is emitted when a socket has been connected or accepted
+type SocketConnectEvent struct {
+	ConID        uint64
+	RandomID     uint64
+	ExeTime      uint64
+	NeedComplete uint32
+	Pid          uint32
+	FD           uint32
+	FuncName     uint32
+
+	// socket information, if present
+	Role           ConnectionRole
+	SocketFamily   uint32
+	RemoteAddrV4   uint32
+	RemoteAddrV6   [16]uint8
+	RemoteAddrPort uint32
+	LocalAddrV4    uint32
+	LocalAddrV6    [16]uint8
+	LocalAddrPort  uint32
+}
+
+type SocketCloseEvent struct {
+	ConID    uint64
+	RandomID uint64
+	ExeTime  uint64
+	Pid      uint32
+	SocketFD uint32
+	Role     ConnectionRole
+	Protocol ConnectionProtocol
+	IsSSL    uint32
+	Fix      uint32
+
+	SocketFamily   uint32
+	RemoteAddrV4   uint32
+	RemoteAddrV6   [16]uint8
+	RemoteAddrPort uint32
+	LocalAddrV4    uint32
+	LocalAddrV6    [16]uint8
+	LocalAddrPort  uint32
+	Fix1           uint32
+
+	WriteBytes   uint64
+	WriteCount   uint64
+	WriteExeTime uint64
+	ReadBytes    uint64
+	ReadCount    uint64
+	ReadExeTime  uint64
+
+	WriteRTTCount   uint64
+	WriteRTTExeTime uint64
+}
diff --git a/pkg/profiling/task/network/analyze/base/listener.go b/pkg/profiling/task/network/analyze/base/listener.go
new file mode 100644
index 0000000..1e9e7d2
--- /dev/null
+++ b/pkg/profiling/task/network/analyze/base/listener.go
@@ -0,0 +1,51 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package base
+
+import (
+	"github.com/apache/skywalking-rover/pkg/profiling/task/network/bpf"
+)
+
+type AnalyzeListener interface {
+	// Name of the listener
+	Name() string
+	// GenerateMetrics generates a metrics context.
+	// It will be bound to a ConnectionContext or ProcessTraffic automatically.
+	GenerateMetrics() ListenerMetrics
+
+	// RegisterBPFEvents registers the BPF events
+	RegisterBPFEvents(bpfLoader *bpf.Loader)
+
+	// ReceiveNewConnection is called when a new connection event is received;
+	// the metrics generated by this listener are bound to the connection automatically
+	ReceiveNewConnection(ctx *ConnectionContext, event *SocketConnectEvent)
+	// ReceiveCloseConnection is called when a connection close event is received
+	ReceiveCloseConnection(ctx *ConnectionContext, event *SocketCloseEvent)
+
+	// PreFlushConnectionMetrics prepares the connection metrics before flushing
+	PreFlushConnectionMetrics(ccs []*ConnectionWithBPF, bpfLoader *bpf.Loader) error
+	// FlushMetrics flushes all metrics from the combined traffics
+	FlushMetrics(traffics []*ProcessTraffic, builder *MetricsBuilder)
+	// PostFlushConnectionMetrics is called after all metrics have been flushed, usually to refresh the per-connection metrics
+	PostFlushConnectionMetrics(ccs []*ConnectionContext)
+}
+
+type ConnectionWithBPF struct {
+	Connection  *ConnectionContext
+	ActiveInBPF *ActiveConnectionInBPF
+}
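
For illustration, a minimal no-op implementation of the AnalyzeListener and ListenerMetrics extension point defined above. The package and type names are hypothetical; only the method signatures come from this diff. Such a listener would be registered through AnalyzerContext.AddListener (see analyze.go above).

    package noop // hypothetical package, not part of this commit

    import (
    	"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
    	"github.com/apache/skywalking-rover/pkg/profiling/task/network/bpf"
    )

    type Listener struct{}

    type metrics struct{}

    // FlushMetrics merges nothing; a real listener would pull the connection's data here.
    func (m *metrics) FlushMetrics(connection *base.ConnectionContext) {}

    func (l *Listener) Name() string                          { return "noop" }
    func (l *Listener) GenerateMetrics() base.ListenerMetrics { return &metrics{} }

    func (l *Listener) RegisterBPFEvents(bpfLoader *bpf.Loader) {}

    func (l *Listener) ReceiveNewConnection(ctx *base.ConnectionContext, event *base.SocketConnectEvent) {}
    func (l *Listener) ReceiveCloseConnection(ctx *base.ConnectionContext, event *base.SocketCloseEvent) {}

    func (l *Listener) PreFlushConnectionMetrics(ccs []*base.ConnectionWithBPF, bpfLoader *bpf.Loader) error {
    	return nil
    }
    func (l *Listener) FlushMetrics(traffics []*base.ProcessTraffic, builder *base.MetricsBuilder) {}
    func (l *Listener) PostFlushConnectionMetrics(ccs []*base.ConnectionContext)                   {}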
diff --git a/pkg/profiling/task/network/analyze/base/metrics.go b/pkg/profiling/task/network/analyze/base/metrics.go
new file mode 100644
index 0000000..2c1eddf
--- /dev/null
+++ b/pkg/profiling/task/network/analyze/base/metrics.go
@@ -0,0 +1,98 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package base
+
+import (
+	"time"
+
+	v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+)
+
+// ListenerMetrics is the metrics container held by each listener
+type ListenerMetrics interface {
+	// FlushMetrics flushes the metrics of the given connection and merges them into this instance
+	FlushMetrics(connection *ConnectionContext)
+}
+
+type ConnectionMetrics struct {
+	data map[string]ListenerMetrics
+}
+
+func (c *AnalyzerContext) NewConnectionMetrics() *ConnectionMetrics {
+	data := make(map[string]ListenerMetrics)
+	for _, l := range c.listeners {
+		data[l.Name()] = l.GenerateMetrics()
+	}
+	return &ConnectionMetrics{data: data}
+}
+
+func (c *ConnectionMetrics) GetMetrics(listenerName string) ListenerMetrics {
+	return c.data[listenerName]
+}
+
+func (c *ConnectionMetrics) FlushMetrics(connection *ConnectionContext) {
+	for _, metric := range c.data {
+		metric.FlushMetrics(connection)
+	}
+}
+
+type MetricsBuilder struct {
+	prefix  string
+	metrics map[metadata][]*v3.MeterData
+}
+
+func NewMetricsBuilder(prefix string) *MetricsBuilder {
+	return &MetricsBuilder{
+		prefix:  prefix,
+		metrics: make(map[metadata][]*v3.MeterData),
+	}
+}
+
+func (m *MetricsBuilder) AppendMetrics(service, instance string, metrics []*v3.MeterData) {
+	meta := metadata{ServiceName: service, InstanceName: instance}
+	existingMetrics := m.metrics[meta]
+	if len(existingMetrics) == 0 {
+		m.metrics[meta] = metrics
+		return
+	}
+	m.metrics[meta] = append(existingMetrics, metrics...)
+}
+
+func (m *MetricsBuilder) MetricPrefix() string {
+	return m.prefix
+}
+
+func (m *MetricsBuilder) Build() []*v3.MeterDataCollection {
+	collections := make([]*v3.MeterDataCollection, 0)
+	now := time.Now().UnixMilli()
+	for meta, meters := range m.metrics {
+		if len(meters) == 0 {
+			continue
+		}
+		meters[0].Service = meta.ServiceName
+		meters[0].ServiceInstance = meta.InstanceName
+		meters[0].Timestamp = now
+		collections = append(collections, &v3.MeterDataCollection{MeterData: meters})
+	}
+	return collections
+}
+
+type metadata struct {
+	ServiceName  string
+	InstanceName string
+}
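
For illustration, a small sketch of how a listener might feed the MetricsBuilder. Only AppendMetrics, MetricPrefix and Build come from this diff; the metric name, service and instance values are made up.

    package example // illustrative only, not part of this commit

    import (
    	"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"

    	v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
    )

    func appendExample(builder *base.MetricsBuilder) {
    	meter := &v3.MeterData{
    		Metric: &v3.MeterData_SingleValue{
    			SingleValue: &v3.MeterSingleValue{
    				Name:  builder.MetricPrefix() + "client_write_counts_counter", // hypothetical name
    				Value: 42,
    			},
    		},
    	}
    	builder.AppendMetrics("demo-service", "demo-instance", []*v3.MeterData{meter})

    	// Build groups meters by service/instance and stamps the first meter of each
    	// group with the service, instance and current timestamp.
    	collections := builder.Build()
    	_ = collections // []*v3.MeterDataCollection, ready for the meter report stream
    }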
diff --git a/pkg/profiling/task/network/tcpresolver.go b/pkg/profiling/task/network/analyze/base/tcpresolver.go
similarity index 99%
rename from pkg/profiling/task/network/tcpresolver.go
rename to pkg/profiling/task/network/analyze/base/tcpresolver.go
index 92d2326..dc27cd8 100644
--- a/pkg/profiling/task/network/tcpresolver.go
+++ b/pkg/profiling/task/network/analyze/base/tcpresolver.go
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package network
+package base
 
 import (
 	"encoding/binary"
diff --git a/pkg/profiling/task/network/analyzer.go b/pkg/profiling/task/network/analyze/base/traffic.go
similarity index 86%
rename from pkg/profiling/task/network/analyzer.go
rename to pkg/profiling/task/network/analyze/base/traffic.go
index 8eb69bc..99a88a2 100644
--- a/pkg/profiling/task/network/analyzer.go
+++ b/pkg/profiling/task/network/analyze/base/traffic.go
@@ -15,21 +15,50 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package network
+package base
 
 import (
+	"fmt"
+
+	"github.com/apache/skywalking-rover/pkg/logger"
 	"github.com/apache/skywalking-rover/pkg/process/api"
 	"github.com/apache/skywalking-rover/pkg/tools"
 )
 
+var log = logger.GetLogger("profiling", "task", "network", "analyze")
+
 const (
 	layerMeshDP  = "MESH_DP"
 	layerMeshApp = "MESH"
 	processEnvoy = "envoy"
 )
 
+type ProcessTraffic struct {
+	Analyzer *TrafficAnalyzer
+
+	// local process information
+	LocalIP        string
+	LocalPort      uint16
+	LocalPid       uint32
+	LocalProcesses []api.ProcessInterface
+
+	// remote process/address information
+	RemoteIP        string
+	RemotePort      uint16
+	RemotePid       uint32
+	RemoteProcesses []api.ProcessInterface
+
+	// connection basic information
+	Role     ConnectionRole
+	Protocol ConnectionProtocol
+	IsSSL    bool
+
+	// metrics
+	Metrics *ConnectionMetrics
+}
+
 type TrafficAnalyzer struct {
-	existingProcesses map[int32][]api.ProcessInterface
+	analyzeContext *AnalyzerContext
 	// used to find local same with remote address
 	// the connect request(local:a -> remote:b) same with accept address(remote:a -> local:b)
 	// key: localIP:port+RemoteIP+port
@@ -59,9 +88,9 @@ type TrafficAnalyzer struct {
 	localAddresses map[string]map[string]api.ProcessInterface
 }
 
-func NewTrafficAnalyzer(processes map[int32][]api.ProcessInterface) *TrafficAnalyzer {
+func (c *AnalyzerContext) NewTrafficAnalyzer() *TrafficAnalyzer {
 	return &TrafficAnalyzer{
-		existingProcesses:             processes,
+		analyzeContext:                c,
 		localWithPeerCache:            make(map[LocalWithPeerAddress]*PidWithRole),
 		peerAddressCache:              make(map[PeerAddress][]uint32),
 		envoyAcceptClientAddressCache: make(map[PeerAddress]*AddressWithPid),
@@ -125,19 +154,33 @@ func (t *TrafficAnalyzer) CombineConnectionToTraffics(connections []*ConnectionC
 	// combine all result
 	result := make([]*ProcessTraffic, 0)
 	for _, v := range pidMatchedTraffic {
-		if v.ContainsAnyTraffic() {
-			result = append(result, v)
-		}
+		result = append(result, v)
 	}
 	for _, v := range pidToRemoteTraffic {
-		if v.ContainsAnyTraffic() {
-			result = append(result, v)
-		}
+		result = append(result, v)
 	}
 
 	return result
 }
 
+func (t *ProcessTraffic) GenerateConnectionInfo() string {
+	localInfo := fmt.Sprintf("%s:%d(%d)", t.LocalIP, t.LocalPort, t.LocalPid)
+	if len(t.LocalProcesses) > 0 {
+		localInfo = t.generateProcessInfo(t.LocalProcesses[0])
+	}
+
+	remoteInfo := fmt.Sprintf("%s:%d(%d)", t.RemoteIP, t.RemotePort, t.RemotePid)
+	if len(t.RemoteProcesses) > 0 {
+		remoteInfo = t.generateProcessInfo(t.RemoteProcesses[0])
+	}
+	return fmt.Sprintf("%s -> %s", localInfo, remoteInfo)
+}
+
+func (t *ProcessTraffic) generateProcessInfo(p api.ProcessInterface) string {
+	return fmt.Sprintf("(%s)%s:%s:%s(%s:%d)(%d)", p.Entity().Layer, p.Entity().ServiceName,
+		p.Entity().InstanceName, p.Entity().ProcessName, t.LocalIP, t.LocalPort, t.LocalPid)
+}
+
 func (t *TrafficAnalyzer) tryingToGenerateTheRoleWhenRemotePidCannotFound(con *ConnectionContext) {
 	if con.Role != ConnectionRoleUnknown {
 		return
@@ -163,34 +206,21 @@ func (t *TrafficAnalyzer) tryingToGenerateTheRoleWhenRemotePidCannotFound(con *C
 func (t *TrafficAnalyzer) generateOrCombineTraffic(traffic *ProcessTraffic, con *ConnectionContext, remotePid uint32) *ProcessTraffic {
 	if traffic == nil {
 		traffic = &ProcessTraffic{
-			analyzer: t,
+			Analyzer: t,
 
 			LocalPid:       con.LocalPid,
 			LocalProcesses: con.LocalProcesses,
 			LocalIP:        con.LocalIP,
 			LocalPort:      con.LocalPort,
 
-			ConnectionRole: con.Role,
-
-			WriteCounter:            NewSocketDataCounter(),
-			ReadCounter:             NewSocketDataCounter(),
-			WriteRTTCounter:         NewSocketDataCounter(),
-			ConnectCounter:          NewSocketDataCounter(),
-			CloseCounter:            NewSocketDataCounter(),
-			RetransmitCounter:       NewSocketDataCounter(),
-			DropCounter:             NewSocketDataCounter(),
-			WriteRTTHistogram:       NewSocketDataHistogram(HistogramDataUnitUS),
-			WriteExeTimeHistogram:   NewSocketDataHistogram(HistogramDataUnitNS),
-			ReadExeTimeHistogram:    NewSocketDataHistogram(HistogramDataUnitNS),
-			ConnectExeTimeHistogram: NewSocketDataHistogram(HistogramDataUnitNS),
-			CloseExeTimeHistogram:   NewSocketDataHistogram(HistogramDataUnitNS),
+			Metrics: t.analyzeContext.NewConnectionMetrics(),
 		}
 	}
 	if len(traffic.LocalProcesses) == 0 && len(con.LocalProcesses) > 0 {
 		traffic.LocalProcesses = con.LocalProcesses
 	}
-	if traffic.ConnectionRole == ConnectionRoleUnknown && con.Role != ConnectionRoleUnknown {
-		traffic.ConnectionRole = con.Role
+	if traffic.Role == ConnectionRoleUnknown && con.Role != ConnectionRoleUnknown {
+		traffic.Role = con.Role
 	}
 	if traffic.Protocol == ConnectionProtocolUnknown && con.Protocol != ConnectionProtocolUnknown {
 		traffic.Protocol = con.Protocol
@@ -206,23 +236,9 @@ func (t *TrafficAnalyzer) generateOrCombineTraffic(traffic *ProcessTraffic, con
 	traffic.RemoteIP = con.RemoteIP
 	traffic.RemotePort = con.RemotePort
 
-	traffic.WriteCounter.Increase(con.WriteCounter.CalculateIncrease())
-	traffic.ReadCounter.Increase(con.ReadCounter.CalculateIncrease())
-	traffic.WriteRTTCounter.Increase(con.WriteRTTCounter.CalculateIncrease())
-	traffic.RetransmitCounter.Increase(con.RetransmitCounter)
-	traffic.DropCounter.Increase(con.DropCounter)
-	traffic.WriteRTTHistogram.Increase(con.WriteRTTHistogram.CalculateIncrease())
-	traffic.WriteExeTimeHistogram.Increase(con.WriteExeTimeHistogram.CalculateIncrease())
-	traffic.ReadExeTimeHistogram.Increase(con.ReadExeTimeHistogram.CalculateIncrease())
-
-	if con.FlushDataCount == 0 && con.ConnectExecuteTime > 0 {
-		traffic.ConnectCounter.IncreaseByValue(0, 1, con.ConnectExecuteTime)
-		traffic.ConnectExeTimeHistogram.IncreaseByValue(con.ConnectExecuteTime)
-	}
-	if con.FlushDataCount == 0 && con.CloseExecuteTime > 0 {
-		traffic.CloseCounter.IncreaseByValue(0, 1, con.CloseExecuteTime)
-		traffic.CloseExeTimeHistogram.IncreaseByValue(con.CloseExecuteTime)
-	}
+	// flush connection metrics
+	traffic.Metrics.FlushMetrics(con)
+
 	con.FlushDataCount++
 	return traffic
 }
@@ -414,7 +430,7 @@ func (t *TrafficAnalyzer) findRemotePidWhenMeshEnvironment(con *ConnectionContex
 }
 
 func (t *TrafficAnalyzer) findSameInstanceMeshDP(entity *api.ProcessEntity) uint32 {
-	for _, psList := range t.existingProcesses {
+	for _, psList := range t.analyzeContext.processes {
 		for _, p := range psList {
 			if p.Entity().Layer == layerMeshDP && p.Entity().ServiceName == entity.ServiceName && p.Entity().InstanceName == entity.InstanceName {
 				name, err := p.ExeName()
diff --git a/pkg/profiling/task/network/analyze/layer4/events.go b/pkg/profiling/task/network/analyze/layer4/events.go
new file mode 100644
index 0000000..641c2fb
--- /dev/null
+++ b/pkg/profiling/task/network/analyze/layer4/events.go
@@ -0,0 +1,88 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package layer4
+
+import (
+	"encoding/json"
+
+	"github.com/apache/skywalking-rover/pkg/logger"
+	"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
+
+	"github.com/sirupsen/logrus"
+)
+
+var log = logger.GetLogger("profiling", "task", "network", "layer4")
+
+// SocketExceptionOperationEvent is emitted when a socket packet has been retransmitted or dropped
+type SocketExceptionOperationEvent struct {
+	Pid            uint32
+	SocketFamily   uint32
+	RemoteAddrV4   uint32
+	RemoteAddrV6   [16]uint8
+	RemoteAddrPort uint32
+	Type           base.SocketExceptionOperationType
+}
+
+func (l *Listener) handleSocketExceptionOperationEvent(data interface{}) {
+	event := data.(*SocketExceptionOperationEvent)
+	l.socketExceptionOperationLock.Lock()
+	defer l.socketExceptionOperationLock.Unlock()
+
+	key := SocketBasicKey{
+		Pid:          event.Pid,
+		Family:       event.SocketFamily,
+		RemoteAddrV4: event.RemoteAddrV4,
+		RemoteAddrV6: event.RemoteAddrV6,
+		RemotePort:   event.RemoteAddrPort,
+	}
+	exceptionValue := l.socketExceptionStatics[key]
+	if exceptionValue == nil {
+		exceptionValue = &SocketExceptionValue{}
+		l.socketExceptionStatics[key] = exceptionValue
+	}
+
+	switch event.Type {
+	case base.SocketExceptionOperationRetransmit:
+		exceptionValue.RetransmitCount++
+	case base.SocketExceptionOperationDrop:
+		exceptionValue.DropCount++
+	default:
+		log.Warnf("unknown socket exception operation type: %d", event.Type)
+	}
+
+	if log.Enable(logrus.DebugLevel) {
+		marshal, _ := json.Marshal(event)
+		log.Debugf("found socket exception operation event: %s", string(marshal))
+	}
+}
+
+type SocketBasicKey struct {
+	Pid          uint32
+	Family       uint32
+	RemoteAddrV4 uint32
+	RemoteAddrV6 [16]uint8
+	RemotePort   uint32
+	LocalAddrV4  uint32
+	LocalAddrV6  [16]uint8
+	LocalPort    uint32
+}
+
+type SocketExceptionValue struct {
+	DropCount       int
+	RetransmitCount int
+}
diff --git a/pkg/profiling/task/network/analyze/layer4/listener.go b/pkg/profiling/task/network/analyze/layer4/listener.go
new file mode 100644
index 0000000..88db514
--- /dev/null
+++ b/pkg/profiling/task/network/analyze/layer4/listener.go
@@ -0,0 +1,402 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package layer4
+
+import (
+	"fmt"
+	"net"
+	"sync"
+	"unsafe"
+
+	"github.com/sirupsen/logrus"
+
+	"github.com/apache/skywalking-rover/pkg/process/api"
+	"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
+	"github.com/apache/skywalking-rover/pkg/profiling/task/network/bpf"
+	"github.com/apache/skywalking-rover/pkg/tools"
+
+	v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+
+	"golang.org/x/sys/unix"
+)
+
+var Name = "layer4"
+
+type Listener struct {
+	// socket retransmit/drop
+	socketExceptionStatics       map[SocketBasicKey]*SocketExceptionValue
+	socketExceptionOperationLock sync.Mutex
+}
+
+func NewListener() *Listener {
+	return &Listener{
+		socketExceptionStatics: make(map[SocketBasicKey]*SocketExceptionValue),
+	}
+}
+
+func (l *Listener) Name() string {
+	return Name
+}
+
+func (l *Listener) GenerateMetrics() base.ListenerMetrics {
+	return NewLayer4Metrics()
+}
+
+func (l *Listener) RegisterBPFEvents(bpfLoader *bpf.Loader) {
+	bpfLoader.ReadEventAsync(bpfLoader.SocketExceptionOperationEventQueue, l.handleSocketExceptionOperationEvent, func() interface{} {
+		return &SocketExceptionOperationEvent{}
+	})
+}
+
+func (l *Listener) ReceiveNewConnection(ctx *base.ConnectionContext, event *base.SocketConnectEvent) {
+	// update the connection execute time
+	l.getMetrics(ctx.Metrics).ConnectExecuteTime = event.ExeTime
+}
+
+func (l *Listener) ReceiveCloseConnection(ctx *base.ConnectionContext, event *base.SocketCloseEvent) {
+	layer4 := l.getMetrics(ctx.Metrics)
+	// data transmit counters
+	layer4.WriteCounter.UpdateToCurrent(event.WriteBytes, event.WriteCount, event.WriteExeTime)
+	layer4.ReadCounter.UpdateToCurrent(event.ReadBytes, event.ReadCount, event.ReadExeTime)
+	layer4.WriteRTTCounter.UpdateToCurrent(0, event.WriteRTTCount, event.WriteRTTExeTime)
+
+	// connection close execute time
+	layer4.CloseExecuteTime = event.ExeTime
+}
+
+func (l *Listener) PreFlushConnectionMetrics(ccs []*base.ConnectionWithBPF, bpfLoader *bpf.Loader) error {
+	// rebuild into a map to help quickly find the correlated ConnectionContext
+	keyWithContext := make(map[string]*base.ConnectionContext)
+	for _, cc := range ccs {
+		// ready to flush histograms
+		connection := cc.Connection
+		layer4 := l.getMetrics(connection.Metrics)
+		// basic counter update
+		activeConnection := cc.ActiveInBPF
+		if activeConnection != nil {
+			layer4.WriteCounter.UpdateToCurrent(activeConnection.WriteBytes, activeConnection.WriteCount, activeConnection.WriteExeTime)
+			layer4.ReadCounter.UpdateToCurrent(activeConnection.ReadBytes, activeConnection.ReadCount, activeConnection.ReadExeTime)
+			layer4.WriteRTTCounter.UpdateToCurrent(0, activeConnection.WriteRTTCount, activeConnection.WriteRTTExeTime)
+		}
+		// build cache
+		keyWithContext[l.generateConID(connection.ConnectionID, connection.RandomID)] = connection
+
+		if log.Enable(logrus.DebugLevel) {
+			log.Debugf("found connection: %d, %s relation: %s:%d(%d) -> %s:%d, protocol: %s, is_ssl: %t, is_closed: %t, write: %d bytes/%d, read: %d bytes/%d",
+				connection.ConnectionID, connection.Role.String(),
+				connection.LocalIP, connection.LocalPort, connection.LocalPid, connection.RemoteIP, connection.RemotePort,
+				connection.Protocol.String(), connection.IsSSL, connection.ConnectionClosed, layer4.WriteCounter.Cur.Bytes,
+				layer4.WriteCounter.Cur.Count, layer4.ReadCounter.Cur.Bytes, layer4.ReadCounter.Cur.Count)
+		}
+	}
+
+	var key HistogramDataKey
+	var count uint32
+	histogramIt := bpfLoader.SocketConnectionStatsHistogram.Iterate()
+	// iterate over the stats map
+	for histogramIt.Next(&key, &count) {
+		// if it does not relate to any ConnectionContext, just ignore it
+		cc := keyWithContext[l.generateConID(key.ConnectionID, key.RandomID)]
+		if cc == nil {
+			continue
+		}
+		layer4 := l.getMetrics(cc.Metrics)
+
+		// add the histogram data
+		var histogram *SocketDataHistogramWithHistory
+		if key.DataDirection == base.SocketDataDirectionEgress {
+			if key.DataType == base.SocketDataStaticsTypeExeTime {
+				histogram = layer4.WriteExeTimeHistogram
+			} else if key.DataType == base.SocketDataStaticsTypeRTT {
+				histogram = layer4.WriteRTTHistogram
+			}
+		} else if key.DataDirection == base.SocketDataDirectionIngress {
+			histogram = layer4.ReadExeTimeHistogram
+		}
+		if histogram == nil {
+			log.Warnf("unknown histogram data: %v", cc)
+			continue
+		}
+		histogram.UpdateToCurrent(key.Bucket, count)
+
+		// delete the stats if the connection is already closed
+		if cc.ConnectionClosed {
+			if err := bpfLoader.SocketConnectionStatsHistogram.Delete(key); err != nil {
+				log.Warnf("delete the connection stats failure: %v", err)
+			}
+		}
+	}
+
+	// merge all the exception operations into the connection contexts
+	exceptionContexts := l.cleanAndGetAllExceptionContexts()
+	l.combineExceptionToConnections(keyWithContext, exceptionContexts)
+	return nil
+}
+
+func (l *Listener) PostFlushConnectionMetrics(ccs []*base.ConnectionContext) {
+	for _, connection := range ccs {
+		metrics := l.getMetrics(connection.Metrics)
+
+		// refresh counters
+		metrics.WriteCounter.RefreshCurrent()
+		metrics.ReadCounter.RefreshCurrent()
+		metrics.WriteRTTCounter.RefreshCurrent()
+		metrics.WriteRTTHistogram.RefreshCurrent()
+		metrics.WriteExeTimeHistogram.RefreshCurrent()
+		metrics.ReadExeTimeHistogram.RefreshCurrent()
+		metrics.ConnectCounter.RefreshCurrent()
+		metrics.CloseCounter.RefreshCurrent()
+		metrics.ConnectExeTimeHistogram.RefreshCurrent()
+		metrics.CloseExeTimeHistogram.RefreshCurrent()
+		metrics.RetransmitCounter.RefreshCurrent()
+		metrics.DropCounter.RefreshCurrent()
+	}
+}
+
+func (l *Listener) FlushMetrics(traffics []*base.ProcessTraffic, builder *base.MetricsBuilder) {
+	l.logTheMetricsConnections(traffics)
+
+	metricsPrefix := builder.MetricPrefix()
+	for _, traffic := range traffics {
+		metrics := traffic.Metrics.GetMetrics(Name).(*Metrics)
+		for _, p := range traffic.LocalProcesses {
+			collection := make([]*v3.MeterData, 0)
+			collection = l.appendCounterValues(collection, metricsPrefix, "write", traffic, p, metrics.WriteCounter)
+			collection = l.appendCounterValues(collection, metricsPrefix, "read", traffic, p, metrics.ReadCounter)
+			collection = l.appendCounterValues(collection, metricsPrefix, "write_rtt", traffic, p, metrics.WriteRTTCounter)
+			collection = l.appendCounterValues(collection, metricsPrefix, "connect", traffic, p, metrics.ConnectCounter)
+			collection = l.appendCounterValues(collection, metricsPrefix, "close", traffic, p, metrics.CloseCounter)
+			collection = l.appendCounterValues(collection, metricsPrefix, "retransmit", traffic, p, metrics.RetransmitCounter)
+			collection = l.appendCounterValues(collection, metricsPrefix, "drop", traffic, p, metrics.DropCounter)
+
+			collection = l.appendHistogramValue(collection, metricsPrefix, "write_rtt", traffic, p, metrics.WriteRTTHistogram)
+			collection = l.appendHistogramValue(collection, metricsPrefix, "write_exe_time", traffic, p, metrics.WriteExeTimeHistogram)
+			collection = l.appendHistogramValue(collection, metricsPrefix, "read_exe_time", traffic, p, metrics.ReadExeTimeHistogram)
+			collection = l.appendHistogramValue(collection, metricsPrefix, "connect_exe_time", traffic, p, metrics.ConnectExeTimeHistogram)
+			collection = l.appendHistogramValue(collection, metricsPrefix, "close_exe_time", traffic, p, metrics.CloseExeTimeHistogram)
+
+			if len(collection) == 0 {
+				continue
+			}
+
+			builder.AppendMetrics(p.Entity().ServiceName, p.Entity().InstanceName, collection)
+		}
+	}
+}
+
+func (l *Listener) logTheMetricsConnections(traffics []*base.ProcessTraffic) {
+	if !log.Enable(logrus.DebugLevel) {
+		return
+	}
+	for _, traffic := range traffics {
+		side := traffic.Role.String()
+		layer4 := l.getMetrics(traffic.Metrics)
+		log.Debugf("connection layer4 analyze result: %s : %s, protocol: %s, is SSL: %t, write: %d bytes/%d, read: %d bytes/%d",
+			side, traffic.GenerateConnectionInfo(), traffic.Protocol.String(), traffic.IsSSL, layer4.WriteCounter.Cur.Bytes, layer4.WriteCounter.Cur.Count,
+			layer4.ReadCounter.Cur.Bytes, layer4.ReadCounter.Cur.Count)
+	}
+}
+
+func (l *Listener) generateConID(conID, randomID uint64) string {
+	return fmt.Sprintf("%d_%d", conID, randomID)
+}
+
+func (l *Listener) cleanAndGetAllExceptionContexts() map[SocketBasicKey]*SocketExceptionValue {
+	l.socketExceptionOperationLock.Lock()
+	defer l.socketExceptionOperationLock.Unlock()
+
+	result := l.socketExceptionStatics
+	l.socketExceptionStatics = make(map[SocketBasicKey]*SocketExceptionValue)
+	return result
+}
+
+func (l *Listener) combineExceptionToConnections(ccs map[string]*base.ConnectionContext, exps map[SocketBasicKey]*SocketExceptionValue) {
+	for key, value := range exps {
+		var remotePort, localPort = uint16(key.RemotePort), uint16(key.LocalPort)
+		var remoteIP, localIP string
+
+		if key.Family == unix.AF_INET {
+			remoteIP = parseAddressV4(key.RemoteAddrV4)
+			localIP = parseAddressV4(key.LocalAddrV4)
+		} else if key.Family == unix.AF_INET6 {
+			remoteIP = parseAddressV6(key.RemoteAddrV6)
+			localIP = parseAddressV6(key.LocalAddrV6)
+		} else {
+			continue
+		}
+
+		var firstRemoteMatch *base.ConnectionContext
+		var foundAllAddrMatch bool
+		for _, cc := range ccs {
+			// only add to the first match
+			if cc.RemoteIP == remoteIP && cc.RemotePort == remotePort {
+				firstRemoteMatch = cc
+				if cc.LocalIP == localIP && cc.LocalPort == localPort {
+					l.mergeExceptionToAppointConnection(value, cc)
+					foundAllAddrMatch = true
+					break
+				}
+			}
+		}
+
+		// if only the remote address matches, just add to the first one
+		if !foundAllAddrMatch && firstRemoteMatch != nil {
+			l.mergeExceptionToAppointConnection(value, firstRemoteMatch)
+		}
+	}
+}
+
+func (l *Listener) mergeExceptionToAppointConnection(expCtx *SocketExceptionValue, conCtx *base.ConnectionContext) {
+	layer4 := l.getMetrics(conCtx.Metrics)
+	layer4.DropCounter.IncreaseToCurrent(NewSocketDataCounterWithValue(0, uint64(expCtx.DropCount), 0))
+	layer4.RetransmitCounter.IncreaseToCurrent(NewSocketDataCounterWithValue(0, uint64(expCtx.RetransmitCount), 0))
+}
+
+func (l *Listener) appendCounterValues(metrics []*v3.MeterData, metricsPrefix, name string, traffic *base.ProcessTraffic,
+	local api.ProcessInterface, counter *SocketDataCounterWithHistory) []*v3.MeterData {
+	metric := counter.Cur
+	if !metric.NotEmpty() {
+		return metrics
+	}
+
+	count := float64(metric.Count)
+	metrics = append(metrics, l.buildSingleValue(metricsPrefix, name+"_counts_counter", traffic, local, count))
+	if metric.Bytes > 0 {
+		metrics = append(metrics, l.buildSingleValue(metricsPrefix, name+"_bytes_counter", traffic, local, float64(metric.Bytes)))
+	}
+	if metric.ExeTime > 0 {
+		metrics = append(metrics, l.buildSingleValue(metricsPrefix, name+"_exe_time_counter", traffic, local, float64(metric.ExeTime)/count))
+	}
+	return metrics
+}
+
+func (l *Listener) appendHistogramValue(metrics []*v3.MeterData, metricsPrefix, name string, traffic *base.ProcessTraffic,
+	local api.ProcessInterface, histogram *SocketDataHistogramWithHistory) []*v3.MeterData {
+	data := histogram.Cur
+	if !data.NotEmpty() {
+		return metrics
+	}
+
+	role, labels := l.buildBasicMeterLabels(traffic, local)
+	values := make([]*v3.MeterBucketValue, 0)
+	for bucket, count := range data.Buckets {
+		var bucketInx = int(bucket)
+		if bucketInx >= SocketHistogramBucketsCount {
+			bucketInx = SocketHistogramBucketsCount - 1
+		}
+		var buckets []float64
+		if data.Unit == HistogramDataUnitUS {
+			buckets = SocketHistogramBucketsUs
+		} else {
+			buckets = SocketHistogramBucketsNs
+		}
+		values = append(values, &v3.MeterBucketValue{
+			Bucket: buckets[bucketInx],
+			Count:  int64(count),
+		})
+	}
+
+	return append(metrics, &v3.MeterData{
+		Metric: &v3.MeterData_Histogram{
+			Histogram: &v3.MeterHistogram{
+				Name:   fmt.Sprintf("%s%s_%s_histogram", metricsPrefix, role.String(), name),
+				Labels: labels,
+				Values: values,
+			},
+		},
+	})
+}
+
+func (l *Listener) buildSingleValue(prefix, name string, traffic *base.ProcessTraffic, local api.ProcessInterface, val float64) *v3.MeterData {
+	role, labels := l.buildBasicMeterLabels(traffic, local)
+
+	return &v3.MeterData{
+		Metric: &v3.MeterData_SingleValue{
+			SingleValue: &v3.MeterSingleValue{
+				Name:   fmt.Sprintf("%s%s_%s", prefix, role.String(), name),
+				Labels: labels,
+				Value:  val,
+			},
+		},
+	}
+}
+
+func (l *Listener) buildBasicMeterLabels(traffic *base.ProcessTraffic, local api.ProcessInterface) (base.ConnectionRole, []*v3.Label) {
+	curRole := traffic.Role
+	// add the default role
+	if curRole == base.ConnectionRoleUnknown {
+		curRole = base.ConnectionRoleClient
+	}
+	labels := make([]*v3.Label, 0)
+
+	// two pairs of process/address info
+	labels = l.appendMeterValue(labels, fmt.Sprintf("%s_process_id", curRole.String()), local.ID())
+	labels = l.appendRemoteAddressInfo(labels, traffic, curRole.Revert().String(), local)
+
+	labels = l.appendMeterValue(labels, "side", curRole.String())
+
+	// protocol and ssl
+	labels = l.appendMeterValue(labels, "protocol", traffic.Protocol.String())
+	labels = l.appendMeterValue(labels, "is_ssl", fmt.Sprintf("%t", traffic.IsSSL))
+	return curRole, labels
+}
+
+func (l *Listener) appendRemoteAddressInfo(labels []*v3.Label, traffic *base.ProcessTraffic, prefix string, local api.ProcessInterface) []*v3.Label {
+	if len(traffic.RemoteProcesses) != 0 {
+		for _, p := range traffic.RemoteProcesses {
+			// only match processes within the same service instance
+			if local.Entity().ServiceName == p.Entity().ServiceName &&
+				local.Entity().InstanceName == p.Entity().InstanceName {
+				return l.appendMeterValue(labels, prefix+"_process_id", p.ID())
+			}
+		}
+	}
+
+	if tools.IsLocalHostAddress(traffic.RemoteIP) || traffic.Analyzer.IsLocalAddressInCache(traffic.RemoteIP) {
+		return l.appendMeterValue(labels, prefix+"_local", "true")
+	}
+
+	return l.appendMeterValue(labels, prefix+"_address", fmt.Sprintf("%s:%d", traffic.RemoteIP, traffic.RemotePort))
+}
+
+func (l *Listener) appendMeterValue(labels []*v3.Label, name, value string) []*v3.Label {
+	return append(labels, &v3.Label{
+		Name:  name,
+		Value: value,
+	})
+}
+
+func (l *Listener) getMetrics(connectionMetrics *base.ConnectionMetrics) *Metrics {
+	return connectionMetrics.GetMetrics(Name).(*Metrics)
+}
+
+type HistogramDataKey struct {
+	ConnectionID  uint64
+	RandomID      uint64
+	DataDirection base.SocketDataDirection
+	DataType      base.SocketDataStaticsType
+	Bucket        uint64
+}
+
+func parseAddressV4(val uint32) string {
+	return net.IP((*(*[net.IPv4len]byte)(unsafe.Pointer(&val)))[:]).String()
+}
+
+func parseAddressV6(val [16]uint8) string {
+	return net.IP((*(*[net.IPv6len]byte)(unsafe.Pointer(&val)))[:]).String()
+}
diff --git a/pkg/profiling/task/network/analyze/layer4/metrics.go b/pkg/profiling/task/network/analyze/layer4/metrics.go
new file mode 100644
index 0000000..f6a6658
--- /dev/null
+++ b/pkg/profiling/task/network/analyze/layer4/metrics.go
@@ -0,0 +1,276 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package layer4
+
+import "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
+
+type Metrics struct {
+	// basic statistics
+	// read/write
+	WriteCounter *SocketDataCounterWithHistory
+	ReadCounter  *SocketDataCounterWithHistory
+	// write RTT
+	WriteRTTCounter *SocketDataCounterWithHistory
+
+	// histograms
+	// write execute time and RTT
+	WriteRTTHistogram     *SocketDataHistogramWithHistory
+	WriteExeTimeHistogram *SocketDataHistogramWithHistory
+	// read execute time
+	ReadExeTimeHistogram *SocketDataHistogramWithHistory
+
+	// the connection connect or close execute time
+	ConnectExecuteTime      uint64
+	CloseExecuteTime        uint64
+	ConnectCounter          *SocketDataCounterWithHistory
+	CloseCounter            *SocketDataCounterWithHistory
+	ConnectExeTimeHistogram *SocketDataHistogramWithHistory
+	CloseExeTimeHistogram   *SocketDataHistogramWithHistory
+
+	// exception counters
+	RetransmitCounter *SocketDataCounterWithHistory
+	DropCounter       *SocketDataCounterWithHistory
+}
+
+func NewLayer4Metrics() *Metrics {
+	return &Metrics{
+		WriteCounter:            NewSocketDataCounterWithHistory(),
+		ReadCounter:             NewSocketDataCounterWithHistory(),
+		WriteRTTCounter:         NewSocketDataCounterWithHistory(),
+		WriteRTTHistogram:       NewSocketDataHistogramWithHistory(HistogramDataUnitUS),
+		WriteExeTimeHistogram:   NewSocketDataHistogramWithHistory(HistogramDataUnitNS),
+		ReadExeTimeHistogram:    NewSocketDataHistogramWithHistory(HistogramDataUnitNS),
+		ConnectCounter:          NewSocketDataCounterWithHistory(),
+		ConnectExeTimeHistogram: NewSocketDataHistogramWithHistory(HistogramDataUnitNS),
+		CloseCounter:            NewSocketDataCounterWithHistory(),
+		CloseExeTimeHistogram:   NewSocketDataHistogramWithHistory(HistogramDataUnitNS),
+		RetransmitCounter:       NewSocketDataCounterWithHistory(),
+		DropCounter:             NewSocketDataCounterWithHistory(),
+	}
+}
+
+func (l *Metrics) FlushMetrics(connection *base.ConnectionContext) {
+	metrics := connection.Metrics.GetMetrics(Name).(*Metrics)
+
+	l.WriteCounter.IncreaseToCurrent(metrics.WriteCounter.CalculateIncrease())
+	l.ReadCounter.IncreaseToCurrent(metrics.ReadCounter.CalculateIncrease())
+	l.WriteRTTCounter.IncreaseToCurrent(metrics.WriteRTTCounter.CalculateIncrease())
+
+	l.WriteRTTHistogram.IncreaseToCurrent(metrics.WriteRTTHistogram.CalculateIncrease())
+	l.WriteExeTimeHistogram.IncreaseToCurrent(metrics.WriteExeTimeHistogram.CalculateIncrease())
+	l.ReadExeTimeHistogram.IncreaseToCurrent(metrics.ReadExeTimeHistogram.CalculateIncrease())
+
+	l.RetransmitCounter.IncreaseToCurrent(metrics.RetransmitCounter.CalculateIncrease())
+	l.DropCounter.IncreaseToCurrent(metrics.DropCounter.CalculateIncrease())
+
+	if connection.FlushDataCount == 0 && metrics.ConnectExecuteTime > 0 {
+		l.ConnectCounter.IncreaseToCurrent(NewSocketDataCounterWithValue(0, 1, metrics.ConnectExecuteTime))
+		l.ConnectExeTimeHistogram.Cur.IncreaseByValue(metrics.ConnectExecuteTime)
+	}
+	if connection.FlushDataCount == 0 && metrics.CloseExecuteTime > 0 {
+		l.CloseCounter.IncreaseToCurrent(NewSocketDataCounterWithValue(0, 1, metrics.CloseExecuteTime))
+		l.CloseExeTimeHistogram.Cur.IncreaseByValue(metrics.CloseExecuteTime)
+	}
+}
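
FlushMetrics folds one connection's per-interval delta into a process-level Metrics value, and the FlushDataCount == 0 guard ensures the connect/close one-shot costs are only counted on a connection's first flush. A rough sketch of the calling pattern, with aggregateConnections as a hypothetical helper (the real loop lives in the analyze context):

package layer4

import "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"

// Illustrative only: fold several connections belonging to one process traffic
// into a single aggregated Metrics value.
func aggregateConnections(conns []*base.ConnectionContext) *Metrics {
	aggregated := NewLayer4Metrics()
	for _, conn := range conns {
		aggregated.FlushMetrics(conn)
		conn.FlushDataCount++ // assumed: later flushes skip the connect/close one-shot counters
	}
	return aggregated
}
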
+
+type SocketDataCounter struct {
+	Bytes   uint64
+	Count   uint64
+	ExeTime uint64
+}
+
+func NewSocketDataCounter() *SocketDataCounter {
+	return &SocketDataCounter{}
+}
+
+func NewSocketDataCounterWithValue(bytes, count, exeTime uint64) *SocketDataCounter {
+	ret := &SocketDataCounter{}
+	ret.IncreaseByValue(bytes, count, exeTime)
+	return ret
+}
+
+func (s *SocketDataCounter) Increase(d *SocketDataCounter) {
+	s.IncreaseByValue(d.Bytes, d.Count, d.ExeTime)
+}
+
+func (s *SocketDataCounter) IncreaseByValue(bytes, count, exeTime uint64) {
+	s.Bytes += bytes
+	s.Count += count
+	s.ExeTime += exeTime
+}
+
+func (s *SocketDataCounter) NotEmpty() bool {
+	return s.Count > 0
+}
+
+// SocketDataCounterWithHistory holds the socket send/receive counters for the current and previous flush
+type SocketDataCounterWithHistory struct {
+	Pre *SocketDataCounter
+	Cur *SocketDataCounter
+}
+
+func NewSocketDataCounterWithHistory() *SocketDataCounterWithHistory {
+	return &SocketDataCounterWithHistory{
+		Pre: NewSocketDataCounter(),
+		Cur: NewSocketDataCounter(),
+	}
+}
+
+func (c *SocketDataCounterWithHistory) RefreshCurrent() {
+	c.Pre = c.Cur
+	c.Cur = NewSocketDataCounterWithValue(c.Cur.Bytes, c.Cur.Count, c.Cur.ExeTime)
+}
+
+func (c *SocketDataCounterWithHistory) UpdateToCurrent(bytes, count, exeTime uint64) {
+	c.Pre = c.Cur
+	c.Cur = &SocketDataCounter{
+		Bytes:   bytes,
+		Count:   count,
+		ExeTime: exeTime,
+	}
+}
+
+func (c *SocketDataCounterWithHistory) IncreaseToCurrent(other *SocketDataCounter) {
+	c.Cur.Increase(other)
+}
+
+func (c *SocketDataCounterWithHistory) CalculateIncrease() *SocketDataCounter {
+	return &SocketDataCounter{
+		Bytes:   subtractionValue(c.Cur.Bytes, c.Pre.Bytes),
+		Count:   subtractionValue(c.Cur.Count, c.Pre.Count),
+		ExeTime: subtractionValue(c.Cur.ExeTime, c.Pre.ExeTime),
+	}
+}
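
The BPF side exposes absolute counters, so the Pre/Cur pair exists purely to turn them into per-interval deltas: UpdateToCurrent records the latest absolute values and CalculateIncrease reports how much they grew since the previous flush. A minimal sketch with made-up values:

package layer4

import "fmt"

// Illustrative only: absolute counters in, per-interval deltas out.
func ExampleSocketDataCounterWithHistory() {
	c := NewSocketDataCounterWithHistory()

	c.UpdateToCurrent(1024, 4, 200) // first flush: absolute values read from BPF
	fmt.Println(c.CalculateIncrease().Bytes)

	c.UpdateToCurrent(1536, 6, 260) // second flush: values are still absolute
	fmt.Println(c.CalculateIncrease().Bytes)
	// Output:
	// 1024
	// 512
}
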
+
+// SocketHistogramBucketsNs defines the histogram buckets: 0ms, 0.01ms, 0.05ms, 0.1ms, 0.5ms, 1ms, 1.2ms, 1.5ms, 1.7ms, 2ms,
+// 2.5ms, 3ms, 5ms, 7ms, 10ms, 13ms, 16ms, 20ms, 25ms, 30ms, 35ms, 40ms, 45ms, 50ms, 70ms, 100ms, 150ms,
+// 200ms, 300ms, 500ms, 1s, 2s, 3s, 5s
+// value unit: ns
+var SocketHistogramBucketsNs = []float64{0, 10000, 50000, 100000, 500000, 1000000, 1200000, 1500000, 1700000, 2000000,
+	2500000, 3000000, 5000000, 7000000, 10000000, 13000000, 16000000, 20000000, 25000000, 30000000, 35000000, 40000000,
+	45000000, 50000000, 70000000, 100000000, 150000000, 200000000, 300000000, 500000000, 1000000000, 2000000000,
+	3000000000, 5000000000}
+
+// SocketHistogramBucketsUs is the same as SocketHistogramBucketsNs, but the value unit is us
+var SocketHistogramBucketsUs = []float64{0, 10, 50, 100, 500, 1000, 1200, 1500, 1700, 2000,
+	2500, 3000, 5000, 7000, 10000, 13000, 16000, 20000, 25000, 30000, 35000, 40000,
+	45000, 50000, 70000, 100000, 150000, 200000, 300000, 500000, 1000000, 2000000,
+	3000000, 5000000}
+var SocketHistogramBucketsCount = len(SocketHistogramBucketsNs)
+
+type SocketDataHistogram struct {
+	Unit    HistogramDataUnit
+	Buckets map[uint64]uint32
+}
+
+func (h *SocketDataHistogram) Overwrite(other *SocketDataHistogram) {
+	for k, v := range other.Buckets {
+		h.Buckets[k] = v
+	}
+}
+
+func (h *SocketDataHistogram) Update(bucket uint64, value uint32) {
+	h.Buckets[bucket] = value
+}
+
+func (h *SocketDataHistogram) Increase(other *SocketDataHistogram) {
+	for k, v := range other.Buckets {
+		h.Buckets[k] += v
+	}
+}
+
+func (h *SocketDataHistogram) IncreaseByValue(val uint64) {
+	floatVal := float64(val)
+	for inx, curVal := range SocketHistogramBucketsNs {
+		if inx > 0 && curVal > floatVal {
+			h.Buckets[uint64(inx-1)]++
+			return
+		}
+	}
+	h.Buckets[uint64(len(SocketHistogramBucketsNs)-1)]++
+}
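
IncreaseByValue places a nanosecond sample into the bucket keyed by its lower bound in SocketHistogramBucketsNs; anything beyond the last edge lands in the final bucket. A small illustrative example:

package layer4

import "fmt"

// Illustrative only: a 60000ns (0.06ms) sample falls into the 0.05ms bucket,
// which is index 2 of SocketHistogramBucketsNs.
func ExampleSocketDataHistogram_IncreaseByValue() {
	h := NewSocketDataHistogram(HistogramDataUnitNS)
	h.IncreaseByValue(60000)
	fmt.Println(h.Buckets[2])
	// Output:
	// 1
}
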
+
+func (h *SocketDataHistogram) NotEmpty() bool {
+	for _, v := range h.Buckets {
+		if v > 0 {
+			return true
+		}
+	}
+	return false
+}
+
+func NewSocketDataHistogram(unit HistogramDataUnit) *SocketDataHistogram {
+	buckets := make(map[uint64]uint32, SocketHistogramBucketsCount)
+	for i := 0; i < SocketHistogramBucketsCount; i++ {
+		buckets[uint64(i)] = 0
+	}
+	return &SocketDataHistogram{
+		Unit:    unit,
+		Buckets: buckets,
+	}
+}
+
+type HistogramDataUnit int
+
+const (
+	HistogramDataUnitNS HistogramDataUnit = 1
+	HistogramDataUnitUS HistogramDataUnit = 2
+)
+
+type SocketDataHistogramWithHistory struct {
+	Pre *SocketDataHistogram
+	Cur *SocketDataHistogram
+}
+
+func NewSocketDataHistogramWithHistory(unit HistogramDataUnit) *SocketDataHistogramWithHistory {
+	return &SocketDataHistogramWithHistory{
+		Pre: NewSocketDataHistogram(unit),
+		Cur: NewSocketDataHistogram(unit),
+	}
+}
+
+func (h *SocketDataHistogramWithHistory) RefreshCurrent() {
+	// store the current values into the previous buckets
+	h.Pre.Overwrite(h.Cur)
+}
+
+func (h *SocketDataHistogramWithHistory) UpdateToCurrent(bucket uint64, val uint32) {
+	h.Cur.Update(bucket, val)
+}
+
+func (h *SocketDataHistogramWithHistory) IncreaseToCurrent(other *SocketDataHistogram) {
+	h.Cur.Increase(other)
+}
+
+func (h *SocketDataHistogramWithHistory) CalculateIncrease() *SocketDataHistogram {
+	histogram := NewSocketDataHistogram(h.Cur.Unit)
+	var increaseVal uint32
+	for curK, curV := range h.Cur.Buckets {
+		if increaseVal = curV - h.Pre.Buckets[curK]; increaseVal > 0 {
+			histogram.Buckets[curK] = increaseVal
+		}
+	}
+	return histogram
+}
+
+func subtractionValue(v1, v2 uint64) uint64 {
+	if v1 > v2 {
+		return v1 - v2
+	}
+	return 0
+}
diff --git a/pkg/profiling/task/network/bpf/bpf.go b/pkg/profiling/task/network/bpf/bpf.go
new file mode 100644
index 0000000..2181f09
--- /dev/null
+++ b/pkg/profiling/task/network/bpf/bpf.go
@@ -0,0 +1,56 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package bpf
+
+import (
+	"github.com/apache/skywalking-rover/pkg/tools/btf"
+
+	"github.com/hashicorp/go-multierror"
+)
+
+// $BPF_CLANG and $BPF_CFLAGS are set by the Makefile.
+// nolint
+//go:generate go run github.com/cilium/ebpf/cmd/bpf2go -target bpfel -cc $BPF_CLANG -cflags $BPF_CFLAGS bpf $REPO_ROOT/bpf/profiling/network/netmonitor.c -- -I$REPO_ROOT/bpf/include -D__TARGET_ARCH_x86
+
+type Loader struct {
+	*Linker
+	*bpfObjects
+}
+
+func NewLoader() (*Loader, error) {
+	objs := bpfObjects{}
+	if err := loadBpfObjects(&objs, btf.GetEBPFCollectionOptionsIfNeed()); err != nil {
+		return nil, err
+	}
+
+	return &Loader{
+		bpfObjects: &objs,
+		Linker:     NewLinker(),
+	}, nil
+}
+
+func (l *Loader) Close() error {
+	var err error
+	if e := l.bpfObjects.Close(); e != nil {
+		err = multierror.Append(err, e)
+	}
+	if e := l.Linker.Close(); e != nil {
+		err = multierror.Append(err, e)
+	}
+	return err
+}
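
The Loader glues the generated bpfObjects to the Linker so callers can load, attach, and tear everything down through a single handle. A minimal sketch of the intended lifecycle, mirroring the attach sequence in runner.go (loadAndAttach is a hypothetical helper):

package network

import "github.com/apache/skywalking-rover/pkg/profiling/task/network/bpf"

// Illustrative only: load the BPF objects, attach one syscall pair, and close
// the loader again if any attachment failed.
func loadAndAttach() (*bpf.Loader, error) {
	loader, err := bpf.NewLoader()
	if err != nil {
		return nil, err
	}
	loader.AddSysCall("connect", loader.SysConnect, loader.SysConnectRet)
	if err := loader.HasError(); err != nil {
		_ = loader.Close()
		return nil, err
	}
	return loader, nil
}
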
diff --git a/pkg/profiling/task/network/linker.go b/pkg/profiling/task/network/bpf/linker.go
similarity index 98%
rename from pkg/profiling/task/network/linker.go
rename to pkg/profiling/task/network/bpf/linker.go
index c8cdf1e..7664918 100644
--- a/pkg/profiling/task/network/linker.go
+++ b/pkg/profiling/task/network/bpf/linker.go
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package network
+package bpf
 
 import (
 	"bytes"
@@ -28,6 +28,7 @@ import (
 
 	"golang.org/x/arch/x86/x86asm"
 
+	"github.com/apache/skywalking-rover/pkg/logger"
 	"github.com/apache/skywalking-rover/pkg/tools"
 	"github.com/apache/skywalking-rover/pkg/tools/elf"
 
@@ -38,6 +39,8 @@ import (
 	"github.com/hashicorp/go-multierror"
 )
 
+var log = logger.GetLogger("profiling", "task", "network", "bpf")
+
 const defaultSymbolPrefix = "sys_"
 
 type LinkFunc func(symbol string, prog *ebpf.Program) (link.Link, error)
diff --git a/pkg/profiling/task/network/context.go b/pkg/profiling/task/network/context.go
deleted file mode 100644
index 3ebc2eb..0000000
--- a/pkg/profiling/task/network/context.go
+++ /dev/null
@@ -1,726 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package network
-
-import (
-	"context"
-	"encoding/json"
-	"errors"
-	"fmt"
-	"net"
-	"sync"
-	"unsafe"
-
-	cmap "github.com/orcaman/concurrent-map"
-
-	"github.com/sirupsen/logrus"
-
-	"github.com/hashicorp/go-multierror"
-
-	"github.com/cilium/ebpf"
-
-	"golang.org/x/sys/unix"
-
-	"github.com/apache/skywalking-rover/pkg/process/api"
-)
-
-type Context struct {
-	processes map[int32][]api.ProcessInterface
-
-	bpf    *bpfObjects // current bpf programs
-	linker *Linker
-
-	// standard syscall connections
-	activeConnections cmap.ConcurrentMap      // current activeConnections connections
-	closedConnections []*ConnectionContext    // closed connections'
-	flushClosedEvents chan *SocketCloseEvent  // connection have been closed, it is a queue to cache unknown active connections
-	sockParseQueue    chan *ConnectionContext // socket address parse queue
-
-	// socket retransmit/drop
-	socketExceptionStatics       map[SocketBasicKey]*SocketExceptionValue
-	socketExceptionOperationLock sync.Mutex
-}
-
-type SocketBasicKey struct {
-	Pid          uint32
-	Family       uint32
-	RemoteAddrV4 uint32
-	RemoteAddrV6 [16]uint8
-	RemotePort   uint32
-	LocalAddrV4  uint32
-	LocalAddrV6  [16]uint8
-	LocalPort    uint32
-}
-
-type SocketExceptionValue struct {
-	DropCount       int
-	RetransmitCount int
-}
-
-type ConnectionContext struct {
-	// basic metadata
-	ConnectionID     uint64
-	RandomID         uint64
-	LocalPid         uint32
-	SocketFD         uint32
-	LocalProcesses   []api.ProcessInterface
-	ConnectionClosed bool
-	Protocol         ConnectionProtocol
-	IsSSL            bool
-
-	// socket metadata
-	Role       ConnectionRole
-	LocalIP    string
-	LocalPort  uint16
-	RemoteIP   string
-	RemotePort uint16
-
-	// basic statics
-	// read/write
-	WriteCounter *SocketDataCounterWithHistory
-	ReadCounter  *SocketDataCounterWithHistory
-	// write RTT
-	WriteRTTCounter *SocketDataCounterWithHistory
-
-	// histograms
-	// write execute time and RTT
-	WriteRTTHistogram     *SocketDataHistogramWithHistory
-	WriteExeTimeHistogram *SocketDataHistogramWithHistory
-	// read execute time
-	ReadExeTimeHistogram *SocketDataHistogramWithHistory
-
-	// the connection connect or close execute time
-	ConnectExecuteTime uint64
-	CloseExecuteTime   uint64
-
-	// exception counters
-	RetransmitCounter *SocketDataCounter
-	DropCounter       *SocketDataCounter
-
-	// Flush the data content to the oap count
-	FlushDataCount int
-}
-
-func NewContext() *Context {
-	return &Context{
-		activeConnections:      cmap.New(),
-		closedConnections:      make([]*ConnectionContext, 0),
-		flushClosedEvents:      make(chan *SocketCloseEvent, 5000),
-		sockParseQueue:         make(chan *ConnectionContext, 5000),
-		processes:              make(map[int32][]api.ProcessInterface),
-		socketExceptionStatics: make(map[SocketBasicKey]*SocketExceptionValue),
-	}
-}
-
-func (c *Context) Init(bpf *bpfObjects, linker *Linker) {
-	c.bpf = bpf
-	c.linker = linker
-}
-
-func (c *Context) RegisterAllHandlers() {
-	// socket connect
-	c.linker.ReadEventAsync(c.bpf.SocketConnectionEventQueue, c.handleSocketConnectEvent, func() interface{} {
-		return &SocketConnectEvent{}
-	})
-	// socket close
-	c.linker.ReadEventAsync(c.bpf.SocketCloseEventQueue, c.handleSocketCloseEvent, func() interface{} {
-		return &SocketCloseEvent{}
-	})
-	// socket retransmit
-	c.linker.ReadEventAsync(c.bpf.SocketExceptionOperationEventQueue, c.handleSocketExceptionOperationEvent, func() interface{} {
-		return &SocketExceptionOperationEvent{}
-	})
-}
-
-func (c *Context) FlushAllConnection() ([]*ConnectionContext, error) {
-	// handling the unfinished close event
-	c.batchReProcessCachedCloseEvent()
-
-	// get all connection context and fill the metrics
-	allContexts := c.getAllConnectionWithContext()
-	c.fillConnectionMetrics(allContexts)
-
-	// all the exception operations to the context
-	exceptionContexts := c.cleanAndGetAllExceptionContexts()
-	// init all exception counters
-	for _, ctx := range allContexts {
-		ctx.DropCounter = NewSocketDataCounter()
-		ctx.RetransmitCounter = NewSocketDataCounter()
-	}
-	c.combineExceptionToConnections(allContexts, exceptionContexts)
-
-	return allContexts, nil
-}
-
-func (c *Context) StartSocketAddressParser(ctx context.Context) {
-	for i := 0; i < 2; i++ {
-		go c.handleSocketParseQueue(ctx)
-	}
-}
-
-func (c *Context) handleSocketParseQueue(ctx context.Context) {
-	for {
-		select {
-		case cc := <-c.sockParseQueue:
-			socket, err := ParseSocket(cc.LocalPid, cc.SocketFD)
-			if err != nil {
-				// if the remote port of connection is empty, then this connection not available basically
-				if cc.RemotePort == 0 {
-					log.Warnf("complete the socket error, pid: %d, fd: %d, error: %v", cc.LocalPid, cc.SocketFD, err)
-				}
-				continue
-			}
-			cc.LocalIP = socket.SrcIP
-			cc.LocalPort = socket.SrcPort
-			cc.RemoteIP = socket.DestIP
-			cc.RemotePort = socket.DestPort
-		case <-ctx.Done():
-			return
-		}
-	}
-}
-
-func (c *Context) combineExceptionToConnections(ccs []*ConnectionContext, exps map[SocketBasicKey]*SocketExceptionValue) {
-	for key, value := range exps {
-		var remotePort, localPort = uint16(key.RemotePort), uint16(key.LocalPort)
-		var remoteIP, localIP string
-
-		if key.Family == unix.AF_INET {
-			remoteIP = parseAddressV4(key.RemoteAddrV4)
-			localIP = parseAddressV4(key.LocalAddrV4)
-		} else if key.Family == unix.AF_INET6 {
-			remoteIP = parseAddressV6(key.RemoteAddrV6)
-			localIP = parseAddressV6(key.LocalAddrV6)
-		} else {
-			continue
-		}
-
-		var firstRemoteMatch *ConnectionContext
-		var foundAllAddrMatch bool
-		for _, cc := range ccs {
-			// only add to the first matches
-			if cc.RemoteIP == remoteIP && cc.RemotePort == remotePort {
-				firstRemoteMatch = cc
-				if cc.LocalIP == localIP && cc.LocalPort == localPort {
-					c.mergeExceptionToAppointConnection(value, cc)
-					foundAllAddrMatch = true
-					break
-				}
-			}
-		}
-
-		// if only remote address match, then just add to the first one
-		if !foundAllAddrMatch && firstRemoteMatch != nil {
-			c.mergeExceptionToAppointConnection(value, firstRemoteMatch)
-		}
-	}
-}
-
-func (c *Context) mergeExceptionToAppointConnection(expCtx *SocketExceptionValue, conCtx *ConnectionContext) {
-	conCtx.DropCounter.IncreaseByValue(0, uint64(expCtx.DropCount), 0)
-	conCtx.RetransmitCounter.IncreaseByValue(0, uint64(expCtx.RetransmitCount), 0)
-}
-
-func (c *Context) cleanAndGetAllExceptionContexts() map[SocketBasicKey]*SocketExceptionValue {
-	c.socketExceptionOperationLock.Lock()
-	defer c.socketExceptionOperationLock.Unlock()
-
-	result := c.socketExceptionStatics
-	c.socketExceptionStatics = make(map[SocketBasicKey]*SocketExceptionValue)
-	return result
-}
-
-func (c *Context) getAllConnectionWithContext() []*ConnectionContext {
-	result := make([]*ConnectionContext, 0)
-	result = append(result, c.closedConnections...)
-	for _, con := range c.activeConnections.Items() {
-		result = append(result, con.(*ConnectionContext))
-	}
-
-	c.closedConnections = make([]*ConnectionContext, 0)
-	return result
-}
-
-type ActiveConnectionInBPF struct {
-	RandomID     uint64
-	Pid          uint32
-	SocketFD     uint32
-	Role         ConnectionRole
-	SocketFamily uint32
-
-	RemoteAddrV4   uint32
-	RemoteAddrV6   [16]uint8
-	RemoteAddrPort uint32
-	LocalAddrV4    uint32
-	LocalAddrV6    [16]uint8
-	LocalAddrPort  uint32
-
-	WriteBytes   uint64
-	WriteCount   uint64
-	WriteExeTime uint64
-	ReadBytes    uint64
-	ReadCount    uint64
-	ReadExeTime  uint64
-
-	WriteRTTCount   uint64
-	WriteRTTExeTime uint64
-
-	// Protocol analyze context
-	Protocol ConnectionProtocol
-	IsSSL    uint32
-
-	// the connect event is already sent
-	ConnectEventIsSent uint32
-}
-
-type HistogramDataKey struct {
-	ConnectionID  uint64
-	RandomID      uint64
-	DataDirection SocketDataDirection
-	DataType      SocketDataStaticsType
-	Bucket        uint64
-}
-
-func (c *Context) fillConnectionMetrics(ccs []*ConnectionContext) {
-	// rebuild to the map for helping quick search correlate ConnectionContext
-	keyWithContext := make(map[string]*ConnectionContext)
-	var activeConnection ActiveConnectionInBPF
-	closedConns := make([]string, 0)
-	for _, cc := range ccs {
-		connectionKey := c.generateConnectionKey(cc.ConnectionID, cc.RandomID)
-		// refresh the histogram for prepare to update the buckets
-		cc.WriteRTTHistogram.RefreshCurrent()
-		cc.WriteExeTimeHistogram.RefreshCurrent()
-		cc.ReadExeTimeHistogram.RefreshCurrent()
-		keyWithContext[connectionKey] = cc
-
-		// if connection not closed, then load the basic stats from bpf map
-		if !cc.ConnectionClosed {
-			err := c.bpf.ActiveConnectionMap.Lookup(cc.ConnectionID, &activeConnection)
-
-			if err != nil {
-				if errors.Is(err, ebpf.ErrKeyNotExist) {
-					closedConns = append(closedConns, connectionKey)
-				} else {
-					log.Warnf("lookup the active connection error, connection id: %d, error: %v", cc.ConnectionID, err)
-				}
-				continue
-			}
-
-			if log.Enable(logrus.DebugLevel) {
-				marshal, _ := json.Marshal(activeConnection)
-				log.Debugf("found the active connection, conid: %d, data: %s", cc.ConnectionID, string(marshal))
-			}
-
-			if cc.Role == ConnectionRoleUnknown && activeConnection.Role != ConnectionRoleUnknown {
-				cc.Role = activeConnection.Role
-			}
-			if cc.Protocol == ConnectionProtocolUnknown && activeConnection.Protocol != ConnectionProtocolUnknown {
-				cc.Protocol = activeConnection.Protocol
-			}
-			if !cc.IsSSL && activeConnection.IsSSL == 1 {
-				cc.IsSSL = true
-			}
-
-			// update the role
-			cc.WriteCounter.UpdateToCurrent(activeConnection.WriteBytes, activeConnection.WriteCount, activeConnection.WriteExeTime)
-			cc.ReadCounter.UpdateToCurrent(activeConnection.ReadBytes, activeConnection.ReadCount, activeConnection.ReadExeTime)
-			cc.WriteRTTCounter.UpdateToCurrent(0, activeConnection.WriteRTTCount, activeConnection.WriteRTTExeTime)
-		}
-	}
-	if len(closedConns) > 0 {
-		c.deleteConnectionOnly(closedConns)
-	}
-
-	// fill the histogram metrics
-	c.fillHistograms(keyWithContext)
-}
-
-func (c *Context) fillHistograms(keyWithContext map[string]*ConnectionContext) {
-	var key HistogramDataKey
-	var count uint32
-	histogramIt := c.bpf.SocketConnectionStatsHistogram.Iterate()
-	// for-each the stats map
-	for histogramIt.Next(&key, &count) {
-		// if it's not relate to the ConnectionContext just ignore
-		cc := keyWithContext[c.generateConnectionKey(key.ConnectionID, key.RandomID)]
-		if cc == nil {
-			continue
-		}
-
-		// add the histogram data
-		var histogram *SocketDataHistogramWithHistory
-		if key.DataDirection == SocketDataDirectionEgress {
-			if key.DataType == SocketDataStaticsTypeExeTime {
-				histogram = cc.WriteExeTimeHistogram
-			} else if key.DataType == SocketDataStaticsTypeRTT {
-				histogram = cc.WriteRTTHistogram
-			}
-		} else if key.DataDirection == SocketDataDirectionIngress {
-			histogram = cc.ReadExeTimeHistogram
-		}
-		if histogram == nil {
-			log.Warnf("unknown the histogram data: %v", cc)
-			continue
-		}
-		histogram.UpdateToCurrent(key.Bucket, count)
-
-		// delete the stats if the connection already closed
-		if cc.ConnectionClosed {
-			if err := c.bpf.SocketConnectionStatsHistogram.Delete(key); err != nil {
-				log.Warnf("delete the connection stats failure: %v", err)
-			}
-		}
-	}
-}
-
-// SocketConnectEvent Socket have been connection/accept event
-type SocketConnectEvent struct {
-	ConID        uint64
-	RandomID     uint64
-	ExeTime      uint64
-	NeedComplete uint32
-	Pid          uint32
-	FD           uint32
-	FuncName     uint32
-
-	// socket information if exists
-	Role           ConnectionRole
-	SocketFamily   uint32
-	RemoteAddrV4   uint32
-	RemoteAddrV6   [16]uint8
-	RemoteAddrPort uint32
-	LocalAddrV4    uint32
-	LocalAddrV6    [16]uint8
-	LocalAddrPort  uint32
-}
-
-func (c *Context) handleSocketConnectEvent(data interface{}) {
-	event := data.(*SocketConnectEvent)
-	processes := c.processes[int32(event.Pid)]
-	if len(processes) == 0 {
-		log.Warnf("get process connect event, but this process is don't need to monitor, pid: %d", event.Pid)
-		return
-	}
-
-	// build active connection information
-	con := c.newConnectionContext(event.ConID, event.RandomID, event.Pid, event.FD, processes, false)
-	con.ConnectExecuteTime = event.ExeTime
-	con.Role = event.Role
-	if event.NeedComplete == 0 {
-		con.RemotePort = uint16(event.RemoteAddrPort)
-		con.LocalPort = uint16(event.LocalAddrPort)
-		if event.SocketFamily == unix.AF_INET {
-			con.LocalIP = parseAddressV4(event.LocalAddrV4)
-			con.RemoteIP = parseAddressV4(event.RemoteAddrV4)
-		} else {
-			con.LocalIP = parseAddressV6(event.LocalAddrV6)
-			con.RemoteIP = parseAddressV6(event.RemoteAddrV6)
-		}
-	} else {
-		// if the remote address exists then setting it
-		if event.RemoteAddrPort != 0 {
-			con.RemotePort = uint16(event.RemoteAddrPort)
-			if event.SocketFamily == unix.AF_INET {
-				con.RemoteIP = parseAddressV4(event.RemoteAddrV4)
-			} else {
-				con.RemoteIP = parseAddressV6(event.RemoteAddrV6)
-			}
-		}
-		c.sockParseQueue <- con
-	}
-
-	// add to the context
-	c.saveActiveConnection(con)
-
-	if log.Enable(logrus.DebugLevel) {
-		marshal, _ := json.Marshal(event)
-		log.Debugf("found connect event: role: %s, %s:%d:%d -> %s:%d, json: %s", con.Role.String(),
-			con.LocalIP, con.LocalPort, con.LocalPid, con.RemoteIP, con.RemotePort, string(marshal))
-	}
-}
-
-func (c *Context) saveActiveConnection(con *ConnectionContext) {
-	c.activeConnections.Set(c.generateConnectionKey(con.ConnectionID, con.RandomID), con)
-}
-
-type SocketCloseEvent struct {
-	ConID    uint64
-	RandomID uint64
-	ExeTime  uint64
-	Pid      uint32
-	SocketFD uint32
-	Role     ConnectionRole
-	Protocol ConnectionProtocol
-	IsSSL    uint32
-	Fix      uint32
-
-	SocketFamily   uint32
-	RemoteAddrV4   uint32
-	RemoteAddrV6   [16]uint8
-	RemoteAddrPort uint32
-	LocalAddrV4    uint32
-	LocalAddrV6    [16]uint8
-	LocalAddrPort  uint32
-	Fix1           uint32
-
-	WriteBytes   uint64
-	WriteCount   uint64
-	WriteExeTime uint64
-	ReadBytes    uint64
-	ReadCount    uint64
-	ReadExeTime  uint64
-
-	WriteRTTCount   uint64
-	WriteRTTExeTime uint64
-}
-
-// batch to re-process all cached closed event
-func (c *Context) batchReProcessCachedCloseEvent() {
-	for len(c.flushClosedEvents) > 0 {
-		event := <-c.flushClosedEvents
-		if !c.socketClosedEvent0(event) {
-			// if cannot the found the active connection, then just create a new closed connection context
-			processes := c.processes[int32(event.Pid)]
-			if len(processes) == 0 {
-				continue
-			}
-			cc := c.newConnectionContext(event.ConID, event.RandomID, event.Pid, event.SocketFD, processes, true)
-			if event.SocketFamily == unix.AF_INET {
-				cc.RemoteIP = parseAddressV4(event.RemoteAddrV4)
-				cc.LocalIP = parseAddressV4(event.LocalAddrV4)
-			} else if event.SocketFamily == unix.AF_INET6 {
-				cc.RemoteIP = parseAddressV6(event.RemoteAddrV6)
-				cc.LocalIP = parseAddressV6(event.LocalAddrV6)
-			} else {
-				continue
-			}
-
-			// append to the closed connection
-			c.closedConnections = append(c.closedConnections, c.combineClosedConnection(cc, event))
-		}
-	}
-}
-
-func (c *Context) newConnectionContext(conID, randomID uint64, pid, fd uint32, processes []api.ProcessInterface, conClosed bool) *ConnectionContext {
-	return &ConnectionContext{
-		// metadata
-		ConnectionID:     conID,
-		RandomID:         randomID,
-		LocalPid:         pid,
-		SocketFD:         fd,
-		LocalProcesses:   processes,
-		ConnectionClosed: conClosed,
-
-		// metrics
-		WriteCounter:          NewSocketDataCounterWithHistory(),
-		ReadCounter:           NewSocketDataCounterWithHistory(),
-		WriteRTTCounter:       NewSocketDataCounterWithHistory(),
-		WriteRTTHistogram:     NewSocketDataHistogramWithHistory(HistogramDataUnitUS),
-		WriteExeTimeHistogram: NewSocketDataHistogramWithHistory(HistogramDataUnitNS),
-		ReadExeTimeHistogram:  NewSocketDataHistogramWithHistory(HistogramDataUnitNS),
-	}
-}
-
-func (c *Context) handleSocketCloseEvent(data interface{}) {
-	event := data.(*SocketCloseEvent)
-
-	if log.Enable(logrus.DebugLevel) {
-		marshal, _ := json.Marshal(event)
-		log.Debugf("found close event: %s", string(marshal))
-	}
-
-	// try to handle the socket close event
-	if !c.socketClosedEvent0(event) {
-		// is not in active connection, maybe it's not have been added to activate first
-		// just add to the close queue, wait for the flush connection with interval
-		c.flushClosedEvents <- event
-		return
-	}
-}
-
-// SocketExceptionOperationEvent Socket have been retransmitted/drop the package event
-type SocketExceptionOperationEvent struct {
-	Pid            uint32
-	SocketFamily   uint32
-	RemoteAddrV4   uint32
-	RemoteAddrV6   [16]uint8
-	RemoteAddrPort uint32
-	Type           SocketExceptionOperationType
-}
-
-func (c *Context) handleSocketExceptionOperationEvent(data interface{}) {
-	event := data.(*SocketExceptionOperationEvent)
-	c.socketExceptionOperationLock.Lock()
-	defer c.socketExceptionOperationLock.Unlock()
-
-	key := SocketBasicKey{
-		Pid:          event.Pid,
-		Family:       event.SocketFamily,
-		RemoteAddrV4: event.RemoteAddrV4,
-		RemoteAddrV6: event.RemoteAddrV6,
-		RemotePort:   event.RemoteAddrPort,
-	}
-	exceptionValue := c.socketExceptionStatics[key]
-	if exceptionValue == nil {
-		exceptionValue = &SocketExceptionValue{}
-		c.socketExceptionStatics[key] = exceptionValue
-	}
-
-	switch event.Type {
-	case SocketExceptionOperationRetransmit:
-		exceptionValue.RetransmitCount++
-	case SocketExceptionOperationDrop:
-		exceptionValue.DropCount++
-	default:
-		log.Warnf("unknown socket exception operation type: %d", event.Type)
-	}
-
-	if log.Enable(logrus.DebugLevel) {
-		marshal, _ := json.Marshal(event)
-		log.Debugf("found socket exception operation event: %s", string(marshal))
-	}
-}
-
-func (c *Context) socketClosedEvent0(event *SocketCloseEvent) bool {
-	activeCon := c.foundAndDeleteConnection(event)
-	if activeCon == nil {
-		return false
-	}
-
-	// combine the connection data
-	c.closedConnections = append(c.closedConnections, c.combineClosedConnection(activeCon, event))
-	return true
-}
-
-func (c *Context) foundAndDeleteConnection(event *SocketCloseEvent) *ConnectionContext {
-	conKey := c.generateConnectionKey(event.ConID, event.RandomID)
-	val, exists := c.activeConnections.Pop(conKey)
-	if !exists {
-		return nil
-	}
-	return val.(*ConnectionContext)
-}
-
-func (c *Context) deleteConnectionOnly(ccs []string) {
-	for _, cc := range ccs {
-		c.activeConnections.Remove(cc)
-	}
-}
-
-func (c *Context) combineClosedConnection(active *ConnectionContext, closed *SocketCloseEvent) *ConnectionContext {
-	active.ConnectionClosed = true
-
-	if active.Role == ConnectionRoleUnknown && closed.Role != ConnectionRoleUnknown {
-		active.Role = closed.Role
-	}
-	if active.Protocol == ConnectionProtocolUnknown && closed.Protocol != ConnectionProtocolUnknown {
-		active.Protocol = closed.Protocol
-	}
-	if !active.IsSSL && closed.IsSSL == 1 {
-		active.IsSSL = true
-	}
-
-	active.WriteCounter.UpdateToCurrent(closed.WriteBytes, closed.WriteCount, closed.WriteExeTime)
-	active.ReadCounter.UpdateToCurrent(closed.ReadBytes, closed.ReadCount, closed.ReadExeTime)
-	active.WriteRTTCounter.UpdateToCurrent(0, closed.WriteRTTCount, closed.WriteRTTExeTime)
-	active.CloseExecuteTime = closed.ExeTime
-	return active
-}
-
-func (c *Context) generateConnectionKey(conID, randomID uint64) string {
-	return fmt.Sprintf("%d_%d", conID, randomID)
-}
-
-func (c *Context) AddProcesses(processes []api.ProcessInterface) error {
-	var err error
-	for _, p := range processes {
-		pid := p.Pid()
-		alreadyExists := false
-		if len(c.processes[pid]) > 0 {
-			for _, existsProcess := range c.processes[pid] {
-				if p.ID() == existsProcess.ID() {
-					alreadyExists = true
-					break
-				}
-			}
-		}
-
-		if alreadyExists {
-			continue
-		}
-
-		c.processes[pid] = append(c.processes[pid], p)
-
-		// add to the process let it could be monitored
-		if err1 := c.bpf.ProcessMonitorControl.Update(uint32(pid), uint32(1), ebpf.UpdateAny); err1 != nil {
-			err = multierror.Append(err, err1)
-		}
-
-		// add process ssl config
-		if err1 := addSSLProcess(int(pid), c.bpf, c.linker); err1 != nil {
-			err = multierror.Append(err, err1)
-		}
-
-		log.Debugf("add monitor process, pid: %d", pid)
-	}
-	return err
-}
-
-func (c *Context) DeleteProcesses(processes []api.ProcessInterface) (emptyProcesses bool, deleteError error) {
-	var err error
-	for _, p := range processes {
-		pid := p.Pid()
-		existsProcesses := make([]api.ProcessInterface, 0)
-		existsProcesses = append(existsProcesses, c.processes[pid]...)
-
-		// update process entities
-		newProcesses := make([]api.ProcessInterface, 0)
-
-		for _, existProcess := range existsProcesses {
-			if p.ID() != existProcess.ID() {
-				newProcesses = append(newProcesses, existProcess)
-			}
-		}
-
-		// no process need delete, then just ignore
-		if len(newProcesses) == len(existsProcesses) {
-			continue
-		}
-
-		// the process no need to monitor, then just ignore
-		if len(newProcesses) == 0 {
-			if err1 := c.bpf.ProcessMonitorControl.Delete(uint32(pid)); err1 != nil {
-				err = multierror.Append(err, err1)
-			}
-			log.Debugf("delete monitor process: %d", pid)
-			delete(c.processes, pid)
-			continue
-		}
-		c.processes[pid] = newProcesses
-	}
-	return len(c.processes) == 0, err
-}
-
-func parseAddressV4(val uint32) string {
-	return net.IP((*(*[net.IPv4len]byte)(unsafe.Pointer(&val)))[:]).String()
-}
-
-func parseAddressV6(val [16]uint8) string {
-	return net.IP((*(*[net.IPv6len]byte)(unsafe.Pointer(&val)))[:]).String()
-}
diff --git a/pkg/profiling/task/network/metrics.go b/pkg/profiling/task/network/metrics.go
deleted file mode 100644
index a52d5e3..0000000
--- a/pkg/profiling/task/network/metrics.go
+++ /dev/null
@@ -1,396 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package network
-
-import (
-	"fmt"
-	"time"
-
-	"github.com/apache/skywalking-rover/pkg/process/api"
-	"github.com/apache/skywalking-rover/pkg/tools"
-
-	v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
-)
-
-type SocketDataCounter struct {
-	Bytes   uint64
-	Count   uint64
-	ExeTime uint64
-}
-
-func NewSocketDataCounter() *SocketDataCounter {
-	return &SocketDataCounter{}
-}
-
-func (s *SocketDataCounter) Increase(d *SocketDataCounter) {
-	s.IncreaseByValue(d.Bytes, d.Count, d.ExeTime)
-}
-
-func (s *SocketDataCounter) IncreaseByValue(bytes, count, exeTime uint64) {
-	s.Bytes += bytes
-	s.Count += count
-	s.ExeTime += exeTime
-}
-
-func (s *SocketDataCounter) NotEmpty() bool {
-	return s.Count > 0
-}
-
-// SocketDataCounterWithHistory means the socket send/receive data metrics
-type SocketDataCounterWithHistory struct {
-	Pre *SocketDataCounter
-	Cur *SocketDataCounter
-}
-
-func NewSocketDataCounterWithHistory() *SocketDataCounterWithHistory {
-	return &SocketDataCounterWithHistory{
-		Pre: NewSocketDataCounter(),
-		Cur: NewSocketDataCounter(),
-	}
-}
-
-func (c *SocketDataCounterWithHistory) UpdateToCurrent(bytes, count, exeTime uint64) {
-	c.Pre = c.Cur
-	c.Cur = &SocketDataCounter{
-		Bytes:   bytes,
-		Count:   count,
-		ExeTime: exeTime,
-	}
-}
-
-func (c *SocketDataCounterWithHistory) CalculateIncrease() *SocketDataCounter {
-	return &SocketDataCounter{
-		Bytes:   subtractionValue(c.Cur.Bytes, c.Pre.Bytes),
-		Count:   subtractionValue(c.Cur.Count, c.Pre.Count),
-		ExeTime: subtractionValue(c.Cur.ExeTime, c.Pre.ExeTime),
-	}
-}
-
-// SocketHistogramBucketsNs means the histogram bucket: 0ms, 0.01ms, 0.05ms, 0.1ms, 0.5ms, 1ms, 1.2ms, 1.5ms, 1.7ms, 2ms,
-// 2.5ms, 3ms, 5ms, 7ms, 10ms, 13ms, 16ms, 20ms, 25ms, 30ms, 35ms, 40ms, 45ms, 50ms, 70ms, 100ms, 150ms,
-// 200ms, 300ms, 500ms, 1s, 2s, 3s, 5s
-// value unit: ns
-var SocketHistogramBucketsNs = []float64{0, 10000, 50000, 100000, 500000, 1000000, 1200000, 1500000, 1700000, 2000000,
-	2500000, 3000000, 5000000, 7000000, 10000000, 13000000, 16000000, 20000000, 25000000, 30000000, 35000000, 40000000,
-	45000000, 50000000, 70000000, 100000000, 150000000, 200000000, 300000000, 500000000, 1000000000, 2000000000,
-	3000000000, 5000000000}
-
-// SocketHistogramBucketsUs same with SocketHistogramBucketsNs, but the value unit: us
-var SocketHistogramBucketsUs = []float64{0, 10, 50, 100, 500, 1000, 1200, 1500, 1700, 2000,
-	2500, 3000, 5000, 7000, 10000, 13000, 16000, 20000, 25000, 30000, 35000, 40000,
-	45000, 50000, 70000, 100000, 150000, 200000, 300000, 500000, 1000000, 2000000,
-	3000000, 5000000}
-var SocketHistogramBucketsCount = len(SocketHistogramBucketsNs)
-
-type SocketDataHistogram struct {
-	Unit    HistogramDataUnit
-	Buckets map[uint64]uint32
-}
-
-func (h *SocketDataHistogram) Overwrite(other *SocketDataHistogram) {
-	for k, v := range other.Buckets {
-		h.Buckets[k] = v
-	}
-}
-
-func (h *SocketDataHistogram) Update(bucket uint64, value uint32) {
-	h.Buckets[bucket] = value
-}
-
-func (h *SocketDataHistogram) Increase(other *SocketDataHistogram) {
-	for k, v := range other.Buckets {
-		h.Buckets[k] += v
-	}
-}
-
-func (h *SocketDataHistogram) IncreaseByValue(val uint64) {
-	floatVal := float64(val)
-	for inx, curVal := range SocketHistogramBucketsNs {
-		if inx > 0 && curVal > floatVal {
-			h.Buckets[uint64(inx-1)]++
-			return
-		}
-	}
-	h.Buckets[uint64(len(SocketHistogramBucketsNs)-1)]++
-}
-
-func (h *SocketDataHistogram) NotEmpty() bool {
-	for _, v := range h.Buckets {
-		if v > 0 {
-			return true
-		}
-	}
-	return false
-}
-
-func NewSocketDataHistogram(unit HistogramDataUnit) *SocketDataHistogram {
-	buckets := make(map[uint64]uint32, SocketHistogramBucketsCount)
-	for i := 0; i < SocketHistogramBucketsCount; i++ {
-		buckets[uint64(i)] = 0
-	}
-	return &SocketDataHistogram{
-		Unit:    unit,
-		Buckets: buckets,
-	}
-}
-
-type HistogramDataUnit int
-
-const (
-	HistogramDataUnitNS HistogramDataUnit = 1
-	HistogramDataUnitUS HistogramDataUnit = 2
-)
-
-type SocketDataHistogramWithHistory struct {
-	Pre *SocketDataHistogram
-	Cur *SocketDataHistogram
-}
-
-func NewSocketDataHistogramWithHistory(unit HistogramDataUnit) *SocketDataHistogramWithHistory {
-	return &SocketDataHistogramWithHistory{
-		Pre: NewSocketDataHistogram(unit),
-		Cur: NewSocketDataHistogram(unit),
-	}
-}
-
-func (h *SocketDataHistogramWithHistory) RefreshCurrent() {
-	// storage the current value to the previous buckets
-	h.Pre.Overwrite(h.Cur)
-}
-
-func (h *SocketDataHistogramWithHistory) UpdateToCurrent(bucket uint64, val uint32) {
-	h.Cur.Update(bucket, val)
-}
-
-func (h *SocketDataHistogramWithHistory) CalculateIncrease() *SocketDataHistogram {
-	histogram := NewSocketDataHistogram(h.Cur.Unit)
-	var increaseVal uint32
-	for curK, curV := range h.Cur.Buckets {
-		if increaseVal = curV - h.Pre.Buckets[curK]; increaseVal > 0 {
-			histogram.Buckets[curK] = increaseVal
-		}
-	}
-	return histogram
-}
-
-func subtractionValue(v1, v2 uint64) uint64 {
-	if v1 > v2 {
-		return v1 - v2
-	}
-	return 0
-}
-
-type ProcessTraffic struct {
-	analyzer *TrafficAnalyzer
-
-	// local process information
-	LocalPid       uint32
-	LocalProcesses []api.ProcessInterface
-	LocalIP        string
-	LocalPort      uint16
-
-	// current connection role of local process
-	ConnectionRole ConnectionRole
-	// the protocol of the connection
-	Protocol ConnectionProtocol
-	// current connection is SSL
-	IsSSL bool
-
-	// remote process/address information
-	RemoteIP        string
-	RemotePort      uint16
-	RemotePid       uint32
-	RemoteProcesses []api.ProcessInterface
-
-	// statics
-	WriteCounter *SocketDataCounter
-	ReadCounter  *SocketDataCounter
-	// write RTT
-	WriteRTTCounter *SocketDataCounter
-
-	// connection operate
-	ConnectCounter *SocketDataCounter
-	CloseCounter   *SocketDataCounter
-
-	// exception operate
-	RetransmitCounter *SocketDataCounter
-	DropCounter       *SocketDataCounter
-
-	// histograms
-	// write execute time and RTT
-	WriteRTTHistogram     *SocketDataHistogram
-	WriteExeTimeHistogram *SocketDataHistogram
-	// read execute time
-	ReadExeTimeHistogram *SocketDataHistogram
-
-	// connection operate
-	ConnectExeTimeHistogram *SocketDataHistogram
-	CloseExeTimeHistogram   *SocketDataHistogram
-}
-
-func (r *ProcessTraffic) ContainsAnyTraffic() bool {
-	return r.WriteCounter.NotEmpty() || r.ReadCounter.NotEmpty() || r.WriteRTTCounter.NotEmpty() || r.ConnectCounter.NotEmpty() ||
-		r.CloseCounter.NotEmpty() || r.WriteRTTHistogram.NotEmpty() || r.WriteExeTimeHistogram.NotEmpty() || r.ReadExeTimeHistogram.NotEmpty() ||
-		r.ConnectExeTimeHistogram.NotEmpty() || r.CloseExeTimeHistogram.NotEmpty()
-}
-
-func (r *ProcessTraffic) GenerateMetrics(metricsPrefix string) []*v3.MeterDataCollection {
-	result := make([]*v3.MeterDataCollection, 0)
-	for _, p := range r.LocalProcesses {
-		collection := make([]*v3.MeterData, 0)
-		collection = r.appendCounterValues(collection, metricsPrefix, "write", p, r.WriteCounter)
-		collection = r.appendCounterValues(collection, metricsPrefix, "read", p, r.ReadCounter)
-		collection = r.appendCounterValues(collection, metricsPrefix, "write_rtt", p, r.WriteRTTCounter)
-		collection = r.appendCounterValues(collection, metricsPrefix, "connect", p, r.ConnectCounter)
-		collection = r.appendCounterValues(collection, metricsPrefix, "close", p, r.CloseCounter)
-		collection = r.appendCounterValues(collection, metricsPrefix, "retransmit", p, r.RetransmitCounter)
-		collection = r.appendCounterValues(collection, metricsPrefix, "drop", p, r.DropCounter)
-
-		collection = r.appendHistogramValue(collection, metricsPrefix, "write_rtt", p, r.WriteRTTHistogram)
-		collection = r.appendHistogramValue(collection, metricsPrefix, "write_exe_time", p, r.WriteExeTimeHistogram)
-		collection = r.appendHistogramValue(collection, metricsPrefix, "read_exe_time", p, r.ReadExeTimeHistogram)
-		collection = r.appendHistogramValue(collection, metricsPrefix, "connect_exe_time", p, r.ConnectExeTimeHistogram)
-		collection = r.appendHistogramValue(collection, metricsPrefix, "close_exe_time", p, r.CloseExeTimeHistogram)
-
-		if len(collection) == 0 {
-			continue
-		}
-
-		// add entity
-		collection[0].Service = p.Entity().ServiceName
-		collection[0].ServiceInstance = p.Entity().InstanceName
-		collection[0].Timestamp = time.Now().UnixMilli()
-		result = append(result, &v3.MeterDataCollection{
-			MeterData: collection,
-		})
-	}
-
-	return result
-}
-
-func (r *ProcessTraffic) appendCounterValues(metrics []*v3.MeterData, metricsPrefix, name string, local api.ProcessInterface,
-	counter *SocketDataCounter) []*v3.MeterData {
-	if !counter.NotEmpty() {
-		return metrics
-	}
-
-	count := float64(counter.Count)
-	metrics = append(metrics, r.buildSingleValue(metricsPrefix, name+"_counts_counter", local, count))
-	if counter.Bytes > 0 {
-		metrics = append(metrics, r.buildSingleValue(metricsPrefix, name+"_bytes_counter", local, float64(counter.Bytes)))
-	}
-	if counter.ExeTime > 0 {
-		metrics = append(metrics, r.buildSingleValue(metricsPrefix, name+"_exe_time_counter", local, float64(counter.ExeTime)/count))
-	}
-	return metrics
-}
-
-func (r *ProcessTraffic) appendHistogramValue(metrics []*v3.MeterData, metricsPrefix, name string,
-	local api.ProcessInterface, histogram *SocketDataHistogram) []*v3.MeterData {
-	if !histogram.NotEmpty() {
-		return metrics
-	}
-
-	role, labels := r.buildBasicMeterLabels(local)
-	values := make([]*v3.MeterBucketValue, 0)
-	for bucket, count := range histogram.Buckets {
-		var bucketInx = int(bucket)
-		if bucketInx >= SocketHistogramBucketsCount {
-			bucketInx = SocketHistogramBucketsCount - 1
-		}
-		var buckets []float64
-		if histogram.Unit == HistogramDataUnitUS {
-			buckets = SocketHistogramBucketsUs
-		} else {
-			buckets = SocketHistogramBucketsNs
-		}
-		values = append(values, &v3.MeterBucketValue{
-			Bucket: buckets[bucketInx],
-			Count:  int64(count),
-		})
-	}
-
-	return append(metrics, &v3.MeterData{
-		Metric: &v3.MeterData_Histogram{
-			Histogram: &v3.MeterHistogram{
-				Name:   fmt.Sprintf("%s%s_%s_histogram", metricsPrefix, role.String(), name),
-				Labels: labels,
-				Values: values,
-			},
-		},
-	})
-}
-
-func (r *ProcessTraffic) buildSingleValue(prefix, name string, local api.ProcessInterface, val float64) *v3.MeterData {
-	role, labels := r.buildBasicMeterLabels(local)
-
-	return &v3.MeterData{
-		Metric: &v3.MeterData_SingleValue{
-			SingleValue: &v3.MeterSingleValue{
-				Name:   fmt.Sprintf("%s%s_%s", prefix, role.String(), name),
-				Labels: labels,
-				Value:  val,
-			},
-		},
-	}
-}
-
-func (r *ProcessTraffic) buildBasicMeterLabels(local api.ProcessInterface) (ConnectionRole, []*v3.Label) {
-	curRole := r.ConnectionRole
-	// add the default role
-	if curRole == ConnectionRoleUnknown {
-		curRole = ConnectionRoleClient
-	}
-	labels := make([]*v3.Label, 0)
-
-	// two pair process/address info
-	labels = r.appendMeterValue(labels, fmt.Sprintf("%s_process_id", curRole.String()), local.ID())
-	labels = r.appendRemoteAddrssInfo(labels, curRole.Revert().String(), local)
-
-	labels = r.appendMeterValue(labels, "side", curRole.String())
-
-	// protocol and ssl
-	labels = r.appendMeterValue(labels, "protocol", r.Protocol.String())
-	labels = r.appendMeterValue(labels, "is_ssl", fmt.Sprintf("%t", r.IsSSL))
-	return curRole, labels
-}
-
-func (r *ProcessTraffic) appendRemoteAddrssInfo(labels []*v3.Label, prefix string, local api.ProcessInterface) []*v3.Label {
-	if len(r.RemoteProcesses) != 0 {
-		for _, p := range r.RemoteProcesses {
-			// only match with same service instance
-			if local.Entity().ServiceName == p.Entity().ServiceName &&
-				local.Entity().InstanceName == p.Entity().InstanceName {
-				return r.appendMeterValue(labels, prefix+"_process_id", p.ID())
-			}
-		}
-	}
-
-	if tools.IsLocalHostAddress(r.RemoteIP) || r.analyzer.IsLocalAddressInCache(r.RemoteIP) {
-		return r.appendMeterValue(labels, prefix+"_local", "true")
-	}
-
-	return r.appendMeterValue(labels, prefix+"_address", fmt.Sprintf("%s:%d", r.RemoteIP, r.RemotePort))
-}
-
-func (r *ProcessTraffic) appendMeterValue(labels []*v3.Label, name, value string) []*v3.Label {
-	return append(labels, &v3.Label{
-		Name:  name,
-		Value: value,
-	})
-}
diff --git a/pkg/profiling/task/network/runner.go b/pkg/profiling/task/network/runner.go
index c623bfd..b52aa3c 100644
--- a/pkg/profiling/task/network/runner.go
+++ b/pkg/profiling/task/network/runner.go
@@ -24,7 +24,7 @@ import (
 	"sync"
 	"time"
 
-	"github.com/sirupsen/logrus"
+	"github.com/cilium/ebpf"
 
 	"github.com/hashicorp/go-multierror"
 
@@ -35,16 +35,14 @@ import (
 	"github.com/apache/skywalking-rover/pkg/module"
 	"github.com/apache/skywalking-rover/pkg/process/api"
 	"github.com/apache/skywalking-rover/pkg/profiling/task/base"
-	"github.com/apache/skywalking-rover/pkg/tools/btf"
+	"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze"
+	analyzeBase "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
+	"github.com/apache/skywalking-rover/pkg/profiling/task/network/bpf"
 
 	v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
 )
 
-// $BPF_CLANG and $BPF_CFLAGS are set by the Makefile.
-// nolint
-//go:generate go run github.com/cilium/ebpf/cmd/bpf2go -target bpfel -cc $BPF_CLANG -cflags $BPF_CFLAGS bpf $REPO_ROOT/bpf/profiling/network/netmonitor.c -- -I$REPO_ROOT/bpf/include -D__TARGET_ARCH_x86
-
-var log = logger.GetLogger("profiling", "task", "network", "topology")
+var log = logger.GetLogger("profiling", "task", "network")
 
 type Runner struct {
 	initOnce       sync.Once
@@ -54,18 +52,19 @@ type Runner struct {
 	reportInterval time.Duration
 	meterPrefix    string
 
-	bpf        *bpfObjects
-	linker     *Linker
-	bpfContext *Context
+	bpf            *bpf.Loader
+	processes      map[int32][]api.ProcessInterface
+	analyzeContext *analyzeBase.AnalyzerContext
 
 	ctx    context.Context
 	cancel context.CancelFunc
 }
 
 func NewGlobalRunnerContext() *Runner {
+	processes := make(map[int32][]api.ProcessInterface)
 	return &Runner{
-		bpfContext: NewContext(),
-		linker:     NewLinker(),
+		processes:      processes,
+		analyzeContext: analyze.NewContext(processes),
 	}
 }
 
@@ -78,7 +77,38 @@ func (r *Runner) init(config *base.TaskConfig, moduleMgr *module.Manager) error
 }
 
 func (r *Runner) DeleteProcesses(processes []api.ProcessInterface) (bool, error) {
-	return r.bpfContext.DeleteProcesses(processes)
+	var err error
+	for _, p := range processes {
+		pid := p.Pid()
+		existsProcesses := make([]api.ProcessInterface, 0)
+		existsProcesses = append(existsProcesses, r.processes[pid]...)
+
+		// update process entities
+		newProcesses := make([]api.ProcessInterface, 0)
+
+		for _, existProcess := range existsProcesses {
+			if p.ID() != existProcess.ID() {
+				newProcesses = append(newProcesses, existProcess)
+			}
+		}
+
+		// no process needs to be deleted, so just ignore
+		if len(newProcesses) == len(existsProcesses) {
+			continue
+		}
+
+		// no process under this pid needs monitoring any more, so remove it
+		if len(newProcesses) == 0 {
+			if err1 := r.bpf.ProcessMonitorControl.Delete(uint32(pid)); err1 != nil {
+				err = multierror.Append(err, err1)
+			}
+			log.Debugf("delete monitor process: %d", pid)
+			delete(r.processes, pid)
+			continue
+		}
+		r.processes[pid] = newProcesses
+	}
+	return len(r.processes) == 0, err
 }
 
 func (r *Runner) Start(ctx context.Context, processes []api.ProcessInterface) error {
@@ -86,59 +116,58 @@ func (r *Runner) Start(ctx context.Context, processes []api.ProcessInterface) er
 	defer r.startLock.Unlock()
 	// if already start, then just adding the processes
 	if r.bpf != nil {
-		return r.bpfContext.AddProcesses(processes)
+		return r.addProcesses(processes)
 	}
 
 	r.ctx, r.cancel = context.WithCancel(ctx)
 	// load bpf program
-	objs := bpfObjects{}
-	if err := loadBpfObjects(&objs, btf.GetEBPFCollectionOptionsIfNeed()); err != nil {
+	bpfLoader, err := bpf.NewLoader()
+	if err != nil {
 		return err
 	}
-	r.bpf = &objs
-	r.bpfContext.Init(&objs, r.linker)
+	r.bpf = bpfLoader
 
-	if err := r.bpfContext.AddProcesses(processes); err != nil {
+	if err := r.addProcesses(processes); err != nil {
 		return err
 	}
 
 	// register all handlers
-	r.bpfContext.RegisterAllHandlers()
-	r.bpfContext.StartSocketAddressParser(r.ctx)
+	r.analyzeContext.RegisterAllHandlers(bpfLoader)
+	r.analyzeContext.StartSocketAddressParser(r.ctx)
 
 	// sock opts
-	r.linker.AddSysCall("close", objs.SysClose, objs.SysCloseRet)
-	r.linker.AddSysCall("connect", objs.SysConnect, objs.SysConnectRet)
-	r.linker.AddSysCall("accept", objs.SysAccept, objs.SysAcceptRet)
-	r.linker.AddSysCall("accept4", objs.SysAccept, objs.SysAcceptRet)
-	r.linker.AddLink(link.Kretprobe, objs.SockAllocRet, "sock_alloc")
-	r.linker.AddLink(link.Kprobe, objs.TcpConnect, "tcp_connect")
+	bpfLoader.AddSysCall("close", bpfLoader.SysClose, bpfLoader.SysCloseRet)
+	bpfLoader.AddSysCall("connect", bpfLoader.SysConnect, bpfLoader.SysConnectRet)
+	bpfLoader.AddSysCall("accept", bpfLoader.SysAccept, bpfLoader.SysAcceptRet)
+	bpfLoader.AddSysCall("accept4", bpfLoader.SysAccept, bpfLoader.SysAcceptRet)
+	bpfLoader.AddLink(link.Kretprobe, bpfLoader.SockAllocRet, "sock_alloc")
+	bpfLoader.AddLink(link.Kprobe, bpfLoader.TcpConnect, "tcp_connect")
 
 	// write/receive data
-	r.linker.AddSysCall("send", objs.SysSend, objs.SysSendRet)
-	r.linker.AddSysCall("sendto", objs.SysSendto, objs.SysSendtoRet)
-	r.linker.AddSysCall("sendmsg", objs.SysSendmsg, objs.SysSendmsgRet)
-	r.linker.AddSysCall("sendmmsg", objs.SysSendmmsg, objs.SysSendmmsgRet)
-	r.linker.AddSysCall("sendfile", objs.SysSendfile, objs.SysSendfileRet)
-	r.linker.AddSysCall("sendfile64", objs.SysSendfile, objs.SysSendfileRet)
-	r.linker.AddSysCall("write", objs.SysWrite, objs.SysWriteRet)
-	r.linker.AddSysCall("writev", objs.SysWritev, objs.SysWritevRet)
-	r.linker.AddSysCall("read", objs.SysRead, objs.SysReadRet)
-	r.linker.AddSysCall("readv", objs.SysReadv, objs.SysReadvRet)
-	r.linker.AddSysCall("recv", objs.SysRecv, objs.SysRecvRet)
-	r.linker.AddSysCall("recvfrom", objs.SysRecvfrom, objs.SysRecvfromRet)
-	r.linker.AddSysCall("recvmsg", objs.SysRecvmsg, objs.SysRecvmsgRet)
-	r.linker.AddSysCall("recvmmsg", objs.SysRecvmmsg, objs.SysRecvmmsgRet)
-	r.linker.AddLink(link.Kprobe, objs.TcpRcvEstablished, "tcp_rcv_established")
-	r.linker.AddLink(link.Kprobe, objs.SecuritySocketSendmsg, "security_socket_sendmsg")
-	r.linker.AddLink(link.Kprobe, objs.SecuritySocketRecvmsg, "security_socket_recvmsg")
+	bpfLoader.AddSysCall("send", bpfLoader.SysSend, bpfLoader.SysSendRet)
+	bpfLoader.AddSysCall("sendto", bpfLoader.SysSendto, bpfLoader.SysSendtoRet)
+	bpfLoader.AddSysCall("sendmsg", bpfLoader.SysSendmsg, bpfLoader.SysSendmsgRet)
+	bpfLoader.AddSysCall("sendmmsg", bpfLoader.SysSendmmsg, bpfLoader.SysSendmmsgRet)
+	bpfLoader.AddSysCall("sendfile", bpfLoader.SysSendfile, bpfLoader.SysSendfileRet)
+	bpfLoader.AddSysCall("sendfile64", bpfLoader.SysSendfile, bpfLoader.SysSendfileRet)
+	bpfLoader.AddSysCall("write", bpfLoader.SysWrite, bpfLoader.SysWriteRet)
+	bpfLoader.AddSysCall("writev", bpfLoader.SysWritev, bpfLoader.SysWritevRet)
+	bpfLoader.AddSysCall("read", bpfLoader.SysRead, bpfLoader.SysReadRet)
+	bpfLoader.AddSysCall("readv", bpfLoader.SysReadv, bpfLoader.SysReadvRet)
+	bpfLoader.AddSysCall("recv", bpfLoader.SysRecv, bpfLoader.SysRecvRet)
+	bpfLoader.AddSysCall("recvfrom", bpfLoader.SysRecvfrom, bpfLoader.SysRecvfromRet)
+	bpfLoader.AddSysCall("recvmsg", bpfLoader.SysRecvmsg, bpfLoader.SysRecvmsgRet)
+	bpfLoader.AddSysCall("recvmmsg", bpfLoader.SysRecvmmsg, bpfLoader.SysRecvmmsgRet)
+	bpfLoader.AddLink(link.Kprobe, bpfLoader.TcpRcvEstablished, "tcp_rcv_established")
+	bpfLoader.AddLink(link.Kprobe, bpfLoader.SecuritySocketSendmsg, "security_socket_sendmsg")
+	bpfLoader.AddLink(link.Kprobe, bpfLoader.SecuritySocketRecvmsg, "security_socket_recvmsg")
 
 	// retransmit/drop
-	r.linker.AddLink(link.Kprobe, objs.TcpRetransmit, "tcp_retransmit_skb")
-	r.linker.AddLink(link.Kprobe, objs.TcpDrop, "tcp_drop")
+	bpfLoader.AddLink(link.Kprobe, bpfLoader.TcpRetransmit, "tcp_retransmit_skb")
+	bpfLoader.AddLink(link.Kprobe, bpfLoader.TcpDrop, "tcp_drop")
 
-	if err := r.linker.HasError(); err != nil {
-		_ = r.linker.Close()
+	if err := bpfLoader.HasError(); err != nil {
+		_ = bpfLoader.Close()
 		return err
 	}
 
@@ -165,31 +194,12 @@ func (r *Runner) registerMetricsReport() {
 }
 
 func (r *Runner) flushMetrics() error {
-	// flush all connection from bpf
-	connections, err := r.bpfContext.FlushAllConnection()
+	// flush all metrics
+	metricsBuilder, err := r.analyzeContext.FlushAllMetrics(r.bpf, r.meterPrefix)
 	if err != nil {
 		return err
 	}
-	if len(connections) == 0 {
-		return nil
-	}
-
-	if log.Enable(logrus.DebugLevel) {
-		for _, con := range connections {
-			log.Debugf("found connection: %d, %s relation: %s:%d(%d) -> %s:%d, protocol: %s, is_ssl: %t, read: %d bytes/%d, write: %d bytes/%d",
-				con.ConnectionID, con.Role.String(),
-				con.LocalIP, con.LocalPort, con.LocalPid, con.RemoteIP, con.RemotePort,
-				con.Protocol.String(), con.IsSSL, con.WriteCounter.Cur.Bytes, con.WriteCounter.Cur.Count,
-				con.ReadCounter.Cur.Bytes, con.ReadCounter.Cur.Count)
-		}
-	}
-	// combine all connection
-	analyzer := NewTrafficAnalyzer(r.bpfContext.processes)
-	traffics := analyzer.CombineConnectionToTraffics(connections)
-	if len(traffics) == 0 {
-		return nil
-	}
-	r.logTheMetricsConnections(traffics)
+	metrics := metricsBuilder.Build()
 
 	// send metrics
 	batch, err := r.meterClient.CollectBatch(r.ctx)
@@ -202,13 +212,10 @@ func (r *Runner) flushMetrics() error {
 		}
 	}()
 	count := 0
-	for _, traffic := range traffics {
-		collections := traffic.GenerateMetrics(r.meterPrefix)
-		for _, col := range collections {
-			count += len(col.MeterData)
-			if err := batch.Send(col); err != nil {
-				return err
-			}
+	for _, m := range metrics {
+		count += len(m.MeterData)
+		if err := batch.Send(m); err != nil {
+			return err
 		}
 	}
 	if count > 0 {
@@ -217,32 +224,6 @@ func (r *Runner) flushMetrics() error {
 	return nil
 }
 
-func (r *Runner) logTheMetricsConnections(traffices []*ProcessTraffic) {
-	if !log.Enable(logrus.DebugLevel) {
-		return
-	}
-	for _, traffic := range traffices {
-		localInfo := fmt.Sprintf("%s:%d(%d)", traffic.LocalIP, traffic.LocalPort, traffic.LocalPid)
-		if len(traffic.LocalProcesses) > 0 {
-			p := traffic.LocalProcesses[0]
-			localInfo = fmt.Sprintf("(%s)%s:%s:%s(%s:%d)(%d)", p.Entity().Layer, p.Entity().ServiceName,
-				p.Entity().InstanceName, p.Entity().ProcessName, traffic.LocalIP, traffic.LocalPort, traffic.LocalPid)
-		}
-
-		remoteInfo := fmt.Sprintf("%s:%d(%d)", traffic.RemoteIP, traffic.RemotePort, traffic.RemotePid)
-		if len(traffic.RemoteProcesses) > 0 {
-			p := traffic.RemoteProcesses[0]
-			remoteInfo = fmt.Sprintf("(%s)%s:%s:%s(%s:%d)(%d)",
-				p.Entity().Layer, p.Entity().ServiceName, p.Entity().InstanceName, p.Entity().ProcessName,
-				traffic.RemoteIP, traffic.RemotePort, traffic.RemotePid)
-		}
-		side := traffic.ConnectionRole.String()
-		log.Debugf("connection analyze result: %s : %s -> %s, protocol: %s, is SSL: %t, write: %d bytes/%d, read: %d bytes/%d",
-			side, localInfo, remoteInfo, traffic.Protocol.String(), traffic.IsSSL, traffic.WriteCounter.Bytes, traffic.WriteCounter.Count,
-			traffic.ReadCounter.Bytes, traffic.ReadCounter.Count)
-	}
-}
-
 func (r *Runner) Stop() error {
 	// if starting, then need to wait start finished
 	r.startLock.Lock()
@@ -252,7 +233,6 @@ func (r *Runner) Stop() error {
 	}
 	var result error
 	r.stopOnce.Do(func() {
-		result = r.closeWhenExists(result, r.linker)
 		result = r.closeWhenExists(result, r.bpf)
 	})
 	return result
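
Stop acquires startLock before running stopOnce, so a Stop that races with a still-running Start waits for startup to finish, and the BPF resources are released at most once. A minimal sketch of that pattern, with illustrative names:

    // Sketch of the "wait for start, then close exactly once" shutdown pattern.
    package sketch

    import "sync"

    type runner struct {
    	startLock sync.Mutex
    	stopOnce  sync.Once
    	cleanup   func() error
    }

    func (r *runner) Stop() error {
    	// Block until an in-flight Start finishes before tearing anything down.
    	r.startLock.Lock()
    	defer r.startLock.Unlock()

    	var result error
    	r.stopOnce.Do(func() {
    		// Release probes and BPF objects at most once, even if Stop is
    		// called from several goroutines.
    		if r.cleanup != nil {
    			result = r.cleanup()
    		}
    	})
    	return result
    }
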
@@ -284,3 +264,38 @@ func (r *Runner) init0(config *base.TaskConfig, moduleMgr *module.Manager) error
 	r.meterPrefix = config.Network.MeterPrefix + "_"
 	return nil
 }
+
+func (r *Runner) addProcesses(processes []api.ProcessInterface) error {
+	var err error
+	for _, p := range processes {
+		pid := p.Pid()
+		alreadyExists := false
+		if len(r.processes[pid]) > 0 {
+			for _, existsProcess := range r.processes[pid] {
+				if p.ID() == existsProcess.ID() {
+					alreadyExists = true
+					break
+				}
+			}
+		}
+
+		if alreadyExists {
+			continue
+		}
+
+		r.processes[pid] = append(r.processes[pid], p)
+
+	// add the process so that it can be monitored
+		if err1 := r.bpf.ProcessMonitorControl.Update(uint32(pid), uint32(1), ebpf.UpdateAny); err1 != nil {
+			err = multierror.Append(err, err1)
+		}
+
+		// add process ssl config
+		if err1 := addSSLProcess(int(pid), r.bpf); err1 != nil {
+			err = multierror.Append(err, err1)
+		}
+
+		log.Debugf("add monitor process, pid: %d", pid)
+	}
+	return err
+}
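
addProcesses deliberately keeps iterating when a single pid fails, folding the per-process errors together with go-multierror while flipping the pid on in the ProcessMonitorControl map. A small standalone sketch of the same accumulate-and-continue pattern (the map handle, the value 1 meaning "monitored", and the function name are illustrative):

    // Sketch: mark a set of pids as monitored in a BPF hash map, collecting
    // per-pid failures instead of aborting on the first one.
    package sketch

    import (
    	"github.com/cilium/ebpf"
    	"github.com/hashicorp/go-multierror"
    )

    func enableMonitoring(monitorControl *ebpf.Map, pids []uint32) error {
    	var err error
    	for _, pid := range pids {
    		if e := monitorControl.Update(pid, uint32(1), ebpf.UpdateAny); e != nil {
    			// Keep going: one broken pid should not stop the others.
    			err = multierror.Append(err, e)
    		}
    	}
    	return err
    }
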
diff --git a/pkg/profiling/task/network/ssl.go b/pkg/profiling/task/network/ssl.go
index 9adb7c8..c586c3a 100644
--- a/pkg/profiling/task/network/ssl.go
+++ b/pkg/profiling/task/network/ssl.go
@@ -26,6 +26,8 @@ import (
 	"strconv"
 	"strings"
 
+	"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
+	"github.com/apache/skywalking-rover/pkg/profiling/task/network/bpf"
 	"github.com/apache/skywalking-rover/pkg/tools"
 	"github.com/apache/skywalking-rover/pkg/tools/elf"
 	"github.com/apache/skywalking-rover/pkg/tools/host"
@@ -52,36 +54,36 @@ type OpenSSLFdSymAddrConfigInBPF struct {
 	FDOffset       uint32
 }
 
-func addSSLProcess(pid int, bpf *bpfObjects, linker *Linker) error {
+func addSSLProcess(pid int, loader *bpf.Loader) error {
 	modules, err := tools.ProcessModules(int32(pid))
 	if err != nil {
 		return fmt.Errorf("read process modules error: %d, error: %v", pid, err)
 	}
 
 	// openssl process
-	if err1 := processOpenSSLProcess(pid, bpf, linker, modules); err1 != nil {
+	if err1 := processOpenSSLProcess(pid, loader, modules); err1 != nil {
 		return err1
 	}
 
 	// envoy with boring ssl
-	if err1 := processEnvoyProcess(pid, bpf, linker, modules); err1 != nil {
+	if err1 := processEnvoyProcess(pid, loader, modules); err1 != nil {
 		return err1
 	}
 
 	// GoTLS
-	if err1 := processGoProcess(pid, bpf, linker, modules); err1 != nil {
+	if err1 := processGoProcess(pid, loader, modules); err1 != nil {
 		return err1
 	}
 
 	// Nodejs
-	if err1 := processNodeProcess(pid, bpf, linker, modules); err1 != nil {
+	if err1 := processNodeProcess(pid, loader, modules); err1 != nil {
 		return err1
 	}
 
 	return nil
 }
 
-func processOpenSSLProcess(pid int, bpf *bpfObjects, linker *Linker, modules []*profiling.Module) error {
+func processOpenSSLProcess(pid int, loader *bpf.Loader, modules []*profiling.Module) error {
 	var libcryptoName, libsslName = "libcrypto.so", "libssl.so"
 	var libcryptoPath, libsslPath string
 	processModules, err := findProcessModules(modules, libcryptoName, libsslName)
@@ -107,22 +109,22 @@ func processOpenSSLProcess(pid int, bpf *bpfObjects, linker *Linker, modules []*
 	if err != nil {
 		return err
 	}
-	if err := bpf.OpensslFdSymaddrFinder.Put(uint32(pid), conf); err != nil {
+	if err := loader.OpensslFdSymaddrFinder.Put(uint32(pid), conf); err != nil {
 		return err
 	}
 
 	// attach the linker
-	return processOpenSSLModule(bpf, processModules[libsslName], linker)
+	return processOpenSSLModule(loader, processModules[libsslName])
 }
 
-func processOpenSSLModule(bpf *bpfObjects, libSSLModule *profiling.Module, linker *Linker) error {
-	libSSLLinker := linker.OpenUProbeExeFile(libSSLModule.Path)
-	libSSLLinker.AddLink("SSL_write", bpf.OpensslWrite, bpf.OpensslWriteRet)
-	libSSLLinker.AddLink("SSL_read", bpf.OpensslRead, bpf.OpensslReadRet)
-	return linker.HasError()
+func processOpenSSLModule(loader *bpf.Loader, libSSLModule *profiling.Module) error {
+	libSSLLinker := loader.OpenUProbeExeFile(libSSLModule.Path)
+	libSSLLinker.AddLink("SSL_write", loader.OpensslWrite, loader.OpensslWriteRet)
+	libSSLLinker.AddLink("SSL_read", loader.OpensslRead, loader.OpensslReadRet)
+	return loader.HasError()
 }
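
processOpenSSLModule pairs an entry probe and a return probe on each exported SSL symbol through the Loader's uprobe helper. The underlying attachment with cilium/ebpf looks roughly like the sketch below; the function name and the returned link slice are assumptions, not the Loader's actual code:

    // Illustrative sketch: attach uprobe/uretprobe pairs to SSL_write and
    // SSL_read in a resolved libssl.so.
    package sketch

    import (
    	"github.com/cilium/ebpf"
    	"github.com/cilium/ebpf/link"
    )

    func attachOpenSSL(libsslPath string, write, writeRet, read, readRet *ebpf.Program) ([]link.Link, error) {
    	ex, err := link.OpenExecutable(libsslPath)
    	if err != nil {
    		return nil, err
    	}
    	pairs := []struct {
    		symbol     string
    		enter, ret *ebpf.Program
    	}{
    		{"SSL_write", write, writeRet},
    		{"SSL_read", read, readRet},
    	}
    	var links []link.Link
    	for _, p := range pairs {
    		enter, err := ex.Uprobe(p.symbol, p.enter, nil)
    		if err != nil {
    			return nil, err
    		}
    		links = append(links, enter)
    		ret, err := ex.Uretprobe(p.symbol, p.ret, nil)
    		if err != nil {
    			return nil, err
    		}
    		links = append(links, ret)
    	}
    	// The caller keeps the links open for the lifetime of the profiling task.
    	return links, nil
    }
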
 
-func processEnvoyProcess(_ int, bpf *bpfObjects, linker *Linker, modules []*profiling.Module) error {
+func processEnvoyProcess(_ int, loader *bpf.Loader, modules []*profiling.Module) error {
 	moduleName := "/envoy"
 	processModules, err := findProcessModules(modules, moduleName)
 	if err != nil {
@@ -149,14 +151,14 @@ func processEnvoyProcess(_ int, bpf *bpfObjects, linker *Linker, modules []*prof
 	log.Debugf("found current module is envoy, so attach to the SSL read and write")
 
 	// attach the linker
-	libSSLLinker := linker.OpenUProbeExeFile(envoyModule.Path)
-	libSSLLinker.AddLink("SSL_write", bpf.OpensslWrite, bpf.OpensslWriteRet)
-	libSSLLinker.AddLink("SSL_read", bpf.OpensslRead, bpf.OpensslReadRet)
-	return linker.HasError()
+	libSSLLinker := loader.OpenUProbeExeFile(envoyModule.Path)
+	libSSLLinker.AddLink("SSL_write", loader.OpensslWrite, loader.OpensslWriteRet)
+	libSSLLinker.AddLink("SSL_read", loader.OpensslRead, loader.OpensslReadRet)
+	return loader.HasError()
 }
 
 type SymbolLocation struct {
-	Type   GoTLSArgsLocationType
+	Type   base.GoTLSArgsLocationType
 	Offset uint32
 }
 
@@ -189,7 +191,7 @@ type GoStringInC struct {
 	Size uint64
 }
 
-func processGoProcess(pid int, bpf *bpfObjects, linker *Linker, modules []*profiling.Module) error {
+func processGoProcess(pid int, loader *bpf.Loader, modules []*profiling.Module) error {
 	// check current process is go program
 	buildVersionSymbol := searchSymbol(modules, func(a, b string) bool {
 		return a == b
@@ -220,20 +222,20 @@ func processGoProcess(pid int, bpf *bpfObjects, linker *Linker, modules []*profi
 	}
 
 	// setting the locations
-	if err := bpf.GoTlsArgsSymaddrMap.Put(uint32(pid), symbolConfig); err != nil {
+	if err := loader.GoTlsArgsSymaddrMap.Put(uint32(pid), symbolConfig); err != nil {
 		return fmt.Errorf("setting the Go TLS argument location failure, pid: %d, error: %v", pid, err)
 	}
 
 	// uprobes
-	exeFile := linker.OpenUProbeExeFile(pidExeFile)
-	exeFile.AddLinkWithType("runtime.casgstatus", true, bpf.GoCasgstatus)
-	exeFile.AddGoLink(goTLSWriteSymbol, bpf.GoTlsWrite, bpf.GoTlsWriteRet, elfFile)
-	exeFile.AddGoLink(goTLSReadSymbol, bpf.GoTlsRead, bpf.GoTlsReadRet, elfFile)
+	exeFile := loader.OpenUProbeExeFile(pidExeFile)
+	exeFile.AddLinkWithType("runtime.casgstatus", true, loader.GoCasgstatus)
+	exeFile.AddGoLink(goTLSWriteSymbol, loader.GoTlsWrite, loader.GoTlsWriteRet, elfFile)
+	exeFile.AddGoLink(goTLSReadSymbol, loader.GoTlsRead, loader.GoTlsReadRet, elfFile)
 
-	return linker.HasError()
+	return loader.HasError()
 }
 
-func processNodeProcess(pid int, bpf *bpfObjects, linker *Linker, modules []*profiling.Module) error {
+func processNodeProcess(pid int, loader *bpf.Loader, modules []*profiling.Module) error {
 	moduleName1, moduleName2, libsslName := "/nodejs", "/node", "libssl.so"
 	processModules, err := findProcessModules(modules, moduleName1, moduleName2, libsslName)
 	if err != nil {
@@ -271,16 +273,16 @@ func processNodeProcess(pid int, bpf *bpfObjects, linker *Linker, modules []*pro
 		return err
 	}
 	// setting the locations
-	if err := bpf.NodeTlsSymaddrMap.Put(uint32(pid), config); err != nil {
+	if err := loader.NodeTlsSymaddrMap.Put(uint32(pid), config); err != nil {
 		return fmt.Errorf("setting the node TLS location failure, pid: %d, error: %v", pid, err)
 	}
 	// register node tls
-	if err := registerNodeTLSProbes(v, bpf, linker, nodeModule, libsslModule); err != nil {
+	if err := registerNodeTLSProbes(v, loader, nodeModule, libsslModule); err != nil {
 		return fmt.Errorf("register node TLS probes failure, pid: %d, error: %v", pid, err)
 	}
 	// attach the OpenSSL Probe if needs
 	if needsReAttachSSL {
-		return processOpenSSLModule(bpf, libsslModule, linker)
+		return processOpenSSLModule(loader, libsslModule)
 	}
 	return nil
 }
@@ -301,9 +303,9 @@ var nodeTLSAddrWithVersions = []struct {
 
 var nodeTLSProbeWithVersions = []struct {
 	v *version.Version
-	f func(uprobe *UProbeExeFile, bpf *bpfObjects, nodeModule *profiling.Module)
+	f func(uprobe *bpf.UProbeExeFile, bpf *bpf.Loader, nodeModule *profiling.Module)
 }{
-	{version.Build(10, 19, 0), func(uprobe *UProbeExeFile, bpf *bpfObjects, nodeModule *profiling.Module) {
+	{version.Build(10, 19, 0), func(uprobe *bpf.UProbeExeFile, bpf *bpf.Loader, nodeModule *profiling.Module) {
 		uprobe.AddLinkWithSymbols(searchSymbolNames([]*profiling.Module{nodeModule}, strings.HasPrefix, "_ZN4node7TLSWrapC2E"),
 			bpf.NodeTlsWrap, bpf.NodeTlsWrapRet)
 		uprobe.AddLinkWithSymbols(searchSymbolNames([]*profiling.Module{nodeModule}, strings.HasPrefix, "_ZN4node7TLSWrap7ClearInE"),
@@ -311,7 +313,7 @@ var nodeTLSProbeWithVersions = []struct {
 		uprobe.AddLinkWithSymbols(searchSymbolNames([]*profiling.Module{nodeModule}, strings.HasPrefix, "_ZN4node7TLSWrap8ClearOutE"),
 			bpf.NodeTlsWrap, bpf.NodeTlsWrapRet)
 	}},
-	{version.Build(15, 0, 0), func(uprobe *UProbeExeFile, bpf *bpfObjects, nodeModule *profiling.Module) {
+	{version.Build(15, 0, 0), func(uprobe *bpf.UProbeExeFile, bpf *bpf.Loader, nodeModule *profiling.Module) {
 		uprobe.AddLinkWithSymbols(searchSymbolNames([]*profiling.Module{nodeModule}, strings.HasPrefix, "_ZN4node6crypto7TLSWrapC2E"),
 			bpf.NodeTlsWrap, bpf.NodeTlsWrapRet)
 		uprobe.AddLinkWithSymbols(searchSymbolNames([]*profiling.Module{nodeModule}, strings.HasPrefix, "_ZN4node6crypto7TLSWrap7ClearInE"),
@@ -344,8 +346,8 @@ func findNodeTLSAddrConfig(v *version.Version) (*NodeTLSAddrInBPF, error) {
 	return nil, fmt.Errorf("could not support version: %s", v)
 }
 
-func registerNodeTLSProbes(v *version.Version, bpf *bpfObjects, linker *Linker, nodeModule, libSSLModule *profiling.Module) error {
-	var probeFunc func(uprobe *UProbeExeFile, bpf *bpfObjects, nodeModule *profiling.Module)
+func registerNodeTLSProbes(v *version.Version, loader *bpf.Loader, nodeModule, libSSLModule *profiling.Module) error {
+	var probeFunc func(uprobe *bpf.UProbeExeFile, bpf *bpf.Loader, nodeModule *profiling.Module)
 	for _, c := range nodeTLSProbeWithVersions {
 		if v.GreaterOrEquals(c.v) {
 			probeFunc = c.f
@@ -354,13 +356,13 @@ func registerNodeTLSProbes(v *version.Version, bpf *bpfObjects, linker *Linker,
 	if probeFunc == nil {
 		return fmt.Errorf("the version is not support: %v", v)
 	}
-	file := linker.OpenUProbeExeFile(nodeModule.Path)
-	probeFunc(file, bpf, nodeModule)
+	file := loader.OpenUProbeExeFile(nodeModule.Path)
+	probeFunc(file, loader, nodeModule)
 
 	// find the SSL_new, and register
-	file = linker.OpenUProbeExeFile(libSSLModule.Path)
-	file.AddLinkWithType("SSL_new", false, bpf.NodeTlsRetSsl)
-	return linker.HasError()
+	file = loader.OpenUProbeExeFile(libSSLModule.Path)
+	file.AddLinkWithType("SSL_new", false, loader.NodeTlsRetSsl)
+	return loader.HasError()
 }
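
nodeTLSProbeWithVersions is ordered from the oldest to the newest supported Node.js release, and the selection loop in registerNodeTLSProbes overwrites probeFunc for every entry the detected version satisfies, so the newest matching entry wins. A hedged sketch of that selection rule with plain integers instead of the rover's version type (all names here are illustrative):

    // Sketch: pick the handler of the newest table entry that the detected
    // version still satisfies. The table must stay sorted ascending by
    // minimum version, mirroring nodeTLSProbeWithVersions.
    package sketch

    import "fmt"

    type probeEntry struct {
    	minMajor int
    	register func()
    }

    func selectProbe(table []probeEntry, major int) (func(), error) {
    	var chosen func()
    	for _, e := range table {
    		if major >= e.minMajor {
    			chosen = e.register // a newer matching entry overwrites an older one
    		}
    	}
    	if chosen == nil {
    		return nil, fmt.Errorf("unsupported version: %d", major)
    	}
    	return chosen, nil
    }
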
 
 func getNodeVersion(p string) (*version.Version, error) {
@@ -493,10 +495,10 @@ func assignGoTLSArgsLocation(err error, function *elf.FunctionInfo, argName stri
 		return fmt.Errorf("the args is not found, function: %s, args name: %s", function.Name(), argName)
 	}
 	if args.Location.Type == elf.ArgLocationTypeStack {
-		dest.Type = GoTLSArgsLocationTypeStack
+		dest.Type = base.GoTLSArgsLocationTypeStack
 		dest.Offset = uint32(args.Location.Offset) + kSPOffset
 	} else if args.Location.Type == elf.ArgLocationTypeRegister {
-		dest.Type = GoTLSArgsLocationTypeRegister
+		dest.Type = base.GoTLSArgsLocationTypeRegister
 		dest.Offset = uint32(args.Location.Offset)
 	} else {
 		return fmt.Errorf("the location type is not support, function: %s, args name: %s, type: %d",