You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@skywalking.apache.org by wu...@apache.org on 2022/10/13 08:25:44 UTC
[skywalking-rover] branch main updated: Refactor the network profiling analysis (#55)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git
The following commit(s) were added to refs/heads/main by this push:
new f1c8d49 Refactor the network profiling analysis (#55)
f1c8d49 is described below
commit f1c8d493486dff54043fe7a6cd4353593f2ba6bb
Author: mrproliu <74...@qq.com>
AuthorDate: Thu Oct 13 16:25:39 2022 +0800
Refactor the network profiling analysis (#55)
---
pkg/profiling/task/network/analyze/analyze.go | 33 +
.../task/network/analyze/base/connection.go | 93 +++
pkg/profiling/task/network/analyze/base/context.go | 402 ++++++++++++
.../task/network/{ => analyze/base}/enums.go | 2 +-
pkg/profiling/task/network/analyze/base/events.go | 70 ++
.../task/network/analyze/base/listener.go | 51 ++
pkg/profiling/task/network/analyze/base/metrics.go | 98 +++
.../task/network/{ => analyze/base}/tcpresolver.go | 2 +-
.../{analyzer.go => analyze/base/traffic.go} | 106 +--
.../task/network/analyze/layer4/events.go | 88 +++
.../task/network/analyze/layer4/listener.go | 402 ++++++++++++
.../task/network/analyze/layer4/metrics.go | 276 ++++++++
pkg/profiling/task/network/bpf/bpf.go | 56 ++
pkg/profiling/task/network/{ => bpf}/linker.go | 5 +-
pkg/profiling/task/network/context.go | 726 ---------------------
pkg/profiling/task/network/metrics.go | 396 -----------
pkg/profiling/task/network/runner.go | 223 ++++---
pkg/profiling/task/network/ssl.go | 86 +--
18 files changed, 1799 insertions(+), 1316 deletions(-)
diff --git a/pkg/profiling/task/network/analyze/analyze.go b/pkg/profiling/task/network/analyze/analyze.go
new file mode 100644
index 0000000..28cda1e
--- /dev/null
+++ b/pkg/profiling/task/network/analyze/analyze.go
@@ -0,0 +1,33 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package analyze
+
+import (
+ "github.com/apache/skywalking-rover/pkg/process/api"
+ "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
+ "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer4"
+)
+
+// NewContext Wrap the analyzer builder
+func NewContext(monitorProcesses map[int32][]api.ProcessInterface) *base.AnalyzerContext {
+ context := base.NewAnalyzerContext(monitorProcesses)
+ // register all listeners
+ context.AddListener(layer4.NewListener())
+
+ return context
+}
diff --git a/pkg/profiling/task/network/analyze/base/connection.go b/pkg/profiling/task/network/analyze/base/connection.go
new file mode 100644
index 0000000..54f3940
--- /dev/null
+++ b/pkg/profiling/task/network/analyze/base/connection.go
@@ -0,0 +1,93 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package base
+
+import "github.com/apache/skywalking-rover/pkg/process/api"
+
+type ConnectionContext struct {
+ // basic metadata
+ ConnectionID uint64
+ RandomID uint64
+ LocalPid uint32
+ SocketFD uint32
+ LocalProcesses []api.ProcessInterface
+ ConnectionClosed bool
+ Protocol ConnectionProtocol
+ IsSSL bool
+
+ // socket metadata
+ Role ConnectionRole
+ LocalIP string
+ LocalPort uint16
+ RemoteIP string
+ RemotePort uint16
+
+ // metrics
+ Metrics *ConnectionMetrics
+
+ // Flush the data content to the oap count
+ FlushDataCount int
+}
+
+func (c *AnalyzerContext) NewConnectionContext(conID, randomID uint64, pid, fd uint32, processes []api.ProcessInterface,
+ conClosed bool) *ConnectionContext {
+ connection := &ConnectionContext{
+ // metadata
+ ConnectionID: conID,
+ RandomID: randomID,
+ LocalPid: pid,
+ SocketFD: fd,
+ LocalProcesses: processes,
+ ConnectionClosed: conClosed,
+
+ Metrics: c.NewConnectionMetrics(),
+ }
+ return connection
+}
+
+type ActiveConnectionInBPF struct {
+ RandomID uint64
+ Pid uint32
+ SocketFD uint32
+ Role ConnectionRole
+ SocketFamily uint32
+
+ RemoteAddrV4 uint32
+ RemoteAddrV6 [16]uint8
+ RemoteAddrPort uint32
+ LocalAddrV4 uint32
+ LocalAddrV6 [16]uint8
+ LocalAddrPort uint32
+
+ WriteBytes uint64
+ WriteCount uint64
+ WriteExeTime uint64
+ ReadBytes uint64
+ ReadCount uint64
+ ReadExeTime uint64
+
+ WriteRTTCount uint64
+ WriteRTTExeTime uint64
+
+ // Protocol analyze context
+ Protocol ConnectionProtocol
+ IsSSL uint32
+
+ // the connect event is already sent
+ ConnectEventIsSent uint32
+}
diff --git a/pkg/profiling/task/network/analyze/base/context.go b/pkg/profiling/task/network/analyze/base/context.go
new file mode 100644
index 0000000..fcda589
--- /dev/null
+++ b/pkg/profiling/task/network/analyze/base/context.go
@@ -0,0 +1,402 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package base
+
+import (
+ "context"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "net"
+ "sync"
+ "unsafe"
+
+ "github.com/apache/skywalking-rover/pkg/process/api"
+ "github.com/apache/skywalking-rover/pkg/profiling/task/network/bpf"
+
+ "github.com/cilium/ebpf"
+
+ "github.com/sirupsen/logrus"
+
+ cmap "github.com/orcaman/concurrent-map"
+
+ "golang.org/x/sys/unix"
+)
+
+type AnalyzerContext struct {
+ // listening process map
+ processes map[int32][]api.ProcessInterface
+
+ // connection handler
+ activeConnections cmap.ConcurrentMap // current activeConnections connections
+ closedConnections []*ConnectionContext // closed connections'
+ flushClosedEvents chan *SocketCloseEvent // connection have been closed, it is a queue to cache unknown active connections
+ sockParseQueue chan *ConnectionContext // socket address parse queue
+
+ // analyze listener list
+ listeners []AnalyzeListener
+
+ // close connection modify locker
+ closedConnectionLocker sync.RWMutex
+}
+
+func NewAnalyzerContext(processes map[int32][]api.ProcessInterface) *AnalyzerContext {
+ return &AnalyzerContext{
+ processes: processes,
+ activeConnections: cmap.New(),
+ closedConnections: make([]*ConnectionContext, 0),
+ flushClosedEvents: make(chan *SocketCloseEvent, 5000),
+ sockParseQueue: make(chan *ConnectionContext, 5000),
+ listeners: make([]AnalyzeListener, 0),
+ }
+}
+
+func (c *AnalyzerContext) AddListener(l AnalyzeListener) {
+ c.listeners = append(c.listeners, l)
+}
+
+func (c *AnalyzerContext) GetAllConnectionWithContext() []*ConnectionContext {
+ result := make([]*ConnectionContext, 0)
+ result = append(result, c.flushClosedConnection()...)
+ for _, con := range c.activeConnections.Items() {
+ result = append(result, con.(*ConnectionContext))
+ }
+ return result
+}
+
+func (c *AnalyzerContext) RegisterAllHandlers(bpfLoader *bpf.Loader) {
+ // socket connect
+ bpfLoader.ReadEventAsync(bpfLoader.SocketConnectionEventQueue, c.handleSocketConnectEvent, func() interface{} {
+ return &SocketConnectEvent{}
+ })
+ // socket close
+ bpfLoader.ReadEventAsync(bpfLoader.SocketCloseEventQueue, c.handleSocketCloseEvent, func() interface{} {
+ return &SocketCloseEvent{}
+ })
+ for _, l := range c.listeners {
+ l.RegisterBPFEvents(bpfLoader)
+ }
+}
+
+func (c *AnalyzerContext) RegisterBPFEvents(bpfLoader *bpf.Loader) {
+ for _, l := range c.listeners {
+ l.RegisterBPFEvents(bpfLoader)
+ }
+}
+
+func (c *AnalyzerContext) StartSocketAddressParser(ctx context.Context) {
+ for i := 0; i < 2; i++ {
+ go c.handleSocketParseQueue(ctx)
+ }
+}
+
+func (c *AnalyzerContext) handleSocketParseQueue(ctx context.Context) {
+ for {
+ select {
+ case cc := <-c.sockParseQueue:
+ socket, err := ParseSocket(cc.LocalPid, cc.SocketFD)
+ if err != nil {
+ // if the remote port of connection is empty, then this connection not available basically
+ if cc.RemotePort == 0 {
+ log.Warnf("complete the socket error, pid: %d, fd: %d, error: %v", cc.LocalPid, cc.SocketFD, err)
+ }
+ continue
+ }
+ cc.LocalIP = socket.SrcIP
+ cc.LocalPort = socket.SrcPort
+ cc.RemoteIP = socket.DestIP
+ cc.RemotePort = socket.DestPort
+ case <-ctx.Done():
+ return
+ }
+ }
+}
+
+func (c *AnalyzerContext) handleSocketConnectEvent(data interface{}) {
+ event := data.(*SocketConnectEvent)
+
+ if log.Enable(logrus.DebugLevel) {
+ marshal, _ := json.Marshal(event)
+ log.Debugf("found connect event, json: %s", string(marshal))
+ }
+
+ processes := c.processes[int32(event.Pid)]
+ if len(processes) == 0 {
+ log.Warnf("get process connect event, but this process is don't need to monitor, pid: %d", event.Pid)
+ return
+ }
+
+ // build active connection information
+ con := c.NewConnectionContext(event.ConID, event.RandomID, event.Pid, event.FD, processes, false)
+ con.Role = event.Role
+ if event.NeedComplete == 0 {
+ con.RemotePort = uint16(event.RemoteAddrPort)
+ con.LocalPort = uint16(event.LocalAddrPort)
+ if event.SocketFamily == unix.AF_INET {
+ con.LocalIP = parseAddressV4(event.LocalAddrV4)
+ con.RemoteIP = parseAddressV4(event.RemoteAddrV4)
+ } else {
+ con.LocalIP = parseAddressV6(event.LocalAddrV6)
+ con.RemoteIP = parseAddressV6(event.RemoteAddrV6)
+ }
+ } else {
+ // if the remote address exists then setting it
+ if event.RemoteAddrPort != 0 {
+ con.RemotePort = uint16(event.RemoteAddrPort)
+ if event.SocketFamily == unix.AF_INET {
+ con.RemoteIP = parseAddressV4(event.RemoteAddrV4)
+ } else {
+ con.RemoteIP = parseAddressV6(event.RemoteAddrV6)
+ }
+ }
+ c.sockParseQueue <- con
+ }
+
+ // notify the listeners
+ for _, l := range c.listeners {
+ l.ReceiveNewConnection(con, event)
+ }
+
+ // add to the context
+ c.saveActiveConnection(con)
+}
+
+func (c *AnalyzerContext) handleSocketCloseEvent(data interface{}) {
+ event := data.(*SocketCloseEvent)
+
+ if log.Enable(logrus.DebugLevel) {
+ marshal, _ := json.Marshal(event)
+ log.Debugf("found close event: %s", string(marshal))
+ }
+
+ // try to handle the socket close event
+ if !c.socketClosedEvent0(event) {
+ // is not in active connection, maybe it's not have been added to activate first
+ // just add to the close queue, wait for the flush connection with interval
+ c.flushClosedEvents <- event
+ return
+ }
+}
+
+func (c *AnalyzerContext) FlushAllMetrics(bpfLoader *bpf.Loader, metricsPrefix string) (*MetricsBuilder, error) {
+ metricsBuilder := NewMetricsBuilder(metricsPrefix)
+ err := c.flushMetrics0(bpfLoader, metricsBuilder)
+ if err != nil {
+ return nil, err
+ }
+ return metricsBuilder, nil
+}
+
+func (c *AnalyzerContext) flushMetrics0(bpfLoader *bpf.Loader, builder *MetricsBuilder) error {
+ // handling the unfinished close event
+ c.processCachedCloseEvents()
+
+ // get all connections
+ ccs := c.GetAllConnectionWithContext()
+ if len(ccs) == 0 {
+ return nil
+ }
+
+ // prepare to flush metrics
+ err := c.prepareToFlushMetrics(ccs, bpfLoader)
+ if err != nil {
+ return fmt.Errorf("prepare to flush the connection metrics failure: %v", err)
+ }
+
+ // combine all connections
+ analyzer := c.NewTrafficAnalyzer()
+ traffics := analyzer.CombineConnectionToTraffics(ccs)
+
+ // generate connections
+ for _, l := range c.listeners {
+ l.FlushMetrics(traffics, builder)
+ }
+
+ // after flush metrics
+ for _, l := range c.listeners {
+ l.PostFlushConnectionMetrics(ccs)
+ }
+ return nil
+}
+
+func (c *AnalyzerContext) prepareToFlushMetrics(ccs []*ConnectionContext, bpfLoader *bpf.Loader) error {
+ var active *ActiveConnectionInBPF
+ closedConnections := make([]string, 0)
+ connectionWithBPFList := make([]*ConnectionWithBPF, 0)
+
+ for _, cc := range ccs {
+ active, closedConnections = c.lookupTheActiveConnectionInBPf(cc, bpfLoader, closedConnections)
+ connectionWithBPFList = append(connectionWithBPFList, &ConnectionWithBPF{
+ Connection: cc,
+ ActiveInBPF: active,
+ })
+ }
+
+ // delete closed connections
+ if len(closedConnections) > 0 {
+ c.deleteConnectionOnly(closedConnections)
+ }
+
+ // call the listeners
+ for _, l := range c.listeners {
+ err := l.PreFlushConnectionMetrics(connectionWithBPFList, bpfLoader)
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+func (c *AnalyzerContext) lookupTheActiveConnectionInBPf(connection *ConnectionContext, bpfLoader *bpf.Loader,
+ closedConnections []string) (active *ActiveConnectionInBPF, closedRef []string) {
+ var activeConnection ActiveConnectionInBPF
+ // if connection not closed, then load the basic stats from bpf map
+ if !connection.ConnectionClosed {
+ err := bpfLoader.ActiveConnectionMap.Lookup(connection.ConnectionID, &activeConnection)
+ if err != nil {
+ if errors.Is(err, ebpf.ErrKeyNotExist) {
+ closedConnections = append(closedConnections, c.generateConnectionKey(connection.ConnectionID, connection.RandomID))
+ connection.ConnectionClosed = true
+ } else {
+ log.Warnf("lookup the active connection error, connection id: %d, error: %v", connection.ConnectionID, err)
+ }
+ return nil, closedConnections
+ }
+
+ if log.Enable(logrus.DebugLevel) {
+ marshal, _ := json.Marshal(activeConnection)
+ log.Debugf("found the active connection, conid: %d, data: %s", connection.ConnectionID, string(marshal))
+ }
+
+ if connection.Role == ConnectionRoleUnknown && activeConnection.Role != ConnectionRoleUnknown {
+ connection.Role = activeConnection.Role
+ }
+ if connection.Protocol == ConnectionProtocolUnknown && activeConnection.Protocol != ConnectionProtocolUnknown {
+ connection.Protocol = activeConnection.Protocol
+ }
+ if !connection.IsSSL && activeConnection.IsSSL == 1 {
+ connection.IsSSL = true
+ }
+ return &activeConnection, closedConnections
+ }
+ return nil, closedConnections
+}
+
+func (c *AnalyzerContext) deleteConnectionOnly(ccs []string) {
+ for _, cc := range ccs {
+ c.activeConnections.Remove(cc)
+ }
+}
+
+func (c *AnalyzerContext) processCachedCloseEvents() {
+ for len(c.flushClosedEvents) > 0 {
+ event := <-c.flushClosedEvents
+ if !c.socketClosedEvent0(event) {
+ // if cannot the found the active connection, then just create a new closed connection context
+ processes := c.processes[int32(event.Pid)]
+ if len(processes) == 0 {
+ continue
+ }
+ cc := c.NewConnectionContext(event.ConID, event.RandomID, event.Pid, event.SocketFD, processes, true)
+ if event.SocketFamily == unix.AF_INET {
+ cc.RemoteIP = parseAddressV4(event.RemoteAddrV4)
+ cc.LocalIP = parseAddressV4(event.LocalAddrV4)
+ } else if event.SocketFamily == unix.AF_INET6 {
+ cc.RemoteIP = parseAddressV6(event.RemoteAddrV6)
+ cc.LocalIP = parseAddressV6(event.LocalAddrV6)
+ } else {
+ continue
+ }
+
+ // append to the closed connection
+ c.appendClosedConnection(c.combineClosedConnection(cc, event))
+ }
+ }
+}
+
+func (c *AnalyzerContext) generateConnectionKey(conID, randomID uint64) string {
+ return fmt.Sprintf("%d_%d", conID, randomID)
+}
+
+func (c *AnalyzerContext) socketClosedEvent0(event *SocketCloseEvent) bool {
+ activeCon := c.foundAndDeleteConnection(event)
+ if activeCon == nil {
+ return false
+ }
+
+ // combine the connection data
+ c.appendClosedConnection(c.combineClosedConnection(activeCon, event))
+ return true
+}
+
+func (c *AnalyzerContext) foundAndDeleteConnection(event *SocketCloseEvent) *ConnectionContext {
+ conKey := c.generateConnectionKey(event.ConID, event.RandomID)
+ val, exists := c.activeConnections.Pop(conKey)
+ if !exists {
+ return nil
+ }
+ return val.(*ConnectionContext)
+}
+
+func (c *AnalyzerContext) combineClosedConnection(active *ConnectionContext, closed *SocketCloseEvent) *ConnectionContext {
+ active.ConnectionClosed = true
+
+ if active.Role == ConnectionRoleUnknown && closed.Role != ConnectionRoleUnknown {
+ active.Role = closed.Role
+ }
+ if active.Protocol == ConnectionProtocolUnknown && closed.Protocol != ConnectionProtocolUnknown {
+ active.Protocol = closed.Protocol
+ }
+ if !active.IsSSL && closed.IsSSL == 1 {
+ active.IsSSL = true
+ }
+
+ // notify listeners
+ for _, l := range c.listeners {
+ l.ReceiveCloseConnection(active, closed)
+ }
+ return active
+}
+
+func (c *AnalyzerContext) saveActiveConnection(con *ConnectionContext) {
+ c.activeConnections.Set(c.generateConnectionKey(con.ConnectionID, con.RandomID), con)
+}
+
+func (c *AnalyzerContext) flushClosedConnection() []*ConnectionContext {
+ c.closedConnectionLocker.Lock()
+ defer c.closedConnectionLocker.Unlock()
+
+ connections := c.closedConnections
+ c.closedConnections = make([]*ConnectionContext, 0)
+ return connections
+}
+
+func (c *AnalyzerContext) appendClosedConnection(con *ConnectionContext) {
+ c.closedConnectionLocker.RLock()
+ defer c.closedConnectionLocker.RUnlock()
+
+ c.closedConnections = append(c.closedConnections, con)
+}
+
+func parseAddressV4(val uint32) string {
+ return net.IP((*(*[net.IPv4len]byte)(unsafe.Pointer(&val)))[:]).String()
+}
+
+func parseAddressV6(val [16]uint8) string {
+ return net.IP((*(*[net.IPv6len]byte)(unsafe.Pointer(&val)))[:]).String()
+}
diff --git a/pkg/profiling/task/network/enums.go b/pkg/profiling/task/network/analyze/base/enums.go
similarity index 99%
rename from pkg/profiling/task/network/enums.go
rename to pkg/profiling/task/network/analyze/base/enums.go
index db43f06..d1f37e7 100644
--- a/pkg/profiling/task/network/enums.go
+++ b/pkg/profiling/task/network/analyze/base/enums.go
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-package network
+package base
const (
unknown = "unknown"
diff --git a/pkg/profiling/task/network/analyze/base/events.go b/pkg/profiling/task/network/analyze/base/events.go
new file mode 100644
index 0000000..6fbf4b4
--- /dev/null
+++ b/pkg/profiling/task/network/analyze/base/events.go
@@ -0,0 +1,70 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package base
+
+// SocketConnectEvent Socket have been connection/accept event
+type SocketConnectEvent struct {
+ ConID uint64
+ RandomID uint64
+ ExeTime uint64
+ NeedComplete uint32
+ Pid uint32
+ FD uint32
+ FuncName uint32
+
+ // socket information if exists
+ Role ConnectionRole
+ SocketFamily uint32
+ RemoteAddrV4 uint32
+ RemoteAddrV6 [16]uint8
+ RemoteAddrPort uint32
+ LocalAddrV4 uint32
+ LocalAddrV6 [16]uint8
+ LocalAddrPort uint32
+}
+
+type SocketCloseEvent struct {
+ ConID uint64
+ RandomID uint64
+ ExeTime uint64
+ Pid uint32
+ SocketFD uint32
+ Role ConnectionRole
+ Protocol ConnectionProtocol
+ IsSSL uint32
+ Fix uint32
+
+ SocketFamily uint32
+ RemoteAddrV4 uint32
+ RemoteAddrV6 [16]uint8
+ RemoteAddrPort uint32
+ LocalAddrV4 uint32
+ LocalAddrV6 [16]uint8
+ LocalAddrPort uint32
+ Fix1 uint32
+
+ WriteBytes uint64
+ WriteCount uint64
+ WriteExeTime uint64
+ ReadBytes uint64
+ ReadCount uint64
+ ReadExeTime uint64
+
+ WriteRTTCount uint64
+ WriteRTTExeTime uint64
+}
diff --git a/pkg/profiling/task/network/analyze/base/listener.go b/pkg/profiling/task/network/analyze/base/listener.go
new file mode 100644
index 0000000..1e9e7d2
--- /dev/null
+++ b/pkg/profiling/task/network/analyze/base/listener.go
@@ -0,0 +1,51 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package base
+
+import (
+ "github.com/apache/skywalking-rover/pkg/profiling/task/network/bpf"
+)
+
+type AnalyzeListener interface {
+ // Name of the listener
+ Name() string
+ // GenerateMetrics generate a metrics context
+ // It would bind to a ConnectionContext or ProcessTraffic automatically
+ GenerateMetrics() ListenerMetrics
+
+ // RegisterBPFEvents register the BPF events
+ RegisterBPFEvents(bpfLoader *bpf.Loader)
+
+ // ReceiveNewConnection call this method when receive a new connection event
+ // when return a metrics then It would bind to with the connection
+ ReceiveNewConnection(ctx *ConnectionContext, event *SocketConnectEvent)
+ // ReceiveCloseConnection call this method when receive the connection close event
+ ReceiveCloseConnection(ctx *ConnectionContext, event *SocketCloseEvent)
+
+ // PreFlushConnectionMetrics prepare to flush the connection metrics
+ PreFlushConnectionMetrics(ccs []*ConnectionWithBPF, bpfLoader *bpf.Loader) error
+ // FlushMetrics flush all metrics from connections
+ FlushMetrics(traffics []*ProcessTraffic, builder *MetricsBuilder)
+ // PostFlushConnectionMetrics after flushing all metrics, usually used to refresh the metrics
+ PostFlushConnectionMetrics(ccs []*ConnectionContext)
+}
+
+type ConnectionWithBPF struct {
+ Connection *ConnectionContext
+ ActiveInBPF *ActiveConnectionInBPF
+}
diff --git a/pkg/profiling/task/network/analyze/base/metrics.go b/pkg/profiling/task/network/analyze/base/metrics.go
new file mode 100644
index 0000000..2c1eddf
--- /dev/null
+++ b/pkg/profiling/task/network/analyze/base/metrics.go
@@ -0,0 +1,98 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package base
+
+import (
+ "time"
+
+ v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+)
+
+// ListenerMetrics The Metrics in each listener
+type ListenerMetrics interface {
+ // FlushMetrics Flush the metrics of connection, and merge into self
+ FlushMetrics(connection *ConnectionContext)
+}
+
+type ConnectionMetrics struct {
+ data map[string]ListenerMetrics
+}
+
+func (c *AnalyzerContext) NewConnectionMetrics() *ConnectionMetrics {
+ data := make(map[string]ListenerMetrics)
+ for _, l := range c.listeners {
+ data[l.Name()] = l.GenerateMetrics()
+ }
+ return &ConnectionMetrics{data: data}
+}
+
+func (c *ConnectionMetrics) GetMetrics(listenerName string) ListenerMetrics {
+ return c.data[listenerName]
+}
+
+func (c *ConnectionMetrics) FlushMetrics(connection *ConnectionContext) {
+ for _, metric := range c.data {
+ metric.FlushMetrics(connection)
+ }
+}
+
+type MetricsBuilder struct {
+ prefix string
+ metrics map[metadata][]*v3.MeterData
+}
+
+func NewMetricsBuilder(prefix string) *MetricsBuilder {
+ return &MetricsBuilder{
+ prefix: prefix,
+ metrics: make(map[metadata][]*v3.MeterData),
+ }
+}
+
+func (m *MetricsBuilder) AppendMetrics(service, instance string, metrics []*v3.MeterData) {
+ meta := metadata{ServiceName: service, InstanceName: instance}
+ existingMetrics := m.metrics[meta]
+ if len(existingMetrics) == 0 {
+ m.metrics[meta] = metrics
+ return
+ }
+ m.metrics[meta] = append(existingMetrics, metrics...)
+}
+
+func (m *MetricsBuilder) MetricPrefix() string {
+ return m.prefix
+}
+
+func (m *MetricsBuilder) Build() []*v3.MeterDataCollection {
+ collections := make([]*v3.MeterDataCollection, 0)
+ now := time.Now().UnixMilli()
+ for meta, meters := range m.metrics {
+ if len(meters) == 0 {
+ continue
+ }
+ meters[0].Service = meta.ServiceName
+ meters[0].ServiceInstance = meta.InstanceName
+ meters[0].Timestamp = now
+ collections = append(collections, &v3.MeterDataCollection{MeterData: meters})
+ }
+ return collections
+}
+
+type metadata struct {
+ ServiceName string
+ InstanceName string
+}
diff --git a/pkg/profiling/task/network/tcpresolver.go b/pkg/profiling/task/network/analyze/base/tcpresolver.go
similarity index 99%
rename from pkg/profiling/task/network/tcpresolver.go
rename to pkg/profiling/task/network/analyze/base/tcpresolver.go
index 92d2326..dc27cd8 100644
--- a/pkg/profiling/task/network/tcpresolver.go
+++ b/pkg/profiling/task/network/analyze/base/tcpresolver.go
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-package network
+package base
import (
"encoding/binary"
diff --git a/pkg/profiling/task/network/analyzer.go b/pkg/profiling/task/network/analyze/base/traffic.go
similarity index 86%
rename from pkg/profiling/task/network/analyzer.go
rename to pkg/profiling/task/network/analyze/base/traffic.go
index 8eb69bc..99a88a2 100644
--- a/pkg/profiling/task/network/analyzer.go
+++ b/pkg/profiling/task/network/analyze/base/traffic.go
@@ -15,21 +15,50 @@
// specific language governing permissions and limitations
// under the License.
-package network
+package base
import (
+ "fmt"
+
+ "github.com/apache/skywalking-rover/pkg/logger"
"github.com/apache/skywalking-rover/pkg/process/api"
"github.com/apache/skywalking-rover/pkg/tools"
)
+var log = logger.GetLogger("profiling", "task", "network", "analyze")
+
const (
layerMeshDP = "MESH_DP"
layerMeshApp = "MESH"
processEnvoy = "envoy"
)
+type ProcessTraffic struct {
+ Analyzer *TrafficAnalyzer
+
+ // local process information
+ LocalIP string
+ LocalPort uint16
+ LocalPid uint32
+ LocalProcesses []api.ProcessInterface
+
+ // remote process/address information
+ RemoteIP string
+ RemotePort uint16
+ RemotePid uint32
+ RemoteProcesses []api.ProcessInterface
+
+ // connection basic information
+ Role ConnectionRole
+ Protocol ConnectionProtocol
+ IsSSL bool
+
+ // metrics
+ Metrics *ConnectionMetrics
+}
+
type TrafficAnalyzer struct {
- existingProcesses map[int32][]api.ProcessInterface
+ analyzeContext *AnalyzerContext
// used to find local same with remote address
// the connect request(local:a -> remote:b) same with accept address(remote:a -> local:b)
// key: localIP:port+RemoteIP+port
@@ -59,9 +88,9 @@ type TrafficAnalyzer struct {
localAddresses map[string]map[string]api.ProcessInterface
}
-func NewTrafficAnalyzer(processes map[int32][]api.ProcessInterface) *TrafficAnalyzer {
+func (c *AnalyzerContext) NewTrafficAnalyzer() *TrafficAnalyzer {
return &TrafficAnalyzer{
- existingProcesses: processes,
+ analyzeContext: c,
localWithPeerCache: make(map[LocalWithPeerAddress]*PidWithRole),
peerAddressCache: make(map[PeerAddress][]uint32),
envoyAcceptClientAddressCache: make(map[PeerAddress]*AddressWithPid),
@@ -125,19 +154,33 @@ func (t *TrafficAnalyzer) CombineConnectionToTraffics(connections []*ConnectionC
// combine all result
result := make([]*ProcessTraffic, 0)
for _, v := range pidMatchedTraffic {
- if v.ContainsAnyTraffic() {
- result = append(result, v)
- }
+ result = append(result, v)
}
for _, v := range pidToRemoteTraffic {
- if v.ContainsAnyTraffic() {
- result = append(result, v)
- }
+ result = append(result, v)
}
return result
}
+func (t *ProcessTraffic) GenerateConnectionInfo() string {
+ localInfo := fmt.Sprintf("%s:%d(%d)", t.LocalIP, t.LocalPort, t.LocalPid)
+ if len(t.LocalProcesses) > 0 {
+ localInfo = t.generateProcessInfo(t.LocalProcesses[0])
+ }
+
+ remoteInfo := fmt.Sprintf("%s:%d(%d)", t.RemoteIP, t.RemotePort, t.RemotePid)
+ if len(t.RemoteProcesses) > 0 {
+ remoteInfo = t.generateProcessInfo(t.RemoteProcesses[0])
+ }
+ return fmt.Sprintf("%s -> %s", localInfo, remoteInfo)
+}
+
+func (t *ProcessTraffic) generateProcessInfo(p api.ProcessInterface) string {
+ return fmt.Sprintf("(%s)%s:%s:%s(%s:%d)(%d)", p.Entity().Layer, p.Entity().ServiceName,
+ p.Entity().InstanceName, p.Entity().ProcessName, t.LocalIP, t.LocalPort, t.LocalPid)
+}
+
func (t *TrafficAnalyzer) tryingToGenerateTheRoleWhenRemotePidCannotFound(con *ConnectionContext) {
if con.Role != ConnectionRoleUnknown {
return
@@ -163,34 +206,21 @@ func (t *TrafficAnalyzer) tryingToGenerateTheRoleWhenRemotePidCannotFound(con *C
func (t *TrafficAnalyzer) generateOrCombineTraffic(traffic *ProcessTraffic, con *ConnectionContext, remotePid uint32) *ProcessTraffic {
if traffic == nil {
traffic = &ProcessTraffic{
- analyzer: t,
+ Analyzer: t,
LocalPid: con.LocalPid,
LocalProcesses: con.LocalProcesses,
LocalIP: con.LocalIP,
LocalPort: con.LocalPort,
- ConnectionRole: con.Role,
-
- WriteCounter: NewSocketDataCounter(),
- ReadCounter: NewSocketDataCounter(),
- WriteRTTCounter: NewSocketDataCounter(),
- ConnectCounter: NewSocketDataCounter(),
- CloseCounter: NewSocketDataCounter(),
- RetransmitCounter: NewSocketDataCounter(),
- DropCounter: NewSocketDataCounter(),
- WriteRTTHistogram: NewSocketDataHistogram(HistogramDataUnitUS),
- WriteExeTimeHistogram: NewSocketDataHistogram(HistogramDataUnitNS),
- ReadExeTimeHistogram: NewSocketDataHistogram(HistogramDataUnitNS),
- ConnectExeTimeHistogram: NewSocketDataHistogram(HistogramDataUnitNS),
- CloseExeTimeHistogram: NewSocketDataHistogram(HistogramDataUnitNS),
+ Metrics: t.analyzeContext.NewConnectionMetrics(),
}
}
if len(traffic.LocalProcesses) == 0 && len(con.LocalProcesses) > 0 {
traffic.LocalProcesses = con.LocalProcesses
}
- if traffic.ConnectionRole == ConnectionRoleUnknown && con.Role != ConnectionRoleUnknown {
- traffic.ConnectionRole = con.Role
+ if traffic.Role == ConnectionRoleUnknown && con.Role != ConnectionRoleUnknown {
+ traffic.Role = con.Role
}
if traffic.Protocol == ConnectionProtocolUnknown && con.Protocol != ConnectionProtocolUnknown {
traffic.Protocol = con.Protocol
@@ -206,23 +236,9 @@ func (t *TrafficAnalyzer) generateOrCombineTraffic(traffic *ProcessTraffic, con
traffic.RemoteIP = con.RemoteIP
traffic.RemotePort = con.RemotePort
- traffic.WriteCounter.Increase(con.WriteCounter.CalculateIncrease())
- traffic.ReadCounter.Increase(con.ReadCounter.CalculateIncrease())
- traffic.WriteRTTCounter.Increase(con.WriteRTTCounter.CalculateIncrease())
- traffic.RetransmitCounter.Increase(con.RetransmitCounter)
- traffic.DropCounter.Increase(con.DropCounter)
- traffic.WriteRTTHistogram.Increase(con.WriteRTTHistogram.CalculateIncrease())
- traffic.WriteExeTimeHistogram.Increase(con.WriteExeTimeHistogram.CalculateIncrease())
- traffic.ReadExeTimeHistogram.Increase(con.ReadExeTimeHistogram.CalculateIncrease())
-
- if con.FlushDataCount == 0 && con.ConnectExecuteTime > 0 {
- traffic.ConnectCounter.IncreaseByValue(0, 1, con.ConnectExecuteTime)
- traffic.ConnectExeTimeHistogram.IncreaseByValue(con.ConnectExecuteTime)
- }
- if con.FlushDataCount == 0 && con.CloseExecuteTime > 0 {
- traffic.CloseCounter.IncreaseByValue(0, 1, con.CloseExecuteTime)
- traffic.CloseExeTimeHistogram.IncreaseByValue(con.CloseExecuteTime)
- }
+ // flush connection metrics
+ traffic.Metrics.FlushMetrics(con)
+
con.FlushDataCount++
return traffic
}
@@ -414,7 +430,7 @@ func (t *TrafficAnalyzer) findRemotePidWhenMeshEnvironment(con *ConnectionContex
}
func (t *TrafficAnalyzer) findSameInstanceMeshDP(entity *api.ProcessEntity) uint32 {
- for _, psList := range t.existingProcesses {
+ for _, psList := range t.analyzeContext.processes {
for _, p := range psList {
if p.Entity().Layer == layerMeshDP && p.Entity().ServiceName == entity.ServiceName && p.Entity().InstanceName == entity.InstanceName {
name, err := p.ExeName()
diff --git a/pkg/profiling/task/network/analyze/layer4/events.go b/pkg/profiling/task/network/analyze/layer4/events.go
new file mode 100644
index 0000000..641c2fb
--- /dev/null
+++ b/pkg/profiling/task/network/analyze/layer4/events.go
@@ -0,0 +1,88 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package layer4
+
+import (
+ "encoding/json"
+
+ "github.com/apache/skywalking-rover/pkg/logger"
+ "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
+
+ "github.com/sirupsen/logrus"
+)
+
+var log = logger.GetLogger("profiling", "task", "network", "layer4")
+
+// SocketExceptionOperationEvent Socket have been retransmitted/drop the package event
+type SocketExceptionOperationEvent struct {
+ Pid uint32
+ SocketFamily uint32
+ RemoteAddrV4 uint32
+ RemoteAddrV6 [16]uint8
+ RemoteAddrPort uint32
+ Type base.SocketExceptionOperationType
+}
+
+func (l *Listener) handleSocketExceptionOperationEvent(data interface{}) {
+ event := data.(*SocketExceptionOperationEvent)
+ l.socketExceptionOperationLock.Lock()
+ defer l.socketExceptionOperationLock.Unlock()
+
+ key := SocketBasicKey{
+ Pid: event.Pid,
+ Family: event.SocketFamily,
+ RemoteAddrV4: event.RemoteAddrV4,
+ RemoteAddrV6: event.RemoteAddrV6,
+ RemotePort: event.RemoteAddrPort,
+ }
+ exceptionValue := l.socketExceptionStatics[key]
+ if exceptionValue == nil {
+ exceptionValue = &SocketExceptionValue{}
+ l.socketExceptionStatics[key] = exceptionValue
+ }
+
+ switch event.Type {
+ case base.SocketExceptionOperationRetransmit:
+ exceptionValue.RetransmitCount++
+ case base.SocketExceptionOperationDrop:
+ exceptionValue.DropCount++
+ default:
+ log.Warnf("unknown socket exception operation type: %d", event.Type)
+ }
+
+ if log.Enable(logrus.DebugLevel) {
+ marshal, _ := json.Marshal(event)
+ log.Debugf("found socket exception operation event: %s", string(marshal))
+ }
+}
+
+type SocketBasicKey struct {
+ Pid uint32
+ Family uint32
+ RemoteAddrV4 uint32
+ RemoteAddrV6 [16]uint8
+ RemotePort uint32
+ LocalAddrV4 uint32
+ LocalAddrV6 [16]uint8
+ LocalPort uint32
+}
+
+type SocketExceptionValue struct {
+ DropCount int
+ RetransmitCount int
+}
diff --git a/pkg/profiling/task/network/analyze/layer4/listener.go b/pkg/profiling/task/network/analyze/layer4/listener.go
new file mode 100644
index 0000000..88db514
--- /dev/null
+++ b/pkg/profiling/task/network/analyze/layer4/listener.go
@@ -0,0 +1,402 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package layer4
+
+import (
+ "fmt"
+ "net"
+ "sync"
+ "unsafe"
+
+ "github.com/sirupsen/logrus"
+
+ "github.com/apache/skywalking-rover/pkg/process/api"
+ "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
+ "github.com/apache/skywalking-rover/pkg/profiling/task/network/bpf"
+ "github.com/apache/skywalking-rover/pkg/tools"
+
+ v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+
+ "golang.org/x/sys/unix"
+)
+
+var Name = "layer4"
+
+type Listener struct {
+ // socket retransmit/drop
+ socketExceptionStatics map[SocketBasicKey]*SocketExceptionValue
+ socketExceptionOperationLock sync.Mutex
+}
+
+func NewListener() *Listener {
+ return &Listener{
+ socketExceptionStatics: make(map[SocketBasicKey]*SocketExceptionValue),
+ }
+}
+
+func (l *Listener) Name() string {
+ return Name
+}
+
+func (l *Listener) GenerateMetrics() base.ListenerMetrics {
+ return NewLayer4Metrics()
+}
+
+func (l *Listener) RegisterBPFEvents(bpfLoader *bpf.Loader) {
+ bpfLoader.ReadEventAsync(bpfLoader.SocketExceptionOperationEventQueue, l.handleSocketExceptionOperationEvent, func() interface{} {
+ return &SocketExceptionOperationEvent{}
+ })
+}
+
+func (l *Listener) ReceiveNewConnection(ctx *base.ConnectionContext, event *base.SocketConnectEvent) {
+ // update the connection execute time
+ l.getMetrics(ctx.Metrics).ConnectExecuteTime = event.ExeTime
+}
+
+func (l *Listener) ReceiveCloseConnection(ctx *base.ConnectionContext, event *base.SocketCloseEvent) {
+ layer4 := l.getMetrics(ctx.Metrics)
+ // data transmit counters
+ layer4.WriteCounter.UpdateToCurrent(event.WriteBytes, event.WriteCount, event.WriteExeTime)
+ layer4.ReadCounter.UpdateToCurrent(event.ReadBytes, event.ReadCount, event.ReadExeTime)
+ layer4.WriteRTTCounter.UpdateToCurrent(0, event.WriteRTTCount, event.WriteRTTExeTime)
+
+ // connection close execute time
+ layer4.CloseExecuteTime = event.ExeTime
+}
+
+func (l *Listener) PreFlushConnectionMetrics(ccs []*base.ConnectionWithBPF, bpfLoader *bpf.Loader) error {
+ // rebuild to the map for helping quick search correlate ConnectionContext
+ keyWithContext := make(map[string]*base.ConnectionContext)
+ for _, cc := range ccs {
+ // ready to flush histograms
+ connection := cc.Connection
+ layer4 := l.getMetrics(connection.Metrics)
+ // basic counter update
+ activeConnection := cc.ActiveInBPF
+ if activeConnection != nil {
+ layer4.WriteCounter.UpdateToCurrent(activeConnection.WriteBytes, activeConnection.WriteCount, activeConnection.WriteExeTime)
+ layer4.ReadCounter.UpdateToCurrent(activeConnection.ReadBytes, activeConnection.ReadCount, activeConnection.ReadExeTime)
+ layer4.WriteRTTCounter.UpdateToCurrent(0, activeConnection.WriteRTTCount, activeConnection.WriteRTTExeTime)
+ }
+ // build cache
+ keyWithContext[l.generateConID(connection.ConnectionID, connection.RandomID)] = connection
+
+ if log.Enable(logrus.DebugLevel) {
+ log.Debugf("found connection: %d, %s relation: %s:%d(%d) -> %s:%d, protocol: %s, is_ssl: %t, is_closed: %t, write: %d bytes/%d, read: %d bytes/%d",
+ connection.ConnectionID, connection.Role.String(),
+ connection.LocalIP, connection.LocalPort, connection.LocalPid, connection.RemoteIP, connection.RemotePort,
+ connection.Protocol.String(), connection.IsSSL, connection.ConnectionClosed, layer4.WriteCounter.Cur.Bytes,
+ layer4.WriteCounter.Cur.Count, layer4.ReadCounter.Cur.Bytes, layer4.ReadCounter.Cur.Count)
+ }
+ }
+
+ var key HistogramDataKey
+ var count uint32
+ histogramIt := bpfLoader.SocketConnectionStatsHistogram.Iterate()
+ // for-each the stats map
+ for histogramIt.Next(&key, &count) {
+ // if it's not relate to the ConnectionContext just ignore
+ cc := keyWithContext[l.generateConID(key.ConnectionID, key.RandomID)]
+ if cc == nil {
+ continue
+ }
+ layer4 := l.getMetrics(cc.Metrics)
+
+ // add the histogram data
+ var histogram *SocketDataHistogramWithHistory
+ if key.DataDirection == base.SocketDataDirectionEgress {
+ if key.DataType == base.SocketDataStaticsTypeExeTime {
+ histogram = layer4.WriteExeTimeHistogram
+ } else if key.DataType == base.SocketDataStaticsTypeRTT {
+ histogram = layer4.WriteRTTHistogram
+ }
+ } else if key.DataDirection == base.SocketDataDirectionIngress {
+ histogram = layer4.ReadExeTimeHistogram
+ }
+ if histogram == nil {
+ log.Warnf("unknown the histogram data: %v", cc)
+ continue
+ }
+ histogram.UpdateToCurrent(key.Bucket, count)
+
+ // delete the stats if the connection already closed
+ if cc.ConnectionClosed {
+ if err := bpfLoader.SocketConnectionStatsHistogram.Delete(key); err != nil {
+ log.Warnf("delete the connection stats failure: %v", err)
+ }
+ }
+ }
+
+ // all the exception operations to the context
+ exceptionContexts := l.cleanAndGetAllExceptionContexts()
+ l.combineExceptionToConnections(keyWithContext, exceptionContexts)
+ return nil
+}
+
+func (l *Listener) PostFlushConnectionMetrics(ccs []*base.ConnectionContext) {
+ for _, connection := range ccs {
+ metrics := l.getMetrics(connection.Metrics)
+
+ // refresh counters
+ metrics.WriteCounter.RefreshCurrent()
+ metrics.ReadCounter.RefreshCurrent()
+ metrics.WriteRTTCounter.RefreshCurrent()
+ metrics.WriteRTTHistogram.RefreshCurrent()
+ metrics.WriteExeTimeHistogram.RefreshCurrent()
+ metrics.ReadExeTimeHistogram.RefreshCurrent()
+ metrics.ConnectCounter.RefreshCurrent()
+ metrics.CloseCounter.RefreshCurrent()
+ metrics.ConnectExeTimeHistogram.RefreshCurrent()
+ metrics.CloseExeTimeHistogram.RefreshCurrent()
+ metrics.RetransmitCounter.RefreshCurrent()
+ metrics.DropCounter.RefreshCurrent()
+ }
+}
+
+func (l *Listener) FlushMetrics(traffics []*base.ProcessTraffic, builder *base.MetricsBuilder) {
+ l.logTheMetricsConnections(traffics)
+
+ metricsPrefix := builder.MetricPrefix()
+ for _, traffic := range traffics {
+ metrics := traffic.Metrics.GetMetrics(Name).(*Metrics)
+ for _, p := range traffic.LocalProcesses {
+ collection := make([]*v3.MeterData, 0)
+ collection = l.appendCounterValues(collection, metricsPrefix, "write", traffic, p, metrics.WriteCounter)
+ collection = l.appendCounterValues(collection, metricsPrefix, "read", traffic, p, metrics.ReadCounter)
+ collection = l.appendCounterValues(collection, metricsPrefix, "write_rtt", traffic, p, metrics.WriteRTTCounter)
+ collection = l.appendCounterValues(collection, metricsPrefix, "connect", traffic, p, metrics.ConnectCounter)
+ collection = l.appendCounterValues(collection, metricsPrefix, "close", traffic, p, metrics.CloseCounter)
+ collection = l.appendCounterValues(collection, metricsPrefix, "retransmit", traffic, p, metrics.RetransmitCounter)
+ collection = l.appendCounterValues(collection, metricsPrefix, "drop", traffic, p, metrics.DropCounter)
+
+ collection = l.appendHistogramValue(collection, metricsPrefix, "write_rtt", traffic, p, metrics.WriteRTTHistogram)
+ collection = l.appendHistogramValue(collection, metricsPrefix, "write_exe_time", traffic, p, metrics.WriteExeTimeHistogram)
+ collection = l.appendHistogramValue(collection, metricsPrefix, "read_exe_time", traffic, p, metrics.ReadExeTimeHistogram)
+ collection = l.appendHistogramValue(collection, metricsPrefix, "connect_exe_time", traffic, p, metrics.ConnectExeTimeHistogram)
+ collection = l.appendHistogramValue(collection, metricsPrefix, "close_exe_time", traffic, p, metrics.CloseExeTimeHistogram)
+
+ if len(collection) == 0 {
+ continue
+ }
+
+ builder.AppendMetrics(p.Entity().ServiceName, p.Entity().InstanceName, collection)
+ }
+ }
+}
+
+func (l *Listener) logTheMetricsConnections(traffics []*base.ProcessTraffic) {
+ if !log.Enable(logrus.DebugLevel) {
+ return
+ }
+ for _, traffic := range traffics {
+ side := traffic.Role.String()
+ layer4 := l.getMetrics(traffic.Metrics)
+ log.Debugf("connection layer4 analyze result: %s : %s, protocol: %s, is SSL: %t, write: %d bytes/%d, read: %d bytes/%d",
+ side, traffic.GenerateConnectionInfo(), traffic.Protocol.String(), traffic.IsSSL, layer4.WriteCounter.Cur.Bytes, layer4.WriteCounter.Cur.Count,
+ layer4.ReadCounter.Cur.Bytes, layer4.ReadCounter.Cur.Count)
+ }
+}
+
+func (l *Listener) generateConID(conID, randomID uint64) string {
+ return fmt.Sprintf("%d_%d", conID, randomID)
+}
+
+func (l *Listener) cleanAndGetAllExceptionContexts() map[SocketBasicKey]*SocketExceptionValue {
+ l.socketExceptionOperationLock.Lock()
+ defer l.socketExceptionOperationLock.Unlock()
+
+ result := l.socketExceptionStatics
+ l.socketExceptionStatics = make(map[SocketBasicKey]*SocketExceptionValue)
+ return result
+}
+
+func (l *Listener) combineExceptionToConnections(ccs map[string]*base.ConnectionContext, exps map[SocketBasicKey]*SocketExceptionValue) {
+ for key, value := range exps {
+ var remotePort, localPort = uint16(key.RemotePort), uint16(key.LocalPort)
+ var remoteIP, localIP string
+
+ if key.Family == unix.AF_INET {
+ remoteIP = parseAddressV4(key.RemoteAddrV4)
+ localIP = parseAddressV4(key.LocalAddrV4)
+ } else if key.Family == unix.AF_INET6 {
+ remoteIP = parseAddressV6(key.RemoteAddrV6)
+ localIP = parseAddressV6(key.LocalAddrV6)
+ } else {
+ continue
+ }
+
+ var firstRemoteMatch *base.ConnectionContext
+ var foundAllAddrMatch bool
+ for _, cc := range ccs {
+ // only add to the first matches
+ if cc.RemoteIP == remoteIP && cc.RemotePort == remotePort {
+ firstRemoteMatch = cc
+ if cc.LocalIP == localIP && cc.LocalPort == localPort {
+ l.mergeExceptionToAppointConnection(value, cc)
+ foundAllAddrMatch = true
+ break
+ }
+ }
+ }
+
+ // if only remote address match, then just add to the first one
+ if !foundAllAddrMatch && firstRemoteMatch != nil {
+ l.mergeExceptionToAppointConnection(value, firstRemoteMatch)
+ }
+ }
+}
+
+func (l *Listener) mergeExceptionToAppointConnection(expCtx *SocketExceptionValue, conCtx *base.ConnectionContext) {
+ layer4 := l.getMetrics(conCtx.Metrics)
+ layer4.DropCounter.IncreaseToCurrent(NewSocketDataCounterWithValue(0, uint64(expCtx.DropCount), 0))
+ layer4.RetransmitCounter.IncreaseToCurrent(NewSocketDataCounterWithValue(0, uint64(expCtx.RetransmitCount), 0))
+}
+
+func (l *Listener) appendCounterValues(metrics []*v3.MeterData, metricsPrefix, name string, traffic *base.ProcessTraffic,
+ local api.ProcessInterface, counter *SocketDataCounterWithHistory) []*v3.MeterData {
+ metric := counter.Cur
+ if !metric.NotEmpty() {
+ return metrics
+ }
+
+ count := float64(metric.Count)
+ metrics = append(metrics, l.buildSingleValue(metricsPrefix, name+"_counts_counter", traffic, local, count))
+ if metric.Bytes > 0 {
+ metrics = append(metrics, l.buildSingleValue(metricsPrefix, name+"_bytes_counter", traffic, local, float64(metric.Bytes)))
+ }
+ if metric.ExeTime > 0 {
+ metrics = append(metrics, l.buildSingleValue(metricsPrefix, name+"_exe_time_counter", traffic, local, float64(metric.ExeTime)/count))
+ }
+ return metrics
+}
+
+func (l *Listener) appendHistogramValue(metrics []*v3.MeterData, metricsPrefix, name string, traffic *base.ProcessTraffic,
+ local api.ProcessInterface, histogram *SocketDataHistogramWithHistory) []*v3.MeterData {
+ data := histogram.Cur
+ if !data.NotEmpty() {
+ return metrics
+ }
+
+ role, labels := l.buildBasicMeterLabels(traffic, local)
+ values := make([]*v3.MeterBucketValue, 0)
+ for bucket, count := range data.Buckets {
+ var bucketInx = int(bucket)
+ if bucketInx >= SocketHistogramBucketsCount {
+ bucketInx = SocketHistogramBucketsCount - 1
+ }
+ var buckets []float64
+ if data.Unit == HistogramDataUnitUS {
+ buckets = SocketHistogramBucketsUs
+ } else {
+ buckets = SocketHistogramBucketsNs
+ }
+ values = append(values, &v3.MeterBucketValue{
+ Bucket: buckets[bucketInx],
+ Count: int64(count),
+ })
+ }
+
+ return append(metrics, &v3.MeterData{
+ Metric: &v3.MeterData_Histogram{
+ Histogram: &v3.MeterHistogram{
+ Name: fmt.Sprintf("%s%s_%s_histogram", metricsPrefix, role.String(), name),
+ Labels: labels,
+ Values: values,
+ },
+ },
+ })
+}
+
+func (l *Listener) buildSingleValue(prefix, name string, traffic *base.ProcessTraffic, local api.ProcessInterface, val float64) *v3.MeterData {
+ role, labels := l.buildBasicMeterLabels(traffic, local)
+
+ return &v3.MeterData{
+ Metric: &v3.MeterData_SingleValue{
+ SingleValue: &v3.MeterSingleValue{
+ Name: fmt.Sprintf("%s%s_%s", prefix, role.String(), name),
+ Labels: labels,
+ Value: val,
+ },
+ },
+ }
+}
+
+func (l *Listener) buildBasicMeterLabels(traffic *base.ProcessTraffic, local api.ProcessInterface) (base.ConnectionRole, []*v3.Label) {
+ curRole := traffic.Role
+ // add the default role
+ if curRole == base.ConnectionRoleUnknown {
+ curRole = base.ConnectionRoleClient
+ }
+ labels := make([]*v3.Label, 0)
+
+ // two pair process/address info
+ labels = l.appendMeterValue(labels, fmt.Sprintf("%s_process_id", curRole.String()), local.ID())
+ labels = l.appendRemoteAddressInfo(labels, traffic, curRole.Revert().String(), local)
+
+ labels = l.appendMeterValue(labels, "side", curRole.String())
+
+ // protocol and ssl
+ labels = l.appendMeterValue(labels, "protocol", traffic.Protocol.String())
+ labels = l.appendMeterValue(labels, "is_ssl", fmt.Sprintf("%t", traffic.IsSSL))
+ return curRole, labels
+}
+
+func (l *Listener) appendRemoteAddressInfo(labels []*v3.Label, traffic *base.ProcessTraffic, prefix string, local api.ProcessInterface) []*v3.Label {
+ if len(traffic.RemoteProcesses) != 0 {
+ for _, p := range traffic.RemoteProcesses {
+ // only match with same service instance
+ if local.Entity().ServiceName == p.Entity().ServiceName &&
+ local.Entity().InstanceName == p.Entity().InstanceName {
+ return l.appendMeterValue(labels, prefix+"_process_id", p.ID())
+ }
+ }
+ }
+
+ if tools.IsLocalHostAddress(traffic.RemoteIP) || traffic.Analyzer.IsLocalAddressInCache(traffic.RemoteIP) {
+ return l.appendMeterValue(labels, prefix+"_local", "true")
+ }
+
+ return l.appendMeterValue(labels, prefix+"_address", fmt.Sprintf("%s:%d", traffic.RemoteIP, traffic.RemotePort))
+}
+
+func (l *Listener) appendMeterValue(labels []*v3.Label, name, value string) []*v3.Label {
+ return append(labels, &v3.Label{
+ Name: name,
+ Value: value,
+ })
+}
+
+func (l *Listener) getMetrics(connectionMetrics *base.ConnectionMetrics) *Metrics {
+ return connectionMetrics.GetMetrics(Name).(*Metrics)
+}
+
+type HistogramDataKey struct {
+ ConnectionID uint64
+ RandomID uint64
+ DataDirection base.SocketDataDirection
+ DataType base.SocketDataStaticsType
+ Bucket uint64
+}
+
+func parseAddressV4(val uint32) string {
+ return net.IP((*(*[net.IPv4len]byte)(unsafe.Pointer(&val)))[:]).String()
+}
+
+func parseAddressV6(val [16]uint8) string {
+ return net.IP((*(*[net.IPv6len]byte)(unsafe.Pointer(&val)))[:]).String()
+}
diff --git a/pkg/profiling/task/network/analyze/layer4/metrics.go b/pkg/profiling/task/network/analyze/layer4/metrics.go
new file mode 100644
index 0000000..f6a6658
--- /dev/null
+++ b/pkg/profiling/task/network/analyze/layer4/metrics.go
@@ -0,0 +1,276 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package layer4
+
+import "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
+
+type Metrics struct {
+ // basic statics
+ // read/write
+ WriteCounter *SocketDataCounterWithHistory
+ ReadCounter *SocketDataCounterWithHistory
+ // write RTT
+ WriteRTTCounter *SocketDataCounterWithHistory
+
+ // histograms
+ // write execute time and RTT
+ WriteRTTHistogram *SocketDataHistogramWithHistory
+ WriteExeTimeHistogram *SocketDataHistogramWithHistory
+ // read execute time
+ ReadExeTimeHistogram *SocketDataHistogramWithHistory
+
+ // the connection connect or close execute time
+ ConnectExecuteTime uint64
+ CloseExecuteTime uint64
+ ConnectCounter *SocketDataCounterWithHistory
+ CloseCounter *SocketDataCounterWithHistory
+ ConnectExeTimeHistogram *SocketDataHistogramWithHistory
+ CloseExeTimeHistogram *SocketDataHistogramWithHistory
+
+ // exception counters
+ RetransmitCounter *SocketDataCounterWithHistory
+ DropCounter *SocketDataCounterWithHistory
+}
+
+func NewLayer4Metrics() *Metrics {
+ return &Metrics{
+ WriteCounter: NewSocketDataCounterWithHistory(),
+ ReadCounter: NewSocketDataCounterWithHistory(),
+ WriteRTTCounter: NewSocketDataCounterWithHistory(),
+ WriteRTTHistogram: NewSocketDataHistogramWithHistory(HistogramDataUnitUS),
+ WriteExeTimeHistogram: NewSocketDataHistogramWithHistory(HistogramDataUnitNS),
+ ReadExeTimeHistogram: NewSocketDataHistogramWithHistory(HistogramDataUnitNS),
+ ConnectCounter: NewSocketDataCounterWithHistory(),
+ ConnectExeTimeHistogram: NewSocketDataHistogramWithHistory(HistogramDataUnitNS),
+ CloseCounter: NewSocketDataCounterWithHistory(),
+ CloseExeTimeHistogram: NewSocketDataHistogramWithHistory(HistogramDataUnitNS),
+ RetransmitCounter: NewSocketDataCounterWithHistory(),
+ DropCounter: NewSocketDataCounterWithHistory(),
+ }
+}
+
+func (l *Metrics) FlushMetrics(connection *base.ConnectionContext) {
+ metrics := connection.Metrics.GetMetrics(Name).(*Metrics)
+
+ l.WriteCounter.IncreaseToCurrent(metrics.WriteCounter.CalculateIncrease())
+ l.ReadCounter.IncreaseToCurrent(metrics.ReadCounter.CalculateIncrease())
+ l.WriteRTTCounter.IncreaseToCurrent(metrics.WriteRTTCounter.CalculateIncrease())
+
+ l.WriteRTTHistogram.IncreaseToCurrent(metrics.WriteRTTHistogram.CalculateIncrease())
+ l.WriteExeTimeHistogram.IncreaseToCurrent(metrics.WriteExeTimeHistogram.CalculateIncrease())
+ l.ReadExeTimeHistogram.IncreaseToCurrent(metrics.ReadExeTimeHistogram.CalculateIncrease())
+
+ l.RetransmitCounter.IncreaseToCurrent(metrics.RetransmitCounter.CalculateIncrease())
+ l.DropCounter.IncreaseToCurrent(metrics.DropCounter.CalculateIncrease())
+
+ if connection.FlushDataCount == 0 && metrics.ConnectExecuteTime > 0 {
+ l.ConnectCounter.IncreaseToCurrent(NewSocketDataCounterWithValue(0, 1, metrics.ConnectExecuteTime))
+ l.ConnectExeTimeHistogram.Cur.IncreaseByValue(metrics.ConnectExecuteTime)
+ }
+ if connection.FlushDataCount == 0 && metrics.CloseExecuteTime > 0 {
+ l.CloseCounter.IncreaseToCurrent(NewSocketDataCounterWithValue(0, 1, metrics.CloseExecuteTime))
+ l.CloseExeTimeHistogram.Cur.IncreaseByValue(metrics.CloseExecuteTime)
+ }
+}
+
+type SocketDataCounter struct {
+ Bytes uint64
+ Count uint64
+ ExeTime uint64
+}
+
+func NewSocketDataCounter() *SocketDataCounter {
+ return &SocketDataCounter{}
+}
+
+func NewSocketDataCounterWithValue(bytes, count, exeTime uint64) *SocketDataCounter {
+ ret := &SocketDataCounter{}
+ ret.IncreaseByValue(bytes, count, exeTime)
+ return ret
+}
+
+func (s *SocketDataCounter) Increase(d *SocketDataCounter) {
+ s.IncreaseByValue(d.Bytes, d.Count, d.ExeTime)
+}
+
+func (s *SocketDataCounter) IncreaseByValue(bytes, count, exeTime uint64) {
+ s.Bytes += bytes
+ s.Count += count
+ s.ExeTime += exeTime
+}
+
+func (s *SocketDataCounter) NotEmpty() bool {
+ return s.Count > 0
+}
+
+// SocketDataCounterWithHistory means the socket send/receive data metrics
+type SocketDataCounterWithHistory struct {
+ Pre *SocketDataCounter
+ Cur *SocketDataCounter
+}
+
+func NewSocketDataCounterWithHistory() *SocketDataCounterWithHistory {
+ return &SocketDataCounterWithHistory{
+ Pre: NewSocketDataCounter(),
+ Cur: NewSocketDataCounter(),
+ }
+}
+
+func (c *SocketDataCounterWithHistory) RefreshCurrent() {
+ c.Pre = c.Cur
+ c.Cur = NewSocketDataCounterWithValue(c.Cur.Bytes, c.Cur.Count, c.Cur.ExeTime)
+}
+
+func (c *SocketDataCounterWithHistory) UpdateToCurrent(bytes, count, exeTime uint64) {
+ c.Pre = c.Cur
+ c.Cur = &SocketDataCounter{
+ Bytes: bytes,
+ Count: count,
+ ExeTime: exeTime,
+ }
+}
+
+func (c *SocketDataCounterWithHistory) IncreaseToCurrent(other *SocketDataCounter) {
+ c.Cur.Increase(other)
+}
+
+func (c *SocketDataCounterWithHistory) CalculateIncrease() *SocketDataCounter {
+ return &SocketDataCounter{
+ Bytes: subtractionValue(c.Cur.Bytes, c.Pre.Bytes),
+ Count: subtractionValue(c.Cur.Count, c.Pre.Count),
+ ExeTime: subtractionValue(c.Cur.ExeTime, c.Pre.ExeTime),
+ }
+}
+
+// SocketHistogramBucketsNs means the histogram bucket: 0ms, 0.01ms, 0.05ms, 0.1ms, 0.5ms, 1ms, 1.2ms, 1.5ms, 1.7ms, 2ms,
+// 2.5ms, 3ms, 5ms, 7ms, 10ms, 13ms, 16ms, 20ms, 25ms, 30ms, 35ms, 40ms, 45ms, 50ms, 70ms, 100ms, 150ms,
+// 200ms, 300ms, 500ms, 1s, 2s, 3s, 5s
+// value unit: ns
+var SocketHistogramBucketsNs = []float64{0, 10000, 50000, 100000, 500000, 1000000, 1200000, 1500000, 1700000, 2000000,
+ 2500000, 3000000, 5000000, 7000000, 10000000, 13000000, 16000000, 20000000, 25000000, 30000000, 35000000, 40000000,
+ 45000000, 50000000, 70000000, 100000000, 150000000, 200000000, 300000000, 500000000, 1000000000, 2000000000,
+ 3000000000, 5000000000}
+
+// SocketHistogramBucketsUs same with SocketHistogramBucketsNs, but the value unit: us
+var SocketHistogramBucketsUs = []float64{0, 10, 50, 100, 500, 1000, 1200, 1500, 1700, 2000,
+ 2500, 3000, 5000, 7000, 10000, 13000, 16000, 20000, 25000, 30000, 35000, 40000,
+ 45000, 50000, 70000, 100000, 150000, 200000, 300000, 500000, 1000000, 2000000,
+ 3000000, 5000000}
+var SocketHistogramBucketsCount = len(SocketHistogramBucketsNs)
+
+type SocketDataHistogram struct {
+ Unit HistogramDataUnit
+ Buckets map[uint64]uint32
+}
+
+func (h *SocketDataHistogram) Overwrite(other *SocketDataHistogram) {
+ for k, v := range other.Buckets {
+ h.Buckets[k] = v
+ }
+}
+
+func (h *SocketDataHistogram) Update(bucket uint64, value uint32) {
+ h.Buckets[bucket] = value
+}
+
+func (h *SocketDataHistogram) Increase(other *SocketDataHistogram) {
+ for k, v := range other.Buckets {
+ h.Buckets[k] += v
+ }
+}
+
+func (h *SocketDataHistogram) IncreaseByValue(val uint64) {
+ floatVal := float64(val)
+ for inx, curVal := range SocketHistogramBucketsNs {
+ if inx > 0 && curVal > floatVal {
+ h.Buckets[uint64(inx-1)]++
+ return
+ }
+ }
+ h.Buckets[uint64(len(SocketHistogramBucketsNs)-1)]++
+}
+
+func (h *SocketDataHistogram) NotEmpty() bool {
+ for _, v := range h.Buckets {
+ if v > 0 {
+ return true
+ }
+ }
+ return false
+}
+
+func NewSocketDataHistogram(unit HistogramDataUnit) *SocketDataHistogram {
+ buckets := make(map[uint64]uint32, SocketHistogramBucketsCount)
+ for i := 0; i < SocketHistogramBucketsCount; i++ {
+ buckets[uint64(i)] = 0
+ }
+ return &SocketDataHistogram{
+ Unit: unit,
+ Buckets: buckets,
+ }
+}
+
+type HistogramDataUnit int
+
+const (
+ HistogramDataUnitNS HistogramDataUnit = 1
+ HistogramDataUnitUS HistogramDataUnit = 2
+)
+
+type SocketDataHistogramWithHistory struct {
+ Pre *SocketDataHistogram
+ Cur *SocketDataHistogram
+}
+
+func NewSocketDataHistogramWithHistory(unit HistogramDataUnit) *SocketDataHistogramWithHistory {
+ return &SocketDataHistogramWithHistory{
+ Pre: NewSocketDataHistogram(unit),
+ Cur: NewSocketDataHistogram(unit),
+ }
+}
+
+func (h *SocketDataHistogramWithHistory) RefreshCurrent() {
+ // storage the current value to the previous buckets
+ h.Pre.Overwrite(h.Cur)
+}
+
+func (h *SocketDataHistogramWithHistory) UpdateToCurrent(bucket uint64, val uint32) {
+ h.Cur.Update(bucket, val)
+}
+
+func (h *SocketDataHistogramWithHistory) IncreaseToCurrent(other *SocketDataHistogram) {
+ h.Cur.Increase(other)
+}
+
+func (h *SocketDataHistogramWithHistory) CalculateIncrease() *SocketDataHistogram {
+ histogram := NewSocketDataHistogram(h.Cur.Unit)
+ var increaseVal uint32
+ for curK, curV := range h.Cur.Buckets {
+ if increaseVal = curV - h.Pre.Buckets[curK]; increaseVal > 0 {
+ histogram.Buckets[curK] = increaseVal
+ }
+ }
+ return histogram
+}
+
+func subtractionValue(v1, v2 uint64) uint64 {
+ if v1 > v2 {
+ return v1 - v2
+ }
+ return 0
+}
diff --git a/pkg/profiling/task/network/bpf/bpf.go b/pkg/profiling/task/network/bpf/bpf.go
new file mode 100644
index 0000000..2181f09
--- /dev/null
+++ b/pkg/profiling/task/network/bpf/bpf.go
@@ -0,0 +1,56 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package bpf
+
+import (
+ "github.com/apache/skywalking-rover/pkg/tools/btf"
+
+ "github.com/hashicorp/go-multierror"
+)
+
+// $BPF_CLANG and $BPF_CFLAGS are set by the Makefile.
+// nolint
+//go:generate go run github.com/cilium/ebpf/cmd/bpf2go -target bpfel -cc $BPF_CLANG -cflags $BPF_CFLAGS bpf $REPO_ROOT/bpf/profiling/network/netmonitor.c -- -I$REPO_ROOT/bpf/include -D__TARGET_ARCH_x86
+
+type Loader struct {
+ *Linker
+ *bpfObjects
+}
+
+func NewLoader() (*Loader, error) {
+ objs := bpfObjects{}
+ if err := loadBpfObjects(&objs, btf.GetEBPFCollectionOptionsIfNeed()); err != nil {
+ return nil, err
+ }
+
+ return &Loader{
+ bpfObjects: &objs,
+ Linker: NewLinker(),
+ }, nil
+}
+
+func (l *Loader) Close() error {
+ var err error
+ if e := l.bpfObjects.Close(); e != nil {
+ err = multierror.Append(err, e)
+ }
+ if e := l.Linker.Close(); e != nil {
+ err = multierror.Append(err, e)
+ }
+ return err
+}
diff --git a/pkg/profiling/task/network/linker.go b/pkg/profiling/task/network/bpf/linker.go
similarity index 98%
rename from pkg/profiling/task/network/linker.go
rename to pkg/profiling/task/network/bpf/linker.go
index c8cdf1e..7664918 100644
--- a/pkg/profiling/task/network/linker.go
+++ b/pkg/profiling/task/network/bpf/linker.go
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-package network
+package bpf
import (
"bytes"
@@ -28,6 +28,7 @@ import (
"golang.org/x/arch/x86/x86asm"
+ "github.com/apache/skywalking-rover/pkg/logger"
"github.com/apache/skywalking-rover/pkg/tools"
"github.com/apache/skywalking-rover/pkg/tools/elf"
@@ -38,6 +39,8 @@ import (
"github.com/hashicorp/go-multierror"
)
+var log = logger.GetLogger("profiling", "task", "network", "bpf")
+
const defaultSymbolPrefix = "sys_"
type LinkFunc func(symbol string, prog *ebpf.Program) (link.Link, error)
diff --git a/pkg/profiling/task/network/context.go b/pkg/profiling/task/network/context.go
deleted file mode 100644
index 3ebc2eb..0000000
--- a/pkg/profiling/task/network/context.go
+++ /dev/null
@@ -1,726 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package network
-
-import (
- "context"
- "encoding/json"
- "errors"
- "fmt"
- "net"
- "sync"
- "unsafe"
-
- cmap "github.com/orcaman/concurrent-map"
-
- "github.com/sirupsen/logrus"
-
- "github.com/hashicorp/go-multierror"
-
- "github.com/cilium/ebpf"
-
- "golang.org/x/sys/unix"
-
- "github.com/apache/skywalking-rover/pkg/process/api"
-)
-
-type Context struct {
- processes map[int32][]api.ProcessInterface
-
- bpf *bpfObjects // current bpf programs
- linker *Linker
-
- // standard syscall connections
- activeConnections cmap.ConcurrentMap // current activeConnections connections
- closedConnections []*ConnectionContext // closed connections'
- flushClosedEvents chan *SocketCloseEvent // connection have been closed, it is a queue to cache unknown active connections
- sockParseQueue chan *ConnectionContext // socket address parse queue
-
- // socket retransmit/drop
- socketExceptionStatics map[SocketBasicKey]*SocketExceptionValue
- socketExceptionOperationLock sync.Mutex
-}
-
-type SocketBasicKey struct {
- Pid uint32
- Family uint32
- RemoteAddrV4 uint32
- RemoteAddrV6 [16]uint8
- RemotePort uint32
- LocalAddrV4 uint32
- LocalAddrV6 [16]uint8
- LocalPort uint32
-}
-
-type SocketExceptionValue struct {
- DropCount int
- RetransmitCount int
-}
-
-type ConnectionContext struct {
- // basic metadata
- ConnectionID uint64
- RandomID uint64
- LocalPid uint32
- SocketFD uint32
- LocalProcesses []api.ProcessInterface
- ConnectionClosed bool
- Protocol ConnectionProtocol
- IsSSL bool
-
- // socket metadata
- Role ConnectionRole
- LocalIP string
- LocalPort uint16
- RemoteIP string
- RemotePort uint16
-
- // basic statics
- // read/write
- WriteCounter *SocketDataCounterWithHistory
- ReadCounter *SocketDataCounterWithHistory
- // write RTT
- WriteRTTCounter *SocketDataCounterWithHistory
-
- // histograms
- // write execute time and RTT
- WriteRTTHistogram *SocketDataHistogramWithHistory
- WriteExeTimeHistogram *SocketDataHistogramWithHistory
- // read execute time
- ReadExeTimeHistogram *SocketDataHistogramWithHistory
-
- // the connection connect or close execute time
- ConnectExecuteTime uint64
- CloseExecuteTime uint64
-
- // exception counters
- RetransmitCounter *SocketDataCounter
- DropCounter *SocketDataCounter
-
- // Flush the data content to the oap count
- FlushDataCount int
-}
-
-func NewContext() *Context {
- return &Context{
- activeConnections: cmap.New(),
- closedConnections: make([]*ConnectionContext, 0),
- flushClosedEvents: make(chan *SocketCloseEvent, 5000),
- sockParseQueue: make(chan *ConnectionContext, 5000),
- processes: make(map[int32][]api.ProcessInterface),
- socketExceptionStatics: make(map[SocketBasicKey]*SocketExceptionValue),
- }
-}
-
-func (c *Context) Init(bpf *bpfObjects, linker *Linker) {
- c.bpf = bpf
- c.linker = linker
-}
-
-func (c *Context) RegisterAllHandlers() {
- // socket connect
- c.linker.ReadEventAsync(c.bpf.SocketConnectionEventQueue, c.handleSocketConnectEvent, func() interface{} {
- return &SocketConnectEvent{}
- })
- // socket close
- c.linker.ReadEventAsync(c.bpf.SocketCloseEventQueue, c.handleSocketCloseEvent, func() interface{} {
- return &SocketCloseEvent{}
- })
- // socket retransmit
- c.linker.ReadEventAsync(c.bpf.SocketExceptionOperationEventQueue, c.handleSocketExceptionOperationEvent, func() interface{} {
- return &SocketExceptionOperationEvent{}
- })
-}
-
-func (c *Context) FlushAllConnection() ([]*ConnectionContext, error) {
- // handling the unfinished close event
- c.batchReProcessCachedCloseEvent()
-
- // get all connection context and fill the metrics
- allContexts := c.getAllConnectionWithContext()
- c.fillConnectionMetrics(allContexts)
-
- // all the exception operations to the context
- exceptionContexts := c.cleanAndGetAllExceptionContexts()
- // init all exception counters
- for _, ctx := range allContexts {
- ctx.DropCounter = NewSocketDataCounter()
- ctx.RetransmitCounter = NewSocketDataCounter()
- }
- c.combineExceptionToConnections(allContexts, exceptionContexts)
-
- return allContexts, nil
-}
-
-func (c *Context) StartSocketAddressParser(ctx context.Context) {
- for i := 0; i < 2; i++ {
- go c.handleSocketParseQueue(ctx)
- }
-}
-
-func (c *Context) handleSocketParseQueue(ctx context.Context) {
- for {
- select {
- case cc := <-c.sockParseQueue:
- socket, err := ParseSocket(cc.LocalPid, cc.SocketFD)
- if err != nil {
- // if the remote port of connection is empty, then this connection not available basically
- if cc.RemotePort == 0 {
- log.Warnf("complete the socket error, pid: %d, fd: %d, error: %v", cc.LocalPid, cc.SocketFD, err)
- }
- continue
- }
- cc.LocalIP = socket.SrcIP
- cc.LocalPort = socket.SrcPort
- cc.RemoteIP = socket.DestIP
- cc.RemotePort = socket.DestPort
- case <-ctx.Done():
- return
- }
- }
-}
-
-func (c *Context) combineExceptionToConnections(ccs []*ConnectionContext, exps map[SocketBasicKey]*SocketExceptionValue) {
- for key, value := range exps {
- var remotePort, localPort = uint16(key.RemotePort), uint16(key.LocalPort)
- var remoteIP, localIP string
-
- if key.Family == unix.AF_INET {
- remoteIP = parseAddressV4(key.RemoteAddrV4)
- localIP = parseAddressV4(key.LocalAddrV4)
- } else if key.Family == unix.AF_INET6 {
- remoteIP = parseAddressV6(key.RemoteAddrV6)
- localIP = parseAddressV6(key.LocalAddrV6)
- } else {
- continue
- }
-
- var firstRemoteMatch *ConnectionContext
- var foundAllAddrMatch bool
- for _, cc := range ccs {
- // only add to the first matches
- if cc.RemoteIP == remoteIP && cc.RemotePort == remotePort {
- firstRemoteMatch = cc
- if cc.LocalIP == localIP && cc.LocalPort == localPort {
- c.mergeExceptionToAppointConnection(value, cc)
- foundAllAddrMatch = true
- break
- }
- }
- }
-
- // if only remote address match, then just add to the first one
- if !foundAllAddrMatch && firstRemoteMatch != nil {
- c.mergeExceptionToAppointConnection(value, firstRemoteMatch)
- }
- }
-}
-
-func (c *Context) mergeExceptionToAppointConnection(expCtx *SocketExceptionValue, conCtx *ConnectionContext) {
- conCtx.DropCounter.IncreaseByValue(0, uint64(expCtx.DropCount), 0)
- conCtx.RetransmitCounter.IncreaseByValue(0, uint64(expCtx.RetransmitCount), 0)
-}
-
-func (c *Context) cleanAndGetAllExceptionContexts() map[SocketBasicKey]*SocketExceptionValue {
- c.socketExceptionOperationLock.Lock()
- defer c.socketExceptionOperationLock.Unlock()
-
- result := c.socketExceptionStatics
- c.socketExceptionStatics = make(map[SocketBasicKey]*SocketExceptionValue)
- return result
-}
-
-func (c *Context) getAllConnectionWithContext() []*ConnectionContext {
- result := make([]*ConnectionContext, 0)
- result = append(result, c.closedConnections...)
- for _, con := range c.activeConnections.Items() {
- result = append(result, con.(*ConnectionContext))
- }
-
- c.closedConnections = make([]*ConnectionContext, 0)
- return result
-}
-
-type ActiveConnectionInBPF struct {
- RandomID uint64
- Pid uint32
- SocketFD uint32
- Role ConnectionRole
- SocketFamily uint32
-
- RemoteAddrV4 uint32
- RemoteAddrV6 [16]uint8
- RemoteAddrPort uint32
- LocalAddrV4 uint32
- LocalAddrV6 [16]uint8
- LocalAddrPort uint32
-
- WriteBytes uint64
- WriteCount uint64
- WriteExeTime uint64
- ReadBytes uint64
- ReadCount uint64
- ReadExeTime uint64
-
- WriteRTTCount uint64
- WriteRTTExeTime uint64
-
- // Protocol analyze context
- Protocol ConnectionProtocol
- IsSSL uint32
-
- // the connect event is already sent
- ConnectEventIsSent uint32
-}
-
-type HistogramDataKey struct {
- ConnectionID uint64
- RandomID uint64
- DataDirection SocketDataDirection
- DataType SocketDataStaticsType
- Bucket uint64
-}
-
-func (c *Context) fillConnectionMetrics(ccs []*ConnectionContext) {
- // rebuild to the map for helping quick search correlate ConnectionContext
- keyWithContext := make(map[string]*ConnectionContext)
- var activeConnection ActiveConnectionInBPF
- closedConns := make([]string, 0)
- for _, cc := range ccs {
- connectionKey := c.generateConnectionKey(cc.ConnectionID, cc.RandomID)
- // refresh the histogram for prepare to update the buckets
- cc.WriteRTTHistogram.RefreshCurrent()
- cc.WriteExeTimeHistogram.RefreshCurrent()
- cc.ReadExeTimeHistogram.RefreshCurrent()
- keyWithContext[connectionKey] = cc
-
- // if connection not closed, then load the basic stats from bpf map
- if !cc.ConnectionClosed {
- err := c.bpf.ActiveConnectionMap.Lookup(cc.ConnectionID, &activeConnection)
-
- if err != nil {
- if errors.Is(err, ebpf.ErrKeyNotExist) {
- closedConns = append(closedConns, connectionKey)
- } else {
- log.Warnf("lookup the active connection error, connection id: %d, error: %v", cc.ConnectionID, err)
- }
- continue
- }
-
- if log.Enable(logrus.DebugLevel) {
- marshal, _ := json.Marshal(activeConnection)
- log.Debugf("found the active connection, conid: %d, data: %s", cc.ConnectionID, string(marshal))
- }
-
- if cc.Role == ConnectionRoleUnknown && activeConnection.Role != ConnectionRoleUnknown {
- cc.Role = activeConnection.Role
- }
- if cc.Protocol == ConnectionProtocolUnknown && activeConnection.Protocol != ConnectionProtocolUnknown {
- cc.Protocol = activeConnection.Protocol
- }
- if !cc.IsSSL && activeConnection.IsSSL == 1 {
- cc.IsSSL = true
- }
-
- // update the role
- cc.WriteCounter.UpdateToCurrent(activeConnection.WriteBytes, activeConnection.WriteCount, activeConnection.WriteExeTime)
- cc.ReadCounter.UpdateToCurrent(activeConnection.ReadBytes, activeConnection.ReadCount, activeConnection.ReadExeTime)
- cc.WriteRTTCounter.UpdateToCurrent(0, activeConnection.WriteRTTCount, activeConnection.WriteRTTExeTime)
- }
- }
- if len(closedConns) > 0 {
- c.deleteConnectionOnly(closedConns)
- }
-
- // fill the histogram metrics
- c.fillHistograms(keyWithContext)
-}
-
-func (c *Context) fillHistograms(keyWithContext map[string]*ConnectionContext) {
- var key HistogramDataKey
- var count uint32
- histogramIt := c.bpf.SocketConnectionStatsHistogram.Iterate()
- // for-each the stats map
- for histogramIt.Next(&key, &count) {
- // if it's not relate to the ConnectionContext just ignore
- cc := keyWithContext[c.generateConnectionKey(key.ConnectionID, key.RandomID)]
- if cc == nil {
- continue
- }
-
- // add the histogram data
- var histogram *SocketDataHistogramWithHistory
- if key.DataDirection == SocketDataDirectionEgress {
- if key.DataType == SocketDataStaticsTypeExeTime {
- histogram = cc.WriteExeTimeHistogram
- } else if key.DataType == SocketDataStaticsTypeRTT {
- histogram = cc.WriteRTTHistogram
- }
- } else if key.DataDirection == SocketDataDirectionIngress {
- histogram = cc.ReadExeTimeHistogram
- }
- if histogram == nil {
- log.Warnf("unknown the histogram data: %v", cc)
- continue
- }
- histogram.UpdateToCurrent(key.Bucket, count)
-
- // delete the stats if the connection already closed
- if cc.ConnectionClosed {
- if err := c.bpf.SocketConnectionStatsHistogram.Delete(key); err != nil {
- log.Warnf("delete the connection stats failure: %v", err)
- }
- }
- }
-}
-
-// SocketConnectEvent Socket have been connection/accept event
-type SocketConnectEvent struct {
- ConID uint64
- RandomID uint64
- ExeTime uint64
- NeedComplete uint32
- Pid uint32
- FD uint32
- FuncName uint32
-
- // socket information if exists
- Role ConnectionRole
- SocketFamily uint32
- RemoteAddrV4 uint32
- RemoteAddrV6 [16]uint8
- RemoteAddrPort uint32
- LocalAddrV4 uint32
- LocalAddrV6 [16]uint8
- LocalAddrPort uint32
-}
-
-func (c *Context) handleSocketConnectEvent(data interface{}) {
- event := data.(*SocketConnectEvent)
- processes := c.processes[int32(event.Pid)]
- if len(processes) == 0 {
- log.Warnf("get process connect event, but this process is don't need to monitor, pid: %d", event.Pid)
- return
- }
-
- // build active connection information
- con := c.newConnectionContext(event.ConID, event.RandomID, event.Pid, event.FD, processes, false)
- con.ConnectExecuteTime = event.ExeTime
- con.Role = event.Role
- if event.NeedComplete == 0 {
- con.RemotePort = uint16(event.RemoteAddrPort)
- con.LocalPort = uint16(event.LocalAddrPort)
- if event.SocketFamily == unix.AF_INET {
- con.LocalIP = parseAddressV4(event.LocalAddrV4)
- con.RemoteIP = parseAddressV4(event.RemoteAddrV4)
- } else {
- con.LocalIP = parseAddressV6(event.LocalAddrV6)
- con.RemoteIP = parseAddressV6(event.RemoteAddrV6)
- }
- } else {
- // if the remote address exists then setting it
- if event.RemoteAddrPort != 0 {
- con.RemotePort = uint16(event.RemoteAddrPort)
- if event.SocketFamily == unix.AF_INET {
- con.RemoteIP = parseAddressV4(event.RemoteAddrV4)
- } else {
- con.RemoteIP = parseAddressV6(event.RemoteAddrV6)
- }
- }
- c.sockParseQueue <- con
- }
-
- // add to the context
- c.saveActiveConnection(con)
-
- if log.Enable(logrus.DebugLevel) {
- marshal, _ := json.Marshal(event)
- log.Debugf("found connect event: role: %s, %s:%d:%d -> %s:%d, json: %s", con.Role.String(),
- con.LocalIP, con.LocalPort, con.LocalPid, con.RemoteIP, con.RemotePort, string(marshal))
- }
-}
-
-func (c *Context) saveActiveConnection(con *ConnectionContext) {
- c.activeConnections.Set(c.generateConnectionKey(con.ConnectionID, con.RandomID), con)
-}
-
-type SocketCloseEvent struct {
- ConID uint64
- RandomID uint64
- ExeTime uint64
- Pid uint32
- SocketFD uint32
- Role ConnectionRole
- Protocol ConnectionProtocol
- IsSSL uint32
- Fix uint32
-
- SocketFamily uint32
- RemoteAddrV4 uint32
- RemoteAddrV6 [16]uint8
- RemoteAddrPort uint32
- LocalAddrV4 uint32
- LocalAddrV6 [16]uint8
- LocalAddrPort uint32
- Fix1 uint32
-
- WriteBytes uint64
- WriteCount uint64
- WriteExeTime uint64
- ReadBytes uint64
- ReadCount uint64
- ReadExeTime uint64
-
- WriteRTTCount uint64
- WriteRTTExeTime uint64
-}
-
-// batch to re-process all cached closed event
-func (c *Context) batchReProcessCachedCloseEvent() {
- for len(c.flushClosedEvents) > 0 {
- event := <-c.flushClosedEvents
- if !c.socketClosedEvent0(event) {
- // if cannot the found the active connection, then just create a new closed connection context
- processes := c.processes[int32(event.Pid)]
- if len(processes) == 0 {
- continue
- }
- cc := c.newConnectionContext(event.ConID, event.RandomID, event.Pid, event.SocketFD, processes, true)
- if event.SocketFamily == unix.AF_INET {
- cc.RemoteIP = parseAddressV4(event.RemoteAddrV4)
- cc.LocalIP = parseAddressV4(event.LocalAddrV4)
- } else if event.SocketFamily == unix.AF_INET6 {
- cc.RemoteIP = parseAddressV6(event.RemoteAddrV6)
- cc.LocalIP = parseAddressV6(event.LocalAddrV6)
- } else {
- continue
- }
-
- // append to the closed connection
- c.closedConnections = append(c.closedConnections, c.combineClosedConnection(cc, event))
- }
- }
-}
-
-func (c *Context) newConnectionContext(conID, randomID uint64, pid, fd uint32, processes []api.ProcessInterface, conClosed bool) *ConnectionContext {
- return &ConnectionContext{
- // metadata
- ConnectionID: conID,
- RandomID: randomID,
- LocalPid: pid,
- SocketFD: fd,
- LocalProcesses: processes,
- ConnectionClosed: conClosed,
-
- // metrics
- WriteCounter: NewSocketDataCounterWithHistory(),
- ReadCounter: NewSocketDataCounterWithHistory(),
- WriteRTTCounter: NewSocketDataCounterWithHistory(),
- WriteRTTHistogram: NewSocketDataHistogramWithHistory(HistogramDataUnitUS),
- WriteExeTimeHistogram: NewSocketDataHistogramWithHistory(HistogramDataUnitNS),
- ReadExeTimeHistogram: NewSocketDataHistogramWithHistory(HistogramDataUnitNS),
- }
-}
-
-func (c *Context) handleSocketCloseEvent(data interface{}) {
- event := data.(*SocketCloseEvent)
-
- if log.Enable(logrus.DebugLevel) {
- marshal, _ := json.Marshal(event)
- log.Debugf("found close event: %s", string(marshal))
- }
-
- // try to handle the socket close event
- if !c.socketClosedEvent0(event) {
- // is not in active connection, maybe it's not have been added to activate first
- // just add to the close queue, wait for the flush connection with interval
- c.flushClosedEvents <- event
- return
- }
-}
-
-// SocketExceptionOperationEvent Socket have been retransmitted/drop the package event
-type SocketExceptionOperationEvent struct {
- Pid uint32
- SocketFamily uint32
- RemoteAddrV4 uint32
- RemoteAddrV6 [16]uint8
- RemoteAddrPort uint32
- Type SocketExceptionOperationType
-}
-
-func (c *Context) handleSocketExceptionOperationEvent(data interface{}) {
- event := data.(*SocketExceptionOperationEvent)
- c.socketExceptionOperationLock.Lock()
- defer c.socketExceptionOperationLock.Unlock()
-
- key := SocketBasicKey{
- Pid: event.Pid,
- Family: event.SocketFamily,
- RemoteAddrV4: event.RemoteAddrV4,
- RemoteAddrV6: event.RemoteAddrV6,
- RemotePort: event.RemoteAddrPort,
- }
- exceptionValue := c.socketExceptionStatics[key]
- if exceptionValue == nil {
- exceptionValue = &SocketExceptionValue{}
- c.socketExceptionStatics[key] = exceptionValue
- }
-
- switch event.Type {
- case SocketExceptionOperationRetransmit:
- exceptionValue.RetransmitCount++
- case SocketExceptionOperationDrop:
- exceptionValue.DropCount++
- default:
- log.Warnf("unknown socket exception operation type: %d", event.Type)
- }
-
- if log.Enable(logrus.DebugLevel) {
- marshal, _ := json.Marshal(event)
- log.Debugf("found socket exception operation event: %s", string(marshal))
- }
-}
-
-func (c *Context) socketClosedEvent0(event *SocketCloseEvent) bool {
- activeCon := c.foundAndDeleteConnection(event)
- if activeCon == nil {
- return false
- }
-
- // combine the connection data
- c.closedConnections = append(c.closedConnections, c.combineClosedConnection(activeCon, event))
- return true
-}
-
-func (c *Context) foundAndDeleteConnection(event *SocketCloseEvent) *ConnectionContext {
- conKey := c.generateConnectionKey(event.ConID, event.RandomID)
- val, exists := c.activeConnections.Pop(conKey)
- if !exists {
- return nil
- }
- return val.(*ConnectionContext)
-}
-
-func (c *Context) deleteConnectionOnly(ccs []string) {
- for _, cc := range ccs {
- c.activeConnections.Remove(cc)
- }
-}
-
-func (c *Context) combineClosedConnection(active *ConnectionContext, closed *SocketCloseEvent) *ConnectionContext {
- active.ConnectionClosed = true
-
- if active.Role == ConnectionRoleUnknown && closed.Role != ConnectionRoleUnknown {
- active.Role = closed.Role
- }
- if active.Protocol == ConnectionProtocolUnknown && closed.Protocol != ConnectionProtocolUnknown {
- active.Protocol = closed.Protocol
- }
- if !active.IsSSL && closed.IsSSL == 1 {
- active.IsSSL = true
- }
-
- active.WriteCounter.UpdateToCurrent(closed.WriteBytes, closed.WriteCount, closed.WriteExeTime)
- active.ReadCounter.UpdateToCurrent(closed.ReadBytes, closed.ReadCount, closed.ReadExeTime)
- active.WriteRTTCounter.UpdateToCurrent(0, closed.WriteRTTCount, closed.WriteRTTExeTime)
- active.CloseExecuteTime = closed.ExeTime
- return active
-}
-
-func (c *Context) generateConnectionKey(conID, randomID uint64) string {
- return fmt.Sprintf("%d_%d", conID, randomID)
-}
-
-func (c *Context) AddProcesses(processes []api.ProcessInterface) error {
- var err error
- for _, p := range processes {
- pid := p.Pid()
- alreadyExists := false
- if len(c.processes[pid]) > 0 {
- for _, existsProcess := range c.processes[pid] {
- if p.ID() == existsProcess.ID() {
- alreadyExists = true
- break
- }
- }
- }
-
- if alreadyExists {
- continue
- }
-
- c.processes[pid] = append(c.processes[pid], p)
-
- // add to the process let it could be monitored
- if err1 := c.bpf.ProcessMonitorControl.Update(uint32(pid), uint32(1), ebpf.UpdateAny); err1 != nil {
- err = multierror.Append(err, err1)
- }
-
- // add process ssl config
- if err1 := addSSLProcess(int(pid), c.bpf, c.linker); err1 != nil {
- err = multierror.Append(err, err1)
- }
-
- log.Debugf("add monitor process, pid: %d", pid)
- }
- return err
-}
-
-func (c *Context) DeleteProcesses(processes []api.ProcessInterface) (emptyProcesses bool, deleteError error) {
- var err error
- for _, p := range processes {
- pid := p.Pid()
- existsProcesses := make([]api.ProcessInterface, 0)
- existsProcesses = append(existsProcesses, c.processes[pid]...)
-
- // update process entities
- newProcesses := make([]api.ProcessInterface, 0)
-
- for _, existProcess := range existsProcesses {
- if p.ID() != existProcess.ID() {
- newProcesses = append(newProcesses, existProcess)
- }
- }
-
- // no process need delete, then just ignore
- if len(newProcesses) == len(existsProcesses) {
- continue
- }
-
- // the process no need to monitor, then just ignore
- if len(newProcesses) == 0 {
- if err1 := c.bpf.ProcessMonitorControl.Delete(uint32(pid)); err1 != nil {
- err = multierror.Append(err, err1)
- }
- log.Debugf("delete monitor process: %d", pid)
- delete(c.processes, pid)
- continue
- }
- c.processes[pid] = newProcesses
- }
- return len(c.processes) == 0, err
-}
-
-func parseAddressV4(val uint32) string {
- return net.IP((*(*[net.IPv4len]byte)(unsafe.Pointer(&val)))[:]).String()
-}
-
-func parseAddressV6(val [16]uint8) string {
- return net.IP((*(*[net.IPv6len]byte)(unsafe.Pointer(&val)))[:]).String()
-}
diff --git a/pkg/profiling/task/network/metrics.go b/pkg/profiling/task/network/metrics.go
deleted file mode 100644
index a52d5e3..0000000
--- a/pkg/profiling/task/network/metrics.go
+++ /dev/null
@@ -1,396 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package network
-
-import (
- "fmt"
- "time"
-
- "github.com/apache/skywalking-rover/pkg/process/api"
- "github.com/apache/skywalking-rover/pkg/tools"
-
- v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
-)
-
-type SocketDataCounter struct {
- Bytes uint64
- Count uint64
- ExeTime uint64
-}
-
-func NewSocketDataCounter() *SocketDataCounter {
- return &SocketDataCounter{}
-}
-
-func (s *SocketDataCounter) Increase(d *SocketDataCounter) {
- s.IncreaseByValue(d.Bytes, d.Count, d.ExeTime)
-}
-
-func (s *SocketDataCounter) IncreaseByValue(bytes, count, exeTime uint64) {
- s.Bytes += bytes
- s.Count += count
- s.ExeTime += exeTime
-}
-
-func (s *SocketDataCounter) NotEmpty() bool {
- return s.Count > 0
-}
-
-// SocketDataCounterWithHistory means the socket send/receive data metrics
-type SocketDataCounterWithHistory struct {
- Pre *SocketDataCounter
- Cur *SocketDataCounter
-}
-
-func NewSocketDataCounterWithHistory() *SocketDataCounterWithHistory {
- return &SocketDataCounterWithHistory{
- Pre: NewSocketDataCounter(),
- Cur: NewSocketDataCounter(),
- }
-}
-
-func (c *SocketDataCounterWithHistory) UpdateToCurrent(bytes, count, exeTime uint64) {
- c.Pre = c.Cur
- c.Cur = &SocketDataCounter{
- Bytes: bytes,
- Count: count,
- ExeTime: exeTime,
- }
-}
-
-func (c *SocketDataCounterWithHistory) CalculateIncrease() *SocketDataCounter {
- return &SocketDataCounter{
- Bytes: subtractionValue(c.Cur.Bytes, c.Pre.Bytes),
- Count: subtractionValue(c.Cur.Count, c.Pre.Count),
- ExeTime: subtractionValue(c.Cur.ExeTime, c.Pre.ExeTime),
- }
-}
-
-// SocketHistogramBucketsNs means the histogram bucket: 0ms, 0.01ms, 0.05ms, 0.1ms, 0.5ms, 1ms, 1.2ms, 1.5ms, 1.7ms, 2ms,
-// 2.5ms, 3ms, 5ms, 7ms, 10ms, 13ms, 16ms, 20ms, 25ms, 30ms, 35ms, 40ms, 45ms, 50ms, 70ms, 100ms, 150ms,
-// 200ms, 300ms, 500ms, 1s, 2s, 3s, 5s
-// value unit: ns
-var SocketHistogramBucketsNs = []float64{0, 10000, 50000, 100000, 500000, 1000000, 1200000, 1500000, 1700000, 2000000,
- 2500000, 3000000, 5000000, 7000000, 10000000, 13000000, 16000000, 20000000, 25000000, 30000000, 35000000, 40000000,
- 45000000, 50000000, 70000000, 100000000, 150000000, 200000000, 300000000, 500000000, 1000000000, 2000000000,
- 3000000000, 5000000000}
-
-// SocketHistogramBucketsUs same with SocketHistogramBucketsNs, but the value unit: us
-var SocketHistogramBucketsUs = []float64{0, 10, 50, 100, 500, 1000, 1200, 1500, 1700, 2000,
- 2500, 3000, 5000, 7000, 10000, 13000, 16000, 20000, 25000, 30000, 35000, 40000,
- 45000, 50000, 70000, 100000, 150000, 200000, 300000, 500000, 1000000, 2000000,
- 3000000, 5000000}
-var SocketHistogramBucketsCount = len(SocketHistogramBucketsNs)
-
-type SocketDataHistogram struct {
- Unit HistogramDataUnit
- Buckets map[uint64]uint32
-}
-
-func (h *SocketDataHistogram) Overwrite(other *SocketDataHistogram) {
- for k, v := range other.Buckets {
- h.Buckets[k] = v
- }
-}
-
-func (h *SocketDataHistogram) Update(bucket uint64, value uint32) {
- h.Buckets[bucket] = value
-}
-
-func (h *SocketDataHistogram) Increase(other *SocketDataHistogram) {
- for k, v := range other.Buckets {
- h.Buckets[k] += v
- }
-}
-
-func (h *SocketDataHistogram) IncreaseByValue(val uint64) {
- floatVal := float64(val)
- for inx, curVal := range SocketHistogramBucketsNs {
- if inx > 0 && curVal > floatVal {
- h.Buckets[uint64(inx-1)]++
- return
- }
- }
- h.Buckets[uint64(len(SocketHistogramBucketsNs)-1)]++
-}
-
-func (h *SocketDataHistogram) NotEmpty() bool {
- for _, v := range h.Buckets {
- if v > 0 {
- return true
- }
- }
- return false
-}
-
-func NewSocketDataHistogram(unit HistogramDataUnit) *SocketDataHistogram {
- buckets := make(map[uint64]uint32, SocketHistogramBucketsCount)
- for i := 0; i < SocketHistogramBucketsCount; i++ {
- buckets[uint64(i)] = 0
- }
- return &SocketDataHistogram{
- Unit: unit,
- Buckets: buckets,
- }
-}
-
-type HistogramDataUnit int
-
-const (
- HistogramDataUnitNS HistogramDataUnit = 1
- HistogramDataUnitUS HistogramDataUnit = 2
-)
-
-type SocketDataHistogramWithHistory struct {
- Pre *SocketDataHistogram
- Cur *SocketDataHistogram
-}
-
-func NewSocketDataHistogramWithHistory(unit HistogramDataUnit) *SocketDataHistogramWithHistory {
- return &SocketDataHistogramWithHistory{
- Pre: NewSocketDataHistogram(unit),
- Cur: NewSocketDataHistogram(unit),
- }
-}
-
-func (h *SocketDataHistogramWithHistory) RefreshCurrent() {
- // storage the current value to the previous buckets
- h.Pre.Overwrite(h.Cur)
-}
-
-func (h *SocketDataHistogramWithHistory) UpdateToCurrent(bucket uint64, val uint32) {
- h.Cur.Update(bucket, val)
-}
-
-func (h *SocketDataHistogramWithHistory) CalculateIncrease() *SocketDataHistogram {
- histogram := NewSocketDataHistogram(h.Cur.Unit)
- var increaseVal uint32
- for curK, curV := range h.Cur.Buckets {
- if increaseVal = curV - h.Pre.Buckets[curK]; increaseVal > 0 {
- histogram.Buckets[curK] = increaseVal
- }
- }
- return histogram
-}
-
-func subtractionValue(v1, v2 uint64) uint64 {
- if v1 > v2 {
- return v1 - v2
- }
- return 0
-}
-
-type ProcessTraffic struct {
- analyzer *TrafficAnalyzer
-
- // local process information
- LocalPid uint32
- LocalProcesses []api.ProcessInterface
- LocalIP string
- LocalPort uint16
-
- // current connection role of local process
- ConnectionRole ConnectionRole
- // the protocol of the connection
- Protocol ConnectionProtocol
- // current connection is SSL
- IsSSL bool
-
- // remote process/address information
- RemoteIP string
- RemotePort uint16
- RemotePid uint32
- RemoteProcesses []api.ProcessInterface
-
- // statics
- WriteCounter *SocketDataCounter
- ReadCounter *SocketDataCounter
- // write RTT
- WriteRTTCounter *SocketDataCounter
-
- // connection operate
- ConnectCounter *SocketDataCounter
- CloseCounter *SocketDataCounter
-
- // exception operate
- RetransmitCounter *SocketDataCounter
- DropCounter *SocketDataCounter
-
- // histograms
- // write execute time and RTT
- WriteRTTHistogram *SocketDataHistogram
- WriteExeTimeHistogram *SocketDataHistogram
- // read execute time
- ReadExeTimeHistogram *SocketDataHistogram
-
- // connection operate
- ConnectExeTimeHistogram *SocketDataHistogram
- CloseExeTimeHistogram *SocketDataHistogram
-}
-
-func (r *ProcessTraffic) ContainsAnyTraffic() bool {
- return r.WriteCounter.NotEmpty() || r.ReadCounter.NotEmpty() || r.WriteRTTCounter.NotEmpty() || r.ConnectCounter.NotEmpty() ||
- r.CloseCounter.NotEmpty() || r.WriteRTTHistogram.NotEmpty() || r.WriteExeTimeHistogram.NotEmpty() || r.ReadExeTimeHistogram.NotEmpty() ||
- r.ConnectExeTimeHistogram.NotEmpty() || r.CloseExeTimeHistogram.NotEmpty()
-}
-
-func (r *ProcessTraffic) GenerateMetrics(metricsPrefix string) []*v3.MeterDataCollection {
- result := make([]*v3.MeterDataCollection, 0)
- for _, p := range r.LocalProcesses {
- collection := make([]*v3.MeterData, 0)
- collection = r.appendCounterValues(collection, metricsPrefix, "write", p, r.WriteCounter)
- collection = r.appendCounterValues(collection, metricsPrefix, "read", p, r.ReadCounter)
- collection = r.appendCounterValues(collection, metricsPrefix, "write_rtt", p, r.WriteRTTCounter)
- collection = r.appendCounterValues(collection, metricsPrefix, "connect", p, r.ConnectCounter)
- collection = r.appendCounterValues(collection, metricsPrefix, "close", p, r.CloseCounter)
- collection = r.appendCounterValues(collection, metricsPrefix, "retransmit", p, r.RetransmitCounter)
- collection = r.appendCounterValues(collection, metricsPrefix, "drop", p, r.DropCounter)
-
- collection = r.appendHistogramValue(collection, metricsPrefix, "write_rtt", p, r.WriteRTTHistogram)
- collection = r.appendHistogramValue(collection, metricsPrefix, "write_exe_time", p, r.WriteExeTimeHistogram)
- collection = r.appendHistogramValue(collection, metricsPrefix, "read_exe_time", p, r.ReadExeTimeHistogram)
- collection = r.appendHistogramValue(collection, metricsPrefix, "connect_exe_time", p, r.ConnectExeTimeHistogram)
- collection = r.appendHistogramValue(collection, metricsPrefix, "close_exe_time", p, r.CloseExeTimeHistogram)
-
- if len(collection) == 0 {
- continue
- }
-
- // add entity
- collection[0].Service = p.Entity().ServiceName
- collection[0].ServiceInstance = p.Entity().InstanceName
- collection[0].Timestamp = time.Now().UnixMilli()
- result = append(result, &v3.MeterDataCollection{
- MeterData: collection,
- })
- }
-
- return result
-}
-
-func (r *ProcessTraffic) appendCounterValues(metrics []*v3.MeterData, metricsPrefix, name string, local api.ProcessInterface,
- counter *SocketDataCounter) []*v3.MeterData {
- if !counter.NotEmpty() {
- return metrics
- }
-
- count := float64(counter.Count)
- metrics = append(metrics, r.buildSingleValue(metricsPrefix, name+"_counts_counter", local, count))
- if counter.Bytes > 0 {
- metrics = append(metrics, r.buildSingleValue(metricsPrefix, name+"_bytes_counter", local, float64(counter.Bytes)))
- }
- if counter.ExeTime > 0 {
- metrics = append(metrics, r.buildSingleValue(metricsPrefix, name+"_exe_time_counter", local, float64(counter.ExeTime)/count))
- }
- return metrics
-}
-
-func (r *ProcessTraffic) appendHistogramValue(metrics []*v3.MeterData, metricsPrefix, name string,
- local api.ProcessInterface, histogram *SocketDataHistogram) []*v3.MeterData {
- if !histogram.NotEmpty() {
- return metrics
- }
-
- role, labels := r.buildBasicMeterLabels(local)
- values := make([]*v3.MeterBucketValue, 0)
- for bucket, count := range histogram.Buckets {
- var bucketInx = int(bucket)
- if bucketInx >= SocketHistogramBucketsCount {
- bucketInx = SocketHistogramBucketsCount - 1
- }
- var buckets []float64
- if histogram.Unit == HistogramDataUnitUS {
- buckets = SocketHistogramBucketsUs
- } else {
- buckets = SocketHistogramBucketsNs
- }
- values = append(values, &v3.MeterBucketValue{
- Bucket: buckets[bucketInx],
- Count: int64(count),
- })
- }
-
- return append(metrics, &v3.MeterData{
- Metric: &v3.MeterData_Histogram{
- Histogram: &v3.MeterHistogram{
- Name: fmt.Sprintf("%s%s_%s_histogram", metricsPrefix, role.String(), name),
- Labels: labels,
- Values: values,
- },
- },
- })
-}
-
-func (r *ProcessTraffic) buildSingleValue(prefix, name string, local api.ProcessInterface, val float64) *v3.MeterData {
- role, labels := r.buildBasicMeterLabels(local)
-
- return &v3.MeterData{
- Metric: &v3.MeterData_SingleValue{
- SingleValue: &v3.MeterSingleValue{
- Name: fmt.Sprintf("%s%s_%s", prefix, role.String(), name),
- Labels: labels,
- Value: val,
- },
- },
- }
-}
-
-func (r *ProcessTraffic) buildBasicMeterLabels(local api.ProcessInterface) (ConnectionRole, []*v3.Label) {
- curRole := r.ConnectionRole
- // add the default role
- if curRole == ConnectionRoleUnknown {
- curRole = ConnectionRoleClient
- }
- labels := make([]*v3.Label, 0)
-
- // two pair process/address info
- labels = r.appendMeterValue(labels, fmt.Sprintf("%s_process_id", curRole.String()), local.ID())
- labels = r.appendRemoteAddrssInfo(labels, curRole.Revert().String(), local)
-
- labels = r.appendMeterValue(labels, "side", curRole.String())
-
- // protocol and ssl
- labels = r.appendMeterValue(labels, "protocol", r.Protocol.String())
- labels = r.appendMeterValue(labels, "is_ssl", fmt.Sprintf("%t", r.IsSSL))
- return curRole, labels
-}
-
-func (r *ProcessTraffic) appendRemoteAddrssInfo(labels []*v3.Label, prefix string, local api.ProcessInterface) []*v3.Label {
- if len(r.RemoteProcesses) != 0 {
- for _, p := range r.RemoteProcesses {
- // only match with same service instance
- if local.Entity().ServiceName == p.Entity().ServiceName &&
- local.Entity().InstanceName == p.Entity().InstanceName {
- return r.appendMeterValue(labels, prefix+"_process_id", p.ID())
- }
- }
- }
-
- if tools.IsLocalHostAddress(r.RemoteIP) || r.analyzer.IsLocalAddressInCache(r.RemoteIP) {
- return r.appendMeterValue(labels, prefix+"_local", "true")
- }
-
- return r.appendMeterValue(labels, prefix+"_address", fmt.Sprintf("%s:%d", r.RemoteIP, r.RemotePort))
-}
-
-func (r *ProcessTraffic) appendMeterValue(labels []*v3.Label, name, value string) []*v3.Label {
- return append(labels, &v3.Label{
- Name: name,
- Value: value,
- })
-}
diff --git a/pkg/profiling/task/network/runner.go b/pkg/profiling/task/network/runner.go
index c623bfd..b52aa3c 100644
--- a/pkg/profiling/task/network/runner.go
+++ b/pkg/profiling/task/network/runner.go
@@ -24,7 +24,7 @@ import (
"sync"
"time"
- "github.com/sirupsen/logrus"
+ "github.com/cilium/ebpf"
"github.com/hashicorp/go-multierror"
@@ -35,16 +35,14 @@ import (
"github.com/apache/skywalking-rover/pkg/module"
"github.com/apache/skywalking-rover/pkg/process/api"
"github.com/apache/skywalking-rover/pkg/profiling/task/base"
- "github.com/apache/skywalking-rover/pkg/tools/btf"
+ "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze"
+ analyzeBase "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
+ "github.com/apache/skywalking-rover/pkg/profiling/task/network/bpf"
v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
)
-// $BPF_CLANG and $BPF_CFLAGS are set by the Makefile.
-// nolint
-//go:generate go run github.com/cilium/ebpf/cmd/bpf2go -target bpfel -cc $BPF_CLANG -cflags $BPF_CFLAGS bpf $REPO_ROOT/bpf/profiling/network/netmonitor.c -- -I$REPO_ROOT/bpf/include -D__TARGET_ARCH_x86
-
-var log = logger.GetLogger("profiling", "task", "network", "topology")
+var log = logger.GetLogger("profiling", "task", "network")
type Runner struct {
initOnce sync.Once
@@ -54,18 +52,19 @@ type Runner struct {
reportInterval time.Duration
meterPrefix string
- bpf *bpfObjects
- linker *Linker
- bpfContext *Context
+ bpf *bpf.Loader
+ processes map[int32][]api.ProcessInterface
+ analyzeContext *analyzeBase.AnalyzerContext
ctx context.Context
cancel context.CancelFunc
}
func NewGlobalRunnerContext() *Runner {
+ processes := make(map[int32][]api.ProcessInterface)
return &Runner{
- bpfContext: NewContext(),
- linker: NewLinker(),
+ processes: processes,
+ analyzeContext: analyze.NewContext(processes),
}
}
@@ -78,7 +77,38 @@ func (r *Runner) init(config *base.TaskConfig, moduleMgr *module.Manager) error
}
func (r *Runner) DeleteProcesses(processes []api.ProcessInterface) (bool, error) {
- return r.bpfContext.DeleteProcesses(processes)
+ var err error
+ for _, p := range processes {
+ pid := p.Pid()
+ existsProcesses := make([]api.ProcessInterface, 0)
+ existsProcesses = append(existsProcesses, r.processes[pid]...)
+
+ // update process entities
+ newProcesses := make([]api.ProcessInterface, 0)
+
+ for _, existProcess := range existsProcesses {
+ if p.ID() != existProcess.ID() {
+ newProcesses = append(newProcesses, existProcess)
+ }
+ }
+
+ // no process need delete, then just ignore
+ if len(newProcesses) == len(existsProcesses) {
+ continue
+ }
+
+ // the process no need to monitor, then just ignore
+ if len(newProcesses) == 0 {
+ if err1 := r.bpf.ProcessMonitorControl.Delete(uint32(pid)); err1 != nil {
+ err = multierror.Append(err, err1)
+ }
+ log.Debugf("delete monitor process: %d", pid)
+ delete(r.processes, pid)
+ continue
+ }
+ r.processes[pid] = newProcesses
+ }
+ return len(r.processes) == 0, err
}
func (r *Runner) Start(ctx context.Context, processes []api.ProcessInterface) error {
@@ -86,59 +116,58 @@ func (r *Runner) Start(ctx context.Context, processes []api.ProcessInterface) er
defer r.startLock.Unlock()
// if already start, then just adding the processes
if r.bpf != nil {
- return r.bpfContext.AddProcesses(processes)
+ return r.addProcesses(processes)
}
r.ctx, r.cancel = context.WithCancel(ctx)
// load bpf program
- objs := bpfObjects{}
- if err := loadBpfObjects(&objs, btf.GetEBPFCollectionOptionsIfNeed()); err != nil {
+ bpfLoader, err := bpf.NewLoader()
+ if err != nil {
return err
}
- r.bpf = &objs
- r.bpfContext.Init(&objs, r.linker)
+ r.bpf = bpfLoader
- if err := r.bpfContext.AddProcesses(processes); err != nil {
+ if err := r.addProcesses(processes); err != nil {
return err
}
// register all handlers
- r.bpfContext.RegisterAllHandlers()
- r.bpfContext.StartSocketAddressParser(r.ctx)
+ r.analyzeContext.RegisterAllHandlers(bpfLoader)
+ r.analyzeContext.StartSocketAddressParser(r.ctx)
// sock opts
- r.linker.AddSysCall("close", objs.SysClose, objs.SysCloseRet)
- r.linker.AddSysCall("connect", objs.SysConnect, objs.SysConnectRet)
- r.linker.AddSysCall("accept", objs.SysAccept, objs.SysAcceptRet)
- r.linker.AddSysCall("accept4", objs.SysAccept, objs.SysAcceptRet)
- r.linker.AddLink(link.Kretprobe, objs.SockAllocRet, "sock_alloc")
- r.linker.AddLink(link.Kprobe, objs.TcpConnect, "tcp_connect")
+ bpfLoader.AddSysCall("close", bpfLoader.SysClose, bpfLoader.SysCloseRet)
+ bpfLoader.AddSysCall("connect", bpfLoader.SysConnect, bpfLoader.SysConnectRet)
+ bpfLoader.AddSysCall("accept", bpfLoader.SysAccept, bpfLoader.SysAcceptRet)
+ bpfLoader.AddSysCall("accept4", bpfLoader.SysAccept, bpfLoader.SysAcceptRet)
+ bpfLoader.AddLink(link.Kretprobe, bpfLoader.SockAllocRet, "sock_alloc")
+ bpfLoader.AddLink(link.Kprobe, bpfLoader.TcpConnect, "tcp_connect")
// write/receive data
- r.linker.AddSysCall("send", objs.SysSend, objs.SysSendRet)
- r.linker.AddSysCall("sendto", objs.SysSendto, objs.SysSendtoRet)
- r.linker.AddSysCall("sendmsg", objs.SysSendmsg, objs.SysSendmsgRet)
- r.linker.AddSysCall("sendmmsg", objs.SysSendmmsg, objs.SysSendmmsgRet)
- r.linker.AddSysCall("sendfile", objs.SysSendfile, objs.SysSendfileRet)
- r.linker.AddSysCall("sendfile64", objs.SysSendfile, objs.SysSendfileRet)
- r.linker.AddSysCall("write", objs.SysWrite, objs.SysWriteRet)
- r.linker.AddSysCall("writev", objs.SysWritev, objs.SysWritevRet)
- r.linker.AddSysCall("read", objs.SysRead, objs.SysReadRet)
- r.linker.AddSysCall("readv", objs.SysReadv, objs.SysReadvRet)
- r.linker.AddSysCall("recv", objs.SysRecv, objs.SysRecvRet)
- r.linker.AddSysCall("recvfrom", objs.SysRecvfrom, objs.SysRecvfromRet)
- r.linker.AddSysCall("recvmsg", objs.SysRecvmsg, objs.SysRecvmsgRet)
- r.linker.AddSysCall("recvmmsg", objs.SysRecvmmsg, objs.SysRecvmmsgRet)
- r.linker.AddLink(link.Kprobe, objs.TcpRcvEstablished, "tcp_rcv_established")
- r.linker.AddLink(link.Kprobe, objs.SecuritySocketSendmsg, "security_socket_sendmsg")
- r.linker.AddLink(link.Kprobe, objs.SecuritySocketRecvmsg, "security_socket_recvmsg")
+ bpfLoader.AddSysCall("send", bpfLoader.SysSend, bpfLoader.SysSendRet)
+ bpfLoader.AddSysCall("sendto", bpfLoader.SysSendto, bpfLoader.SysSendtoRet)
+ bpfLoader.AddSysCall("sendmsg", bpfLoader.SysSendmsg, bpfLoader.SysSendmsgRet)
+ bpfLoader.AddSysCall("sendmmsg", bpfLoader.SysSendmmsg, bpfLoader.SysSendmmsgRet)
+ bpfLoader.AddSysCall("sendfile", bpfLoader.SysSendfile, bpfLoader.SysSendfileRet)
+ bpfLoader.AddSysCall("sendfile64", bpfLoader.SysSendfile, bpfLoader.SysSendfileRet)
+ bpfLoader.AddSysCall("write", bpfLoader.SysWrite, bpfLoader.SysWriteRet)
+ bpfLoader.AddSysCall("writev", bpfLoader.SysWritev, bpfLoader.SysWritevRet)
+ bpfLoader.AddSysCall("read", bpfLoader.SysRead, bpfLoader.SysReadRet)
+ bpfLoader.AddSysCall("readv", bpfLoader.SysReadv, bpfLoader.SysReadvRet)
+ bpfLoader.AddSysCall("recv", bpfLoader.SysRecv, bpfLoader.SysRecvRet)
+ bpfLoader.AddSysCall("recvfrom", bpfLoader.SysRecvfrom, bpfLoader.SysRecvfromRet)
+ bpfLoader.AddSysCall("recvmsg", bpfLoader.SysRecvmsg, bpfLoader.SysRecvmsgRet)
+ bpfLoader.AddSysCall("recvmmsg", bpfLoader.SysRecvmmsg, bpfLoader.SysRecvmmsgRet)
+ bpfLoader.AddLink(link.Kprobe, bpfLoader.TcpRcvEstablished, "tcp_rcv_established")
+ bpfLoader.AddLink(link.Kprobe, bpfLoader.SecuritySocketSendmsg, "security_socket_sendmsg")
+ bpfLoader.AddLink(link.Kprobe, bpfLoader.SecuritySocketRecvmsg, "security_socket_recvmsg")
// retransmit/drop
- r.linker.AddLink(link.Kprobe, objs.TcpRetransmit, "tcp_retransmit_skb")
- r.linker.AddLink(link.Kprobe, objs.TcpDrop, "tcp_drop")
+ bpfLoader.AddLink(link.Kprobe, bpfLoader.TcpRetransmit, "tcp_retransmit_skb")
+ bpfLoader.AddLink(link.Kprobe, bpfLoader.TcpDrop, "tcp_drop")
- if err := r.linker.HasError(); err != nil {
- _ = r.linker.Close()
+ if err := bpfLoader.HasError(); err != nil {
+ _ = bpfLoader.Close()
return err
}
@@ -165,31 +194,12 @@ func (r *Runner) registerMetricsReport() {
}
func (r *Runner) flushMetrics() error {
- // flush all connection from bpf
- connections, err := r.bpfContext.FlushAllConnection()
+ // flush all metrics
+ metricsBuilder, err := r.analyzeContext.FlushAllMetrics(r.bpf, r.meterPrefix)
if err != nil {
return err
}
- if len(connections) == 0 {
- return nil
- }
-
- if log.Enable(logrus.DebugLevel) {
- for _, con := range connections {
- log.Debugf("found connection: %d, %s relation: %s:%d(%d) -> %s:%d, protocol: %s, is_ssl: %t, read: %d bytes/%d, write: %d bytes/%d",
- con.ConnectionID, con.Role.String(),
- con.LocalIP, con.LocalPort, con.LocalPid, con.RemoteIP, con.RemotePort,
- con.Protocol.String(), con.IsSSL, con.WriteCounter.Cur.Bytes, con.WriteCounter.Cur.Count,
- con.ReadCounter.Cur.Bytes, con.ReadCounter.Cur.Count)
- }
- }
- // combine all connection
- analyzer := NewTrafficAnalyzer(r.bpfContext.processes)
- traffics := analyzer.CombineConnectionToTraffics(connections)
- if len(traffics) == 0 {
- return nil
- }
- r.logTheMetricsConnections(traffics)
+ metrics := metricsBuilder.Build()
// send metrics
batch, err := r.meterClient.CollectBatch(r.ctx)
@@ -202,13 +212,10 @@ func (r *Runner) flushMetrics() error {
}
}()
count := 0
- for _, traffic := range traffics {
- collections := traffic.GenerateMetrics(r.meterPrefix)
- for _, col := range collections {
- count += len(col.MeterData)
- if err := batch.Send(col); err != nil {
- return err
- }
+ for _, m := range metrics {
+ count += len(m.MeterData)
+ if err := batch.Send(m); err != nil {
+ return err
}
}
if count > 0 {
@@ -217,32 +224,6 @@ func (r *Runner) flushMetrics() error {
return nil
}
-func (r *Runner) logTheMetricsConnections(traffices []*ProcessTraffic) {
- if !log.Enable(logrus.DebugLevel) {
- return
- }
- for _, traffic := range traffices {
- localInfo := fmt.Sprintf("%s:%d(%d)", traffic.LocalIP, traffic.LocalPort, traffic.LocalPid)
- if len(traffic.LocalProcesses) > 0 {
- p := traffic.LocalProcesses[0]
- localInfo = fmt.Sprintf("(%s)%s:%s:%s(%s:%d)(%d)", p.Entity().Layer, p.Entity().ServiceName,
- p.Entity().InstanceName, p.Entity().ProcessName, traffic.LocalIP, traffic.LocalPort, traffic.LocalPid)
- }
-
- remoteInfo := fmt.Sprintf("%s:%d(%d)", traffic.RemoteIP, traffic.RemotePort, traffic.RemotePid)
- if len(traffic.RemoteProcesses) > 0 {
- p := traffic.RemoteProcesses[0]
- remoteInfo = fmt.Sprintf("(%s)%s:%s:%s(%s:%d)(%d)",
- p.Entity().Layer, p.Entity().ServiceName, p.Entity().InstanceName, p.Entity().ProcessName,
- traffic.RemoteIP, traffic.RemotePort, traffic.RemotePid)
- }
- side := traffic.ConnectionRole.String()
- log.Debugf("connection analyze result: %s : %s -> %s, protocol: %s, is SSL: %t, write: %d bytes/%d, read: %d bytes/%d",
- side, localInfo, remoteInfo, traffic.Protocol.String(), traffic.IsSSL, traffic.WriteCounter.Bytes, traffic.WriteCounter.Count,
- traffic.ReadCounter.Bytes, traffic.ReadCounter.Count)
- }
-}
-
func (r *Runner) Stop() error {
// if starting, then need to wait start finished
r.startLock.Lock()
@@ -252,7 +233,6 @@ func (r *Runner) Stop() error {
}
var result error
r.stopOnce.Do(func() {
- result = r.closeWhenExists(result, r.linker)
result = r.closeWhenExists(result, r.bpf)
})
return result
@@ -284,3 +264,38 @@ func (r *Runner) init0(config *base.TaskConfig, moduleMgr *module.Manager) error
r.meterPrefix = config.Network.MeterPrefix + "_"
return nil
}
+
+func (r *Runner) addProcesses(processes []api.ProcessInterface) error {
+ var err error
+ for _, p := range processes {
+ pid := p.Pid()
+ alreadyExists := false
+ if len(r.processes[pid]) > 0 {
+ for _, existsProcess := range r.processes[pid] {
+ if p.ID() == existsProcess.ID() {
+ alreadyExists = true
+ break
+ }
+ }
+ }
+
+ if alreadyExists {
+ continue
+ }
+
+ r.processes[pid] = append(r.processes[pid], p)
+
+ // add to the process let it could be monitored
+ if err1 := r.bpf.ProcessMonitorControl.Update(uint32(pid), uint32(1), ebpf.UpdateAny); err1 != nil {
+ err = multierror.Append(err, err1)
+ }
+
+ // add process ssl config
+ if err1 := addSSLProcess(int(pid), r.bpf); err1 != nil {
+ err = multierror.Append(err, err1)
+ }
+
+ log.Debugf("add monitor process, pid: %d", pid)
+ }
+ return err
+}
diff --git a/pkg/profiling/task/network/ssl.go b/pkg/profiling/task/network/ssl.go
index 9adb7c8..c586c3a 100644
--- a/pkg/profiling/task/network/ssl.go
+++ b/pkg/profiling/task/network/ssl.go
@@ -26,6 +26,8 @@ import (
"strconv"
"strings"
+ "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
+ "github.com/apache/skywalking-rover/pkg/profiling/task/network/bpf"
"github.com/apache/skywalking-rover/pkg/tools"
"github.com/apache/skywalking-rover/pkg/tools/elf"
"github.com/apache/skywalking-rover/pkg/tools/host"
@@ -52,36 +54,36 @@ type OpenSSLFdSymAddrConfigInBPF struct {
FDOffset uint32
}
-func addSSLProcess(pid int, bpf *bpfObjects, linker *Linker) error {
+func addSSLProcess(pid int, loader *bpf.Loader) error {
modules, err := tools.ProcessModules(int32(pid))
if err != nil {
return fmt.Errorf("read process modules error: %d, error: %v", pid, err)
}
// openssl process
- if err1 := processOpenSSLProcess(pid, bpf, linker, modules); err1 != nil {
+ if err1 := processOpenSSLProcess(pid, loader, modules); err1 != nil {
return err1
}
// envoy with boring ssl
- if err1 := processEnvoyProcess(pid, bpf, linker, modules); err1 != nil {
+ if err1 := processEnvoyProcess(pid, loader, modules); err1 != nil {
return err1
}
// GoTLS
- if err1 := processGoProcess(pid, bpf, linker, modules); err1 != nil {
+ if err1 := processGoProcess(pid, loader, modules); err1 != nil {
return err1
}
// Nodejs
- if err1 := processNodeProcess(pid, bpf, linker, modules); err1 != nil {
+ if err1 := processNodeProcess(pid, loader, modules); err1 != nil {
return err1
}
return nil
}
-func processOpenSSLProcess(pid int, bpf *bpfObjects, linker *Linker, modules []*profiling.Module) error {
+func processOpenSSLProcess(pid int, loader *bpf.Loader, modules []*profiling.Module) error {
var libcryptoName, libsslName = "libcrypto.so", "libssl.so"
var libcryptoPath, libsslPath string
processModules, err := findProcessModules(modules, libcryptoName, libsslName)
@@ -107,22 +109,22 @@ func processOpenSSLProcess(pid int, bpf *bpfObjects, linker *Linker, modules []*
if err != nil {
return err
}
- if err := bpf.OpensslFdSymaddrFinder.Put(uint32(pid), conf); err != nil {
+ if err := loader.OpensslFdSymaddrFinder.Put(uint32(pid), conf); err != nil {
return err
}
// attach the linker
- return processOpenSSLModule(bpf, processModules[libsslName], linker)
+ return processOpenSSLModule(loader, processModules[libsslName])
}
-func processOpenSSLModule(bpf *bpfObjects, libSSLModule *profiling.Module, linker *Linker) error {
- libSSLLinker := linker.OpenUProbeExeFile(libSSLModule.Path)
- libSSLLinker.AddLink("SSL_write", bpf.OpensslWrite, bpf.OpensslWriteRet)
- libSSLLinker.AddLink("SSL_read", bpf.OpensslRead, bpf.OpensslReadRet)
- return linker.HasError()
+func processOpenSSLModule(loader *bpf.Loader, libSSLModule *profiling.Module) error {
+ libSSLLinker := loader.OpenUProbeExeFile(libSSLModule.Path)
+ libSSLLinker.AddLink("SSL_write", loader.OpensslWrite, loader.OpensslWriteRet)
+ libSSLLinker.AddLink("SSL_read", loader.OpensslRead, loader.OpensslReadRet)
+ return loader.HasError()
}
-func processEnvoyProcess(_ int, bpf *bpfObjects, linker *Linker, modules []*profiling.Module) error {
+func processEnvoyProcess(_ int, loader *bpf.Loader, modules []*profiling.Module) error {
moduleName := "/envoy"
processModules, err := findProcessModules(modules, moduleName)
if err != nil {
@@ -149,14 +151,14 @@ func processEnvoyProcess(_ int, bpf *bpfObjects, linker *Linker, modules []*prof
log.Debugf("found current module is envoy, so attach to the SSL read and write")
// attach the linker
- libSSLLinker := linker.OpenUProbeExeFile(envoyModule.Path)
- libSSLLinker.AddLink("SSL_write", bpf.OpensslWrite, bpf.OpensslWriteRet)
- libSSLLinker.AddLink("SSL_read", bpf.OpensslRead, bpf.OpensslReadRet)
- return linker.HasError()
+ libSSLLinker := loader.OpenUProbeExeFile(envoyModule.Path)
+ libSSLLinker.AddLink("SSL_write", loader.OpensslWrite, loader.OpensslWriteRet)
+ libSSLLinker.AddLink("SSL_read", loader.OpensslRead, loader.OpensslReadRet)
+ return loader.HasError()
}
type SymbolLocation struct {
- Type GoTLSArgsLocationType
+ Type base.GoTLSArgsLocationType
Offset uint32
}
@@ -189,7 +191,7 @@ type GoStringInC struct {
Size uint64
}
-func processGoProcess(pid int, bpf *bpfObjects, linker *Linker, modules []*profiling.Module) error {
+func processGoProcess(pid int, loader *bpf.Loader, modules []*profiling.Module) error {
// check current process is go program
buildVersionSymbol := searchSymbol(modules, func(a, b string) bool {
return a == b
@@ -220,20 +222,20 @@ func processGoProcess(pid int, bpf *bpfObjects, linker *Linker, modules []*profi
}
// setting the locations
- if err := bpf.GoTlsArgsSymaddrMap.Put(uint32(pid), symbolConfig); err != nil {
+ if err := loader.GoTlsArgsSymaddrMap.Put(uint32(pid), symbolConfig); err != nil {
return fmt.Errorf("setting the Go TLS argument location failure, pid: %d, error: %v", pid, err)
}
// uprobes
- exeFile := linker.OpenUProbeExeFile(pidExeFile)
- exeFile.AddLinkWithType("runtime.casgstatus", true, bpf.GoCasgstatus)
- exeFile.AddGoLink(goTLSWriteSymbol, bpf.GoTlsWrite, bpf.GoTlsWriteRet, elfFile)
- exeFile.AddGoLink(goTLSReadSymbol, bpf.GoTlsRead, bpf.GoTlsReadRet, elfFile)
+ exeFile := loader.OpenUProbeExeFile(pidExeFile)
+ exeFile.AddLinkWithType("runtime.casgstatus", true, loader.GoCasgstatus)
+ exeFile.AddGoLink(goTLSWriteSymbol, loader.GoTlsWrite, loader.GoTlsWriteRet, elfFile)
+ exeFile.AddGoLink(goTLSReadSymbol, loader.GoTlsRead, loader.GoTlsReadRet, elfFile)
- return linker.HasError()
+ return loader.HasError()
}
-func processNodeProcess(pid int, bpf *bpfObjects, linker *Linker, modules []*profiling.Module) error {
+func processNodeProcess(pid int, loader *bpf.Loader, modules []*profiling.Module) error {
moduleName1, moduleName2, libsslName := "/nodejs", "/node", "libssl.so"
processModules, err := findProcessModules(modules, moduleName1, moduleName2, libsslName)
if err != nil {
@@ -271,16 +273,16 @@ func processNodeProcess(pid int, bpf *bpfObjects, linker *Linker, modules []*pro
return err
}
// setting the locations
- if err := bpf.NodeTlsSymaddrMap.Put(uint32(pid), config); err != nil {
+ if err := loader.NodeTlsSymaddrMap.Put(uint32(pid), config); err != nil {
return fmt.Errorf("setting the node TLS location failure, pid: %d, error: %v", pid, err)
}
// register node tls
- if err := registerNodeTLSProbes(v, bpf, linker, nodeModule, libsslModule); err != nil {
+ if err := registerNodeTLSProbes(v, loader, nodeModule, libsslModule); err != nil {
return fmt.Errorf("register node TLS probes failure, pid: %d, error: %v", pid, err)
}
// attach the OpenSSL Probe if needs
if needsReAttachSSL {
- return processOpenSSLModule(bpf, libsslModule, linker)
+ return processOpenSSLModule(loader, libsslModule)
}
return nil
}
@@ -301,9 +303,9 @@ var nodeTLSAddrWithVersions = []struct {
var nodeTLSProbeWithVersions = []struct {
v *version.Version
- f func(uprobe *UProbeExeFile, bpf *bpfObjects, nodeModule *profiling.Module)
+ f func(uprobe *bpf.UProbeExeFile, bpf *bpf.Loader, nodeModule *profiling.Module)
}{
- {version.Build(10, 19, 0), func(uprobe *UProbeExeFile, bpf *bpfObjects, nodeModule *profiling.Module) {
+ {version.Build(10, 19, 0), func(uprobe *bpf.UProbeExeFile, bpf *bpf.Loader, nodeModule *profiling.Module) {
uprobe.AddLinkWithSymbols(searchSymbolNames([]*profiling.Module{nodeModule}, strings.HasPrefix, "_ZN4node7TLSWrapC2E"),
bpf.NodeTlsWrap, bpf.NodeTlsWrapRet)
uprobe.AddLinkWithSymbols(searchSymbolNames([]*profiling.Module{nodeModule}, strings.HasPrefix, "_ZN4node7TLSWrap7ClearInE"),
@@ -311,7 +313,7 @@ var nodeTLSProbeWithVersions = []struct {
uprobe.AddLinkWithSymbols(searchSymbolNames([]*profiling.Module{nodeModule}, strings.HasPrefix, "_ZN4node7TLSWrap8ClearOutE"),
bpf.NodeTlsWrap, bpf.NodeTlsWrapRet)
}},
- {version.Build(15, 0, 0), func(uprobe *UProbeExeFile, bpf *bpfObjects, nodeModule *profiling.Module) {
+ {version.Build(15, 0, 0), func(uprobe *bpf.UProbeExeFile, bpf *bpf.Loader, nodeModule *profiling.Module) {
uprobe.AddLinkWithSymbols(searchSymbolNames([]*profiling.Module{nodeModule}, strings.HasPrefix, "_ZN4node6crypto7TLSWrapC2E"),
bpf.NodeTlsWrap, bpf.NodeTlsWrapRet)
uprobe.AddLinkWithSymbols(searchSymbolNames([]*profiling.Module{nodeModule}, strings.HasPrefix, "_ZN4node6crypto7TLSWrap7ClearInE"),
@@ -344,8 +346,8 @@ func findNodeTLSAddrConfig(v *version.Version) (*NodeTLSAddrInBPF, error) {
return nil, fmt.Errorf("could not support version: %s", v)
}
-func registerNodeTLSProbes(v *version.Version, bpf *bpfObjects, linker *Linker, nodeModule, libSSLModule *profiling.Module) error {
- var probeFunc func(uprobe *UProbeExeFile, bpf *bpfObjects, nodeModule *profiling.Module)
+func registerNodeTLSProbes(v *version.Version, loader *bpf.Loader, nodeModule, libSSLModule *profiling.Module) error {
+ var probeFunc func(uprobe *bpf.UProbeExeFile, bpf *bpf.Loader, nodeModule *profiling.Module)
for _, c := range nodeTLSProbeWithVersions {
if v.GreaterOrEquals(c.v) {
probeFunc = c.f
@@ -354,13 +356,13 @@ func registerNodeTLSProbes(v *version.Version, bpf *bpfObjects, linker *Linker,
if probeFunc == nil {
return fmt.Errorf("the version is not support: %v", v)
}
- file := linker.OpenUProbeExeFile(nodeModule.Path)
- probeFunc(file, bpf, nodeModule)
+ file := loader.OpenUProbeExeFile(nodeModule.Path)
+ probeFunc(file, loader, nodeModule)
// find the SSL_new, and register
- file = linker.OpenUProbeExeFile(libSSLModule.Path)
- file.AddLinkWithType("SSL_new", false, bpf.NodeTlsRetSsl)
- return linker.HasError()
+ file = loader.OpenUProbeExeFile(libSSLModule.Path)
+ file.AddLinkWithType("SSL_new", false, loader.NodeTlsRetSsl)
+ return loader.HasError()
}
func getNodeVersion(p string) (*version.Version, error) {
@@ -493,10 +495,10 @@ func assignGoTLSArgsLocation(err error, function *elf.FunctionInfo, argName stri
return fmt.Errorf("the args is not found, function: %s, args name: %s", function.Name(), argName)
}
if args.Location.Type == elf.ArgLocationTypeStack {
- dest.Type = GoTLSArgsLocationTypeStack
+ dest.Type = base.GoTLSArgsLocationTypeStack
dest.Offset = uint32(args.Location.Offset) + kSPOffset
} else if args.Location.Type == elf.ArgLocationTypeRegister {
- dest.Type = GoTLSArgsLocationTypeRegister
+ dest.Type = base.GoTLSArgsLocationTypeRegister
dest.Offset = uint32(args.Location.Offset)
} else {
return fmt.Errorf("the location type is not support, function: %s, args name: %s, type: %d",