You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@skywalking.apache.org by wu...@apache.org on 2023/02/02 15:39:11 UTC

[skywalking-rover] branch main updated: Optimization and Bug Fix for BPF and Socket Detail Data (#75)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git


The following commit(s) were added to refs/heads/main by this push:
     new f7d64cf  Optimization and Bug Fix for BPF and Socket Detail Data (#75)
f7d64cf is described below

commit f7d64cf6eb1aa7aa69ae092fc719774c017e39c2
Author: mrproliu <74...@qq.com>
AuthorDate: Thu Feb 2 23:39:05 2023 +0800

    Optimization and Bug Fix for BPF and Socket Detail Data (#75)
---
 CHANGES.md                                         |  1 +
 bpf/profiling/network/args.h                       |  4 +-
 bpf/profiling/network/sock_stats.h                 |  7 ++-
 bpf/profiling/network/socket_detail.h              |  2 +-
 .../task/network/analyze/base/connection.go        |  3 ++
 pkg/profiling/task/network/analyze/base/events.go  |  4 +-
 .../task/network/analyze/layer4/listener.go        |  2 +-
 .../task/network/analyze/layer7/events.go          | 11 ++++
 .../task/network/analyze/layer7/listener.go        |  1 +
 .../analyze/layer7/protocols/base/analyzer.go      | 59 +++++++++++++++-------
 .../analyze/layer7/protocols/base/buffer.go        | 30 +++++++++--
 .../analyze/layer7/protocols/base/events.go        |  4 +-
 .../layer7/protocols/http1/reader/reader.go        |  2 +-
 .../network/analyze/layer7/protocols/protocols.go  |  6 +++
 14 files changed, 102 insertions(+), 34 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index ecdfc29..41d3be3 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -8,6 +8,7 @@ Release Notes.
 * Enhance the protocol reader for support long socket data.
 * Add the syscall level event to the trace.
 * Support OpenSSL 3.0.x.
+* Optimized the data structure in BPF.
 
 #### Bug Fixes
 * Fix HTTP method name in protocol analyzer
diff --git a/bpf/profiling/network/args.h b/bpf/profiling/network/args.h
index 34e0bd2..9d7c90b 100644
--- a/bpf/profiling/network/args.h
+++ b/bpf/profiling/network/args.h
@@ -131,8 +131,8 @@ struct sock_data_args_t {
     unsigned int* msg_len;
     __u64 start_nacs;
     // rtt
-    __u64 rtt_count;
-    __u64 rtt_time;
+    __u32 rtt_count;
+    __u32 rtt_time;
     // buffer
     char* buf;
     struct iovec *iovec;
diff --git a/bpf/profiling/network/sock_stats.h b/bpf/profiling/network/sock_stats.h
index 604071d..54f6775 100644
--- a/bpf/profiling/network/sock_stats.h
+++ b/bpf/profiling/network/sock_stats.h
@@ -58,14 +58,13 @@ struct active_connection_t {
     __u64 write_rtt_count;
     __u64 write_rtt_time;
 
+    void *last_recv_sk_buff;
     // for protocol analyze
     __u8 protocol;
     // connect event already send
     __u8 connect_event_send;
     // current connection is ssl
     __u8 ssl;
-    __u8 fix;
-    void *last_recv_sk_buff;
 };
 struct {
 	__uint(type, BPF_MAP_TYPE_HASH);
@@ -168,8 +167,8 @@ struct socket_close_event_t {
     __u64 read_exe_time;
 
     // RTT when write
-    __u64 write_rtt_count;
-    __u64 write_rtt_time;
+    __u32 write_rtt_count;
+    __u32 write_rtt_time;
 };
 struct {
 	__uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
diff --git a/bpf/profiling/network/socket_detail.h b/bpf/profiling/network/socket_detail.h
index 2d24e0f..b4bfa0b 100644
--- a/bpf/profiling/network/socket_detail.h
+++ b/bpf/profiling/network/socket_detail.h
@@ -29,7 +29,7 @@ struct socket_detail_t {
     __u8 func_name;
     __u8 rtt_count;
     __u8 protocol;
-    __u64 rtt_time;
+    __u32 rtt_time;
 };
 
 struct {
diff --git a/pkg/profiling/task/network/analyze/base/connection.go b/pkg/profiling/task/network/analyze/base/connection.go
index d637463..2099fb9 100644
--- a/pkg/profiling/task/network/analyze/base/connection.go
+++ b/pkg/profiling/task/network/analyze/base/connection.go
@@ -84,6 +84,9 @@ type ActiveConnectionInBPF struct {
 	WriteRTTCount   uint64
 	WriteRTTExeTime uint64
 
+	// last_recv_sk_buff pointer from the BPF struct; unused in Go, kept only to match the struct layout
+	_ uint64
+
 	// protocol of connection
 	Protocol ConnectionProtocol
 	// the connect event is already sent
diff --git a/pkg/profiling/task/network/analyze/base/events.go b/pkg/profiling/task/network/analyze/base/events.go
index 5bdc2aa..ac6a4a1 100644
--- a/pkg/profiling/task/network/analyze/base/events.go
+++ b/pkg/profiling/task/network/analyze/base/events.go
@@ -65,6 +65,6 @@ type SocketCloseEvent struct {
 	ReadCount    uint64
 	ReadExeTime  uint64
 
-	WriteRTTCount   uint64
-	WriteRTTExeTime uint64
+	WriteRTTCount   uint32
+	WriteRTTExeTime uint32
 }
diff --git a/pkg/profiling/task/network/analyze/layer4/listener.go b/pkg/profiling/task/network/analyze/layer4/listener.go
index 162e35c..231ac0b 100644
--- a/pkg/profiling/task/network/analyze/layer4/listener.go
+++ b/pkg/profiling/task/network/analyze/layer4/listener.go
@@ -79,7 +79,7 @@ func (l *Listener) ReceiveCloseConnection(ctx *base.ConnectionContext, event *ba
 	// data transmit counters
 	layer4.WriteCounter.UpdateToCurrent(event.WriteBytes, event.WriteCount, event.WriteExeTime)
 	layer4.ReadCounter.UpdateToCurrent(event.ReadBytes, event.ReadCount, event.ReadExeTime)
-	layer4.WriteRTTCounter.UpdateToCurrent(0, event.WriteRTTCount, event.WriteRTTExeTime)
+	layer4.WriteRTTCounter.UpdateToCurrent(0, uint64(event.WriteRTTCount), uint64(event.WriteRTTExeTime))
 
 	// connection close execute time
 	layer4.CloseExecuteTime = event.ExeTime
diff --git a/pkg/profiling/task/network/analyze/layer7/events.go b/pkg/profiling/task/network/analyze/layer7/events.go
index 68b6520..a5189fb 100644
--- a/pkg/profiling/task/network/analyze/layer7/events.go
+++ b/pkg/profiling/task/network/analyze/layer7/events.go
@@ -21,6 +21,7 @@ import (
 	"context"
 
 	profiling "github.com/apache/skywalking-rover/pkg/profiling/task/base"
+	analyzeBase "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
 	"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols"
 	"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/base"
 	"github.com/apache/skywalking-rover/pkg/profiling/task/network/bpf"
@@ -60,6 +61,16 @@ func (l *Listener) handleProfilingExtensionConfig(config *profiling.ExtensionCon
 	}
 }
 
+func (l *Listener) handleConnectionClose(event *analyzeBase.SocketCloseEvent) {
+	if l.socketDataQueue == nil {
+		return
+	}
+	for _, p := range l.socketDataQueue.partitions {
+		ctx := p.ctx.(*SocketDataPartitionContext)
+		ctx.analyzer.ReceiveSocketClose(event)
+	}
+}
+
 type SocketDataPartitionContext struct {
 	analyzer *protocols.Analyzer
 }
diff --git a/pkg/profiling/task/network/analyze/layer7/listener.go b/pkg/profiling/task/network/analyze/layer7/listener.go
index 61f32bd..bd0d59a 100644
--- a/pkg/profiling/task/network/analyze/layer7/listener.go
+++ b/pkg/profiling/task/network/analyze/layer7/listener.go
@@ -97,6 +97,7 @@ func (l *Listener) ReceiveNewConnection(ctx *base.ConnectionContext, event *base
 func (l *Listener) ReceiveCloseConnection(ctx *base.ConnectionContext, event *base.SocketCloseEvent) {
 	// cached the closed connection with TTL
 	l.cachedConnections.Set(l.generateCachedConnectionKey(ctx.ConnectionID, ctx.RandomID), ctx, ConnectionCachedTTL)
+	l.handleConnectionClose(event)
 }
 
 func (l *Listener) UpdateExtensionConfig(config *profiling.ExtensionConfig) {
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/base/analyzer.go b/pkg/profiling/task/network/analyze/layer7/protocols/base/analyzer.go
index 9449116..8b4755d 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/base/analyzer.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/base/analyzer.go
@@ -19,9 +19,12 @@ package base
 
 import (
 	"context"
+	"fmt"
 	"sync"
 	"time"
 
+	cmap "github.com/orcaman/concurrent-map"
+
 	"github.com/apache/skywalking-rover/pkg/logger"
 	profiling "github.com/apache/skywalking-rover/pkg/profiling/task/base"
 	"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
@@ -39,7 +42,7 @@ type ProtocolAnalyzer struct {
 	protocol        Protocol
 	config          *profiling.TaskConfig
 
-	connections       map[connectionKey]*connectionInfo
+	connections       cmap.ConcurrentMap // connections with concurrent key: connection id+random id, value: *connectionInfo
 	analyzeLocker     sync.Mutex
 	receiveEventCount int
 }
@@ -49,7 +52,7 @@ func NewProtocolAnalyzer(protocolContext Context, p Protocol, config *profiling.
 		protocolContext: protocolContext,
 		protocol:        p,
 		config:          config,
-		connections:     make(map[connectionKey]*connectionInfo),
+		connections:     cmap.New(),
 	}
 }
 
@@ -116,14 +119,15 @@ func (a *ProtocolAnalyzer) ReceiveSocketData(ctx Context, event *SocketDataUploa
 }
 
 func (a *ProtocolAnalyzer) getConnection(ctx Context, connectionID, randomID uint64) *connectionInfo {
-	key := connectionKey{connectionID: connectionID, randomID: randomID}
-	connection := a.connections[key]
+	conKey := a.generateConnectionInfoKey(connectionID, randomID)
+	connection, _ := a.connections.Get(conKey)
 	if connection == nil {
-		connection = newConnectionInfo(a.protocol, ctx, key.connectionID, key.randomID)
-		a.connections[key] = connection
+		connection = newConnectionInfo(a.protocol, ctx, connectionID, randomID)
+		a.connections.Set(conKey, connection)
 	}
-	connection.checkConnectionMetrics(ctx)
-	return connection
+	info := connection.(*connectionInfo)
+	info.checkConnectionMetrics(ctx)
+	return info
 }
 
 // processEvents means analyze the protocol in each connection
@@ -135,8 +139,19 @@ func (a *ProtocolAnalyzer) processEvents() {
 	}
 	defer a.analyzeLocker.Unlock()
 
-	for _, connection := range a.connections {
-		a.processConnectionEvents(connection)
+	closedConnections := make([]string, 0)
+	a.connections.IterCb(func(conKey string, con interface{}) {
+		info := con.(*connectionInfo)
+		a.processConnectionEvents(info)
+
+		// if the connection is already closed and has no buffered data left, delete the connection
+		if info.closed && info.buffer.dataEvents.Len() == 0 {
+			closedConnections = append(closedConnections, conKey)
+		}
+	})
+
+	for _, conKey := range closedConnections {
+		a.connections.Remove(conKey)
 	}
 }
 
@@ -146,9 +161,9 @@ func (a *ProtocolAnalyzer) processExpireEvents(expireDuration time.Duration) {
 	a.analyzeLocker.Lock()
 	defer a.analyzeLocker.Unlock()
 
-	for _, connection := range a.connections {
-		a.processConnectionExpireEvents(connection, expireDuration)
-	}
+	a.connections.IterCb(func(_ string, con interface{}) {
+		a.processConnectionExpireEvents(con.(*connectionInfo), expireDuration)
+	})
 }
 
 func (a *ProtocolAnalyzer) processConnectionEvents(connection *connectionInfo) {
@@ -161,7 +176,7 @@ func (a *ProtocolAnalyzer) processConnectionEvents(connection *connectionInfo) {
 	for {
 		// reset the status of reading
 		if !buffer.prepareForReading() {
-			log.Debugf("prepare finsihed: event size: %d", buffer.dataEvents.Len())
+			log.Debugf("prepare finsihed: reduce data event size: %d", buffer.dataEvents.Len())
 			return
 		}
 
@@ -175,7 +190,7 @@ func (a *ProtocolAnalyzer) processConnectionEvents(connection *connectionInfo) {
 		}
 
 		if finishReading {
-			log.Debugf("reading finsihed: event size: %d", buffer.dataEvents.Len())
+			log.Debugf("reading finsihed: reduce data event size: %d", buffer.dataEvents.Len())
 			break
 		}
 	}
@@ -191,9 +206,16 @@ func (a *ProtocolAnalyzer) UpdateExtensionConfig(config *profiling.ExtensionConf
 	a.protocol.UpdateExtensionConfig(config)
 }
 
-type connectionKey struct {
-	connectionID uint64
-	randomID     uint64
+func (a *ProtocolAnalyzer) ReceiveSocketCloseEvent(event *base.SocketCloseEvent) {
+	con, _ := a.connections.Get(a.generateConnectionInfoKey(event.ConID, event.RandomID))
+	if con == nil {
+		return
+	}
+	con.(*connectionInfo).closed = true
+}
+
+func (a *ProtocolAnalyzer) generateConnectionInfoKey(connectionID, randomID uint64) string {
+	return fmt.Sprintf("%d_%d", connectionID, randomID)
 }
 
 type connectionInfo struct {
@@ -202,6 +224,7 @@ type connectionInfo struct {
 	buffer                 *Buffer
 	metrics                Metrics
 	metricsFromConnection  bool
+	closed                 bool
 }
 
 func newConnectionInfo(p Protocol, connectionContext Context, connectionID, randomID uint64) *connectionInfo {
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/base/buffer.go b/pkg/profiling/task/network/analyze/layer7/protocols/base/buffer.go
index bbcac23..ec374fc 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/base/buffer.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/base/buffer.go
@@ -41,6 +41,10 @@ type Buffer struct {
 
 	head    *BufferPosition
 	current *BufferPosition
+
+	// records the ID of the most recently expired data event in this connection, so that socket detail
+	// events with an ID at or below it are expired too, since they may never be matched with data in the buffer
+	latestExpiredDataID uint64
 }
 
 type BufferPosition struct {
@@ -489,14 +493,32 @@ func (r *Buffer) deleteExpireEvents(expireDuration time.Duration) int {
 	defer r.eventLocker.Unlock()
 
 	expireTime := time.Now().Add(-expireDuration)
-	count := 0
-	for e := r.dataEvents.Front(); e != nil; {
-		startTime := host.Time(e.Value.(SocketDataBuffer).StartTime())
+	// data event queue
+	count := r.deleteEventsWithJudgement(r.dataEvents, func(element *list.Element) bool {
+		buffer := element.Value.(SocketDataBuffer)
+		startTime := host.Time(buffer.StartTime())
 		if expireTime.After(startTime) {
+			r.latestExpiredDataID = buffer.DataID()
+			return true
+		}
+		return false
+	})
+
+	// detail event queue
+	count += r.deleteEventsWithJudgement(r.detailEvents, func(element *list.Element) bool {
+		return r.latestExpiredDataID > 0 && element.Value.(*SocketDetailEvent).DataID <= r.latestExpiredDataID
+	})
+	return count
+}
+
+func (r *Buffer) deleteEventsWithJudgement(l *list.List, checker func(element *list.Element) bool) int {
+	count := 0
+	for e := l.Front(); e != nil; {
+		if checker(e) {
 			count++
 			cur := e
 			e = e.Next()
-			r.dataEvents.Remove(cur)
+			l.Remove(cur)
 		} else {
 			break
 		}
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/base/events.go b/pkg/profiling/task/network/analyze/layer7/protocols/base/events.go
index debf48c..857543c 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/base/events.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/base/events.go
@@ -24,6 +24,8 @@ import (
 )
 
 type SocketDataBuffer interface {
+	// GenerateConnectionID identifies which connection the buffer belongs to
+	GenerateConnectionID() string
 	// BufferData of the buffer
 	BufferData() []byte
 	// TotalSize of socket data, the data may exceed the size of the BufferData()
@@ -147,7 +149,7 @@ type SocketDetailEvent struct {
 	FuncName         base.SocketFunctionName
 	RTTCount         uint8
 	Protocol         base.ConnectionProtocol
-	RTTTime          uint64
+	RTTTime          uint32
 }
 
 func (s *SocketDetailEvent) GenerateConnectionID() string {
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/reader.go b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/reader.go
index ace2a13..4197e72 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/reader.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/reader.go
@@ -99,7 +99,7 @@ func (m *MessageOpt) StartTime() uint64 {
 }
 
 func (m *MessageOpt) EndTime() uint64 {
-	return m.HeaderBuffer().LastSocketBuffer().EndTime()
+	return m.BodyBuffer().LastSocketBuffer().EndTime()
 }
 
 func (m *MessageOpt) Direction() base.SocketDataDirection {
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/protocols.go b/pkg/profiling/task/network/analyze/layer7/protocols/protocols.go
index ad1859c..480a3d8 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/protocols.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/protocols.go
@@ -94,6 +94,12 @@ func (a *Analyzer) UpdateExtensionConfig(config *profiling.ExtensionConfig) {
 	}
 }
 
+func (a *Analyzer) ReceiveSocketClose(event *base.SocketCloseEvent) {
+	for _, p := range a.protocols {
+		p.ReceiveSocketCloseEvent(event)
+	}
+}
+
 type ProtocolMetrics struct {
 	data map[base.ConnectionProtocol]protocol.Metrics
 }