You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@skywalking.apache.org by wu...@apache.org on 2023/02/02 15:39:11 UTC
[skywalking-rover] branch main updated: Optimization and Bug Fix for BPF and Socket Detail Data (#75)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git
The following commit(s) were added to refs/heads/main by this push:
new f7d64cf Optimization and Bug Fix for BPF and Socket Detail Data (#75)
f7d64cf is described below
commit f7d64cf6eb1aa7aa69ae092fc719774c017e39c2
Author: mrproliu <74...@qq.com>
AuthorDate: Thu Feb 2 23:39:05 2023 +0800
Optimization and Bug Fix for BPF and Socket Detail Data (#75)
---
CHANGES.md | 1 +
bpf/profiling/network/args.h | 4 +-
bpf/profiling/network/sock_stats.h | 7 ++-
bpf/profiling/network/socket_detail.h | 2 +-
.../task/network/analyze/base/connection.go | 3 ++
pkg/profiling/task/network/analyze/base/events.go | 4 +-
.../task/network/analyze/layer4/listener.go | 2 +-
.../task/network/analyze/layer7/events.go | 11 ++++
.../task/network/analyze/layer7/listener.go | 1 +
.../analyze/layer7/protocols/base/analyzer.go | 59 +++++++++++++++-------
.../analyze/layer7/protocols/base/buffer.go | 30 +++++++++--
.../analyze/layer7/protocols/base/events.go | 4 +-
.../layer7/protocols/http1/reader/reader.go | 2 +-
.../network/analyze/layer7/protocols/protocols.go | 6 +++
14 files changed, 102 insertions(+), 34 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index ecdfc29..41d3be3 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -8,6 +8,7 @@ Release Notes.
* Enhance the protocol reader for support long socket data.
* Add the syscall level event to the trace.
* Support OpenSSL 3.0.x.
+* Optimized the data structure in BPF.
#### Bug Fixes
* Fix HTTP method name in protocol analyzer
diff --git a/bpf/profiling/network/args.h b/bpf/profiling/network/args.h
index 34e0bd2..9d7c90b 100644
--- a/bpf/profiling/network/args.h
+++ b/bpf/profiling/network/args.h
@@ -131,8 +131,8 @@ struct sock_data_args_t {
unsigned int* msg_len;
__u64 start_nacs;
// rtt
- __u64 rtt_count;
- __u64 rtt_time;
+ __u32 rtt_count;
+ __u32 rtt_time;
// buffer
char* buf;
struct iovec *iovec;
diff --git a/bpf/profiling/network/sock_stats.h b/bpf/profiling/network/sock_stats.h
index 604071d..54f6775 100644
--- a/bpf/profiling/network/sock_stats.h
+++ b/bpf/profiling/network/sock_stats.h
@@ -58,14 +58,13 @@ struct active_connection_t {
__u64 write_rtt_count;
__u64 write_rtt_time;
+ void *last_recv_sk_buff;
// for protocol analyze
__u8 protocol;
// connect event already send
__u8 connect_event_send;
// current connection is ssl
__u8 ssl;
- __u8 fix;
- void *last_recv_sk_buff;
};
struct {
__uint(type, BPF_MAP_TYPE_HASH);
@@ -168,8 +167,8 @@ struct socket_close_event_t {
__u64 read_exe_time;
// RTT when write
- __u64 write_rtt_count;
- __u64 write_rtt_time;
+ __u32 write_rtt_count;
+ __u32 write_rtt_time;
};
struct {
__uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
diff --git a/bpf/profiling/network/socket_detail.h b/bpf/profiling/network/socket_detail.h
index 2d24e0f..b4bfa0b 100644
--- a/bpf/profiling/network/socket_detail.h
+++ b/bpf/profiling/network/socket_detail.h
@@ -29,7 +29,7 @@ struct socket_detail_t {
__u8 func_name;
__u8 rtt_count;
__u8 protocol;
- __u64 rtt_time;
+ __u32 rtt_time;
};
struct {
diff --git a/pkg/profiling/task/network/analyze/base/connection.go b/pkg/profiling/task/network/analyze/base/connection.go
index d637463..2099fb9 100644
--- a/pkg/profiling/task/network/analyze/base/connection.go
+++ b/pkg/profiling/task/network/analyze/base/connection.go
@@ -84,6 +84,9 @@ type ActiveConnectionInBPF struct {
WriteRTTCount uint64
WriteRTTExeTime uint64
+ // sk buffer
+ _ uint64
+
// protocol of connection
Protocol ConnectionProtocol
// the connect event is already sent
diff --git a/pkg/profiling/task/network/analyze/base/events.go b/pkg/profiling/task/network/analyze/base/events.go
index 5bdc2aa..ac6a4a1 100644
--- a/pkg/profiling/task/network/analyze/base/events.go
+++ b/pkg/profiling/task/network/analyze/base/events.go
@@ -65,6 +65,6 @@ type SocketCloseEvent struct {
ReadCount uint64
ReadExeTime uint64
- WriteRTTCount uint64
- WriteRTTExeTime uint64
+ WriteRTTCount uint32
+ WriteRTTExeTime uint32
}
diff --git a/pkg/profiling/task/network/analyze/layer4/listener.go b/pkg/profiling/task/network/analyze/layer4/listener.go
index 162e35c..231ac0b 100644
--- a/pkg/profiling/task/network/analyze/layer4/listener.go
+++ b/pkg/profiling/task/network/analyze/layer4/listener.go
@@ -79,7 +79,7 @@ func (l *Listener) ReceiveCloseConnection(ctx *base.ConnectionContext, event *ba
// data transmit counters
layer4.WriteCounter.UpdateToCurrent(event.WriteBytes, event.WriteCount, event.WriteExeTime)
layer4.ReadCounter.UpdateToCurrent(event.ReadBytes, event.ReadCount, event.ReadExeTime)
- layer4.WriteRTTCounter.UpdateToCurrent(0, event.WriteRTTCount, event.WriteRTTExeTime)
+ layer4.WriteRTTCounter.UpdateToCurrent(0, uint64(event.WriteRTTCount), uint64(event.WriteRTTExeTime))
// connection close execute time
layer4.CloseExecuteTime = event.ExeTime
diff --git a/pkg/profiling/task/network/analyze/layer7/events.go b/pkg/profiling/task/network/analyze/layer7/events.go
index 68b6520..a5189fb 100644
--- a/pkg/profiling/task/network/analyze/layer7/events.go
+++ b/pkg/profiling/task/network/analyze/layer7/events.go
@@ -21,6 +21,7 @@ import (
"context"
profiling "github.com/apache/skywalking-rover/pkg/profiling/task/base"
+ analyzeBase "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols"
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/base"
"github.com/apache/skywalking-rover/pkg/profiling/task/network/bpf"
@@ -60,6 +61,16 @@ func (l *Listener) handleProfilingExtensionConfig(config *profiling.ExtensionCon
}
}
+func (l *Listener) handleConnectionClose(event *analyzeBase.SocketCloseEvent) {
+ if l.socketDataQueue == nil {
+ return
+ }
+ for _, p := range l.socketDataQueue.partitions {
+ ctx := p.ctx.(*SocketDataPartitionContext)
+ ctx.analyzer.ReceiveSocketClose(event)
+ }
+}
+
type SocketDataPartitionContext struct {
analyzer *protocols.Analyzer
}
diff --git a/pkg/profiling/task/network/analyze/layer7/listener.go b/pkg/profiling/task/network/analyze/layer7/listener.go
index 61f32bd..bd0d59a 100644
--- a/pkg/profiling/task/network/analyze/layer7/listener.go
+++ b/pkg/profiling/task/network/analyze/layer7/listener.go
@@ -97,6 +97,7 @@ func (l *Listener) ReceiveNewConnection(ctx *base.ConnectionContext, event *base
func (l *Listener) ReceiveCloseConnection(ctx *base.ConnectionContext, event *base.SocketCloseEvent) {
// cached the closed connection with TTL
l.cachedConnections.Set(l.generateCachedConnectionKey(ctx.ConnectionID, ctx.RandomID), ctx, ConnectionCachedTTL)
+ l.handleConnectionClose(event)
}
func (l *Listener) UpdateExtensionConfig(config *profiling.ExtensionConfig) {
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/base/analyzer.go b/pkg/profiling/task/network/analyze/layer7/protocols/base/analyzer.go
index 9449116..8b4755d 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/base/analyzer.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/base/analyzer.go
@@ -19,9 +19,12 @@ package base
import (
"context"
+ "fmt"
"sync"
"time"
+ cmap "github.com/orcaman/concurrent-map"
+
"github.com/apache/skywalking-rover/pkg/logger"
profiling "github.com/apache/skywalking-rover/pkg/profiling/task/base"
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
@@ -39,7 +42,7 @@ type ProtocolAnalyzer struct {
protocol Protocol
config *profiling.TaskConfig
- connections map[connectionKey]*connectionInfo
+ connections cmap.ConcurrentMap // connections with concurrent key: connection id+random id, value: *connectionInfo
analyzeLocker sync.Mutex
receiveEventCount int
}
@@ -49,7 +52,7 @@ func NewProtocolAnalyzer(protocolContext Context, p Protocol, config *profiling.
protocolContext: protocolContext,
protocol: p,
config: config,
- connections: make(map[connectionKey]*connectionInfo),
+ connections: cmap.New(),
}
}
@@ -116,14 +119,15 @@ func (a *ProtocolAnalyzer) ReceiveSocketData(ctx Context, event *SocketDataUploa
}
func (a *ProtocolAnalyzer) getConnection(ctx Context, connectionID, randomID uint64) *connectionInfo {
- key := connectionKey{connectionID: connectionID, randomID: randomID}
- connection := a.connections[key]
+ conKey := a.generateConnectionInfoKey(connectionID, randomID)
+ connection, _ := a.connections.Get(conKey)
if connection == nil {
- connection = newConnectionInfo(a.protocol, ctx, key.connectionID, key.randomID)
- a.connections[key] = connection
+ connection = newConnectionInfo(a.protocol, ctx, connectionID, randomID)
+ a.connections.Set(conKey, connection)
}
- connection.checkConnectionMetrics(ctx)
- return connection
+ info := connection.(*connectionInfo)
+ info.checkConnectionMetrics(ctx)
+ return info
}
// processEvents means analyze the protocol in each connection
@@ -135,8 +139,19 @@ func (a *ProtocolAnalyzer) processEvents() {
}
defer a.analyzeLocker.Unlock()
- for _, connection := range a.connections {
- a.processConnectionEvents(connection)
+ closedConnections := make([]string, 0)
+ a.connections.IterCb(func(conKey string, con interface{}) {
+ info := con.(*connectionInfo)
+ a.processConnectionEvents(info)
+
+ // if the connection already closed and not contains any buffer data, then delete the connection
+ if info.closed && info.buffer.dataEvents.Len() == 0 {
+ closedConnections = append(closedConnections, conKey)
+ }
+ })
+
+ for _, conKey := range closedConnections {
+ a.connections.Remove(conKey)
}
}
@@ -146,9 +161,9 @@ func (a *ProtocolAnalyzer) processExpireEvents(expireDuration time.Duration) {
a.analyzeLocker.Lock()
defer a.analyzeLocker.Unlock()
- for _, connection := range a.connections {
- a.processConnectionExpireEvents(connection, expireDuration)
- }
+ a.connections.IterCb(func(_ string, con interface{}) {
+ a.processConnectionExpireEvents(con.(*connectionInfo), expireDuration)
+ })
}
func (a *ProtocolAnalyzer) processConnectionEvents(connection *connectionInfo) {
@@ -161,7 +176,7 @@ func (a *ProtocolAnalyzer) processConnectionEvents(connection *connectionInfo) {
for {
// reset the status of reading
if !buffer.prepareForReading() {
- log.Debugf("prepare finsihed: event size: %d", buffer.dataEvents.Len())
+ log.Debugf("prepare finsihed: reduce data event size: %d", buffer.dataEvents.Len())
return
}
@@ -175,7 +190,7 @@ func (a *ProtocolAnalyzer) processConnectionEvents(connection *connectionInfo) {
}
if finishReading {
- log.Debugf("reading finsihed: event size: %d", buffer.dataEvents.Len())
+ log.Debugf("reading finsihed: reduce data event size: %d", buffer.dataEvents.Len())
break
}
}
@@ -191,9 +206,16 @@ func (a *ProtocolAnalyzer) UpdateExtensionConfig(config *profiling.ExtensionConf
a.protocol.UpdateExtensionConfig(config)
}
-type connectionKey struct {
- connectionID uint64
- randomID uint64
+func (a *ProtocolAnalyzer) ReceiveSocketCloseEvent(event *base.SocketCloseEvent) {
+ con, _ := a.connections.Get(a.generateConnectionInfoKey(event.ConID, event.RandomID))
+ if con == nil {
+ return
+ }
+ con.(*connectionInfo).closed = true
+}
+
+func (a *ProtocolAnalyzer) generateConnectionInfoKey(connectionID, randomID uint64) string {
+ return fmt.Sprintf("%d_%d", connectionID, randomID)
}
type connectionInfo struct {
@@ -202,6 +224,7 @@ type connectionInfo struct {
buffer *Buffer
metrics Metrics
metricsFromConnection bool
+ closed bool
}
func newConnectionInfo(p Protocol, connectionContext Context, connectionID, randomID uint64) *connectionInfo {
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/base/buffer.go b/pkg/profiling/task/network/analyze/layer7/protocols/base/buffer.go
index bbcac23..ec374fc 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/base/buffer.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/base/buffer.go
@@ -41,6 +41,10 @@ type Buffer struct {
head *BufferPosition
current *BufferPosition
+
+ // record the latest expired data id in connection for expire the older socket detail
+ // because the older socket detail may not be received in buffer
+ latestExpiredDataID uint64
}
type BufferPosition struct {
@@ -489,14 +493,32 @@ func (r *Buffer) deleteExpireEvents(expireDuration time.Duration) int {
defer r.eventLocker.Unlock()
expireTime := time.Now().Add(-expireDuration)
- count := 0
- for e := r.dataEvents.Front(); e != nil; {
- startTime := host.Time(e.Value.(SocketDataBuffer).StartTime())
+ // data event queue
+ count := r.deleteEventsWithJudgement(r.dataEvents, func(element *list.Element) bool {
+ buffer := element.Value.(SocketDataBuffer)
+ startTime := host.Time(buffer.StartTime())
if expireTime.After(startTime) {
+ r.latestExpiredDataID = buffer.DataID()
+ return true
+ }
+ return false
+ })
+
+ // detail event queue
+ count += r.deleteEventsWithJudgement(r.detailEvents, func(element *list.Element) bool {
+ return r.latestExpiredDataID > 0 && element.Value.(*SocketDetailEvent).DataID <= r.latestExpiredDataID
+ })
+ return count
+}
+
+func (r *Buffer) deleteEventsWithJudgement(l *list.List, checker func(element *list.Element) bool) int {
+ count := 0
+ for e := l.Front(); e != nil; {
+ if checker(e) {
count++
cur := e
e = e.Next()
- r.dataEvents.Remove(cur)
+ l.Remove(cur)
} else {
break
}
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/base/events.go b/pkg/profiling/task/network/analyze/layer7/protocols/base/events.go
index debf48c..857543c 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/base/events.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/base/events.go
@@ -24,6 +24,8 @@ import (
)
type SocketDataBuffer interface {
+ // GenerateConnectionID for identity the buffer belong which connection
+ GenerateConnectionID() string
// BufferData of the buffer
BufferData() []byte
// TotalSize of socket data, the data may exceed the size of the BufferData()
@@ -147,7 +149,7 @@ type SocketDetailEvent struct {
FuncName base.SocketFunctionName
RTTCount uint8
Protocol base.ConnectionProtocol
- RTTTime uint64
+ RTTTime uint32
}
func (s *SocketDetailEvent) GenerateConnectionID() string {
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/reader.go b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/reader.go
index ace2a13..4197e72 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/reader.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/reader.go
@@ -99,7 +99,7 @@ func (m *MessageOpt) StartTime() uint64 {
}
func (m *MessageOpt) EndTime() uint64 {
- return m.HeaderBuffer().LastSocketBuffer().EndTime()
+ return m.BodyBuffer().LastSocketBuffer().EndTime()
}
func (m *MessageOpt) Direction() base.SocketDataDirection {
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/protocols.go b/pkg/profiling/task/network/analyze/layer7/protocols/protocols.go
index ad1859c..480a3d8 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/protocols.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/protocols.go
@@ -94,6 +94,12 @@ func (a *Analyzer) UpdateExtensionConfig(config *profiling.ExtensionConfig) {
}
}
+func (a *Analyzer) ReceiveSocketClose(event *base.SocketCloseEvent) {
+ for _, p := range a.protocols {
+ p.ReceiveSocketCloseEvent(event)
+ }
+}
+
type ProtocolMetrics struct {
data map[base.ConnectionProtocol]protocol.Metrics
}