You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@skywalking.apache.org by wu...@apache.org on 2022/08/02 12:20:34 UTC
[skywalking-rover] branch main updated: Support OpenSSL library and add protocol and IsSSL labels in metrics (#43)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git
The following commit(s) were added to refs/heads/main by this push:
new 87f2337 Support OpenSSL library and add protocol and IsSSL labels in metrics (#43)
87f2337 is described below
commit 87f23377feb901ba6b7cd8b476c448aa4e5d1ed0
Author: mrproliu <74...@qq.com>
AuthorDate: Tue Aug 2 20:20:30 2022 +0800
Support OpenSSL library and add protocol and IsSSL labels in metrics (#43)
---
CHANGES.md | 1 +
bpf/profiling/network/args.h | 4 +-
bpf/profiling/network/netmonitor.c | 46 ++++++----
bpf/profiling/network/openssl.c | 116 ++++++++++++++++++++++++
bpf/profiling/network/openssl.h | 43 +++++++++
bpf/profiling/network/sock_stats.h | 6 ++
pkg/profiling/task/network/analyzer.go | 6 ++
pkg/profiling/task/network/context.go | 40 +++++++--
pkg/profiling/task/network/enums.go | 55 +++++++++++-
pkg/profiling/task/network/linker.go | 60 +++++++++++++
pkg/profiling/task/network/metrics.go | 8 ++
pkg/profiling/task/network/runner.go | 8 +-
pkg/profiling/task/network/ssl.go | 155 +++++++++++++++++++++++++++++++++
pkg/tools/process.go | 16 ++++
pkg/tools/profiling/go_library.go | 9 +-
15 files changed, 537 insertions(+), 36 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 1cd7a8a..8c74a6b 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -7,6 +7,7 @@ Release Notes.
#### Features
* Support `NETWORK` Profiling.
* Let the logger as a configurable module.
+* Support analyzing the data of the OpenSSL library in `NETWORK` Profiling.
#### Bug Fixes
* Fixed reading process paths incorrect when running as a container.
diff --git a/bpf/profiling/network/args.h b/bpf/profiling/network/args.h
index ce65720..b9bcc59 100644
--- a/bpf/profiling/network/args.h
+++ b/bpf/profiling/network/args.h
@@ -41,7 +41,9 @@
#define SOCKET_OPTS_TYPE_RECVFROM 14
#define SOCKET_OPTS_TYPE_RECVMSG 15
#define SOCKET_OPTS_TYPE_RECVMMSG 16
-#define SOCKET_OPTS_TYPE_RESENT 17
+#define SOCKET_OPTS_TYPE_RESENT 17
+#define SOCKET_OPTS_TYPE_SSL_WRITE 18
+#define SOCKET_OPTS_TYPE_SSL_READ 19
// tracepoint enter
struct trace_event_raw_sys_enter {
diff --git a/bpf/profiling/network/netmonitor.c b/bpf/profiling/network/netmonitor.c
index 7296606..b7262fc 100644
--- a/bpf/profiling/network/netmonitor.c
+++ b/bpf/profiling/network/netmonitor.c
@@ -192,8 +192,9 @@ static __inline void notify_close_connection(struct pt_regs* ctx, __u64 conid, s
close_event.random_id = con->random_id;
close_event.exe_time = exe_time;
close_event.pid = con->pid;
- close_event.sockfd = con->sockfd;
close_event.role = con->role;
+ close_event.protocol = con->protocol;
+ close_event.ssl = con->ssl;
close_event.socket_family = con->socket_family;
close_event.local_addr_v4 = con->local_addr_v4;
@@ -236,6 +237,14 @@ static __inline struct active_connection_t* get_or_create_active_conn(struct pt_
return bpf_map_lookup_elem(&active_connection_map, &conid);
}
+// Flags the connection identified by (tgid, fd) as carrying SSL traffic.
+// The connection is obtained through get_or_create_active_conn, forwarding
+// func_name unchanged; when no connection is returned nothing is modified.
+static __inline void set_conn_as_ssl(struct pt_regs* ctx, __u32 tgid, __u32 fd, __u32 func_name) {
+    struct active_connection_t* conn = get_or_create_active_conn(ctx, tgid, fd, func_name);
+    if (conn == NULL) {
+        return;
+    }
+    // the ssl field is declared as __u32, so this stores 1
+    conn->ssl = true;
+}
+
static __inline void submit_connection_when_not_exists(struct pt_regs *ctx, __u64 id, const struct connect_args_t* connect_args, __u32 func_name) {
__u32 tgid = (__u32)(id >> 32);
__u32 fd = connect_args->fd;
@@ -274,7 +283,7 @@ static __always_inline void resent_connect_event(struct pt_regs *ctx, __u32 tgid
}
static __always_inline void process_write_data(struct pt_regs *ctx, __u64 id, struct sock_data_args_t *args, ssize_t bytes_count,
- __u32 data_direction, const bool vecs, __u32 func_name) {
+ __u32 data_direction, const bool vecs, __u32 func_name, bool ssl) {
__u64 curr_nacs = bpf_ktime_get_ns();
__u32 tgid = (__u32)(id >> 32);
@@ -309,8 +318,9 @@ static __always_inline void process_write_data(struct pt_regs *ctx, __u64 id, st
resent_connect_event(ctx, tgid, args->fd, conid, conn);
}
- // unknown connection role, then try to use procotol analyzer to analyze request or response
- if (conn->role == CONNECTION_ROLE_TYPE_UNKNOWN) {
+ // if the protocol or role is unknown in the connection and the current data content is plaintext
+ // then try to use protocol analyzer to analyze request or response and protocol type
+ if ((conn->role == CONNECTION_ROLE_TYPE_UNKNOWN || conn->protocol == 0) && conn->ssl == ssl) {
struct socket_buffer_reader_t *buf_reader = NULL;
if (args->buf != NULL) {
buf_reader = read_socket_data(args->buf, bytes_count);
@@ -506,7 +516,7 @@ int sys_sendto_ret(struct pt_regs *ctx) {
struct sock_data_args_t *data_args = bpf_map_lookup_elem(&socket_data_args, &id);
if (data_args) {
- process_write_data(ctx, id, data_args, bytes_count, SOCK_DATA_DIRECTION_EGRESS, false, SOCKET_OPTS_TYPE_SENDTO);
+ process_write_data(ctx, id, data_args, bytes_count, SOCK_DATA_DIRECTION_EGRESS, false, SOCKET_OPTS_TYPE_SENDTO, false);
}
bpf_map_delete_elem(&socket_data_args, &id);
@@ -551,7 +561,7 @@ int sys_write_ret(struct pt_regs *ctx) {
struct sock_data_args_t *data_args = bpf_map_lookup_elem(&socket_data_args, &id);
if (data_args && data_args->is_sock_event) {
ssize_t bytes_count = PT_REGS_RC(ctx);
- process_write_data(ctx, id, data_args, bytes_count, SOCK_DATA_DIRECTION_EGRESS, false, SOCKET_OPTS_TYPE_WRITE);
+ process_write_data(ctx, id, data_args, bytes_count, SOCK_DATA_DIRECTION_EGRESS, false, SOCKET_OPTS_TYPE_WRITE, false);
}
bpf_map_delete_elem(&socket_data_args, &id);
@@ -576,7 +586,7 @@ int sys_send_ret(struct pt_regs* ctx) {
struct sock_data_args_t *data_args = bpf_map_lookup_elem(&socket_data_args, &id);
if (data_args) {
ssize_t bytes_count = PT_REGS_RC(ctx);
- process_write_data(ctx, id, data_args, bytes_count, SOCK_DATA_DIRECTION_EGRESS, false, SOCKET_OPTS_TYPE_SEND);
+ process_write_data(ctx, id, data_args, bytes_count, SOCK_DATA_DIRECTION_EGRESS, false, SOCKET_OPTS_TYPE_SEND, false);
}
bpf_map_delete_elem(&socket_data_args, &id);
@@ -605,7 +615,7 @@ int sys_writev_ret(struct pt_regs* ctx) {
struct sock_data_args_t *data_args = bpf_map_lookup_elem(&socket_data_args, &id);
if (data_args && data_args->is_sock_event) {
ssize_t bytes_count = PT_REGS_RC(ctx);
- process_write_data(ctx, id, data_args, bytes_count, SOCK_DATA_DIRECTION_EGRESS, true, SOCKET_OPTS_TYPE_WRITEV);
+ process_write_data(ctx, id, data_args, bytes_count, SOCK_DATA_DIRECTION_EGRESS, true, SOCKET_OPTS_TYPE_WRITEV, false);
}
bpf_map_delete_elem(&socket_data_args, &id);
@@ -655,7 +665,7 @@ int sys_sendmsg_ret(struct pt_regs* ctx) {
// socket data
struct sock_data_args_t *data_args = bpf_map_lookup_elem(&socket_data_args, &id);
if (data_args) {
- process_write_data(ctx, id, data_args, bytes_count, SOCK_DATA_DIRECTION_EGRESS, true, SOCKET_OPTS_TYPE_SENDMSG);
+ process_write_data(ctx, id, data_args, bytes_count, SOCK_DATA_DIRECTION_EGRESS, true, SOCKET_OPTS_TYPE_SENDMSG, false);
}
bpf_map_delete_elem(&socket_data_args, &id);
return 0;
@@ -711,7 +721,7 @@ int sys_sendmmsg_ret(struct pt_regs* ctx) {
if (data_args) {
__u32 bytes_count;
BPF_PROBE_READ_VAR1(bytes_count, data_args->msg_len);
- process_write_data(ctx, id, data_args, bytes_count, SOCK_DATA_DIRECTION_EGRESS, true, SOCKET_OPTS_TYPE_SENDMMSG);
+ process_write_data(ctx, id, data_args, bytes_count, SOCK_DATA_DIRECTION_EGRESS, true, SOCKET_OPTS_TYPE_SENDMMSG, false);
}
bpf_map_delete_elem(&socket_data_args, &id);
return 0;
@@ -796,7 +806,7 @@ int sys_read_ret(struct pt_regs* ctx) {
struct sock_data_args_t *data_args = bpf_map_lookup_elem(&socket_data_args, &id);
if (data_args && data_args->is_sock_event) {
ssize_t bytes_count = PT_REGS_RC(ctx);
- process_write_data(ctx, id, data_args, bytes_count, SOCK_DATA_DIRECTION_INGRESS, false, SOCKET_OPTS_TYPE_READ);
+ process_write_data(ctx, id, data_args, bytes_count, SOCK_DATA_DIRECTION_INGRESS, false, SOCKET_OPTS_TYPE_READ, false);
}
bpf_map_delete_elem(&socket_data_args, &id);
return 0;
@@ -824,7 +834,7 @@ int sys_readv_ret(struct pt_regs* ctx) {
struct sock_data_args_t *data_args = bpf_map_lookup_elem(&socket_data_args, &id);
if (data_args && data_args->is_sock_event) {
ssize_t bytes_count = PT_REGS_RC(ctx);
- process_write_data(ctx, id, data_args, bytes_count, SOCK_DATA_DIRECTION_INGRESS, true, SOCKET_OPTS_TYPE_READV);
+ process_write_data(ctx, id, data_args, bytes_count, SOCK_DATA_DIRECTION_INGRESS, true, SOCKET_OPTS_TYPE_READV, false);
}
bpf_map_delete_elem(&socket_data_args, &id);
@@ -849,7 +859,7 @@ int sys_recv_ret(struct pt_regs* ctx) {
struct sock_data_args_t *data_args = bpf_map_lookup_elem(&socket_data_args, &id);
if (data_args) {
ssize_t bytes_count = PT_REGS_RC(ctx);
- process_write_data(ctx, id, data_args, bytes_count, SOCK_DATA_DIRECTION_INGRESS, false, SOCKET_OPTS_TYPE_RECV);
+ process_write_data(ctx, id, data_args, bytes_count, SOCK_DATA_DIRECTION_INGRESS, false, SOCKET_OPTS_TYPE_RECV, false);
}
bpf_map_delete_elem(&socket_data_args, &id);
return 0;
@@ -895,7 +905,7 @@ int sys_recvfrom_ret(struct pt_regs* ctx) {
// socket data
struct sock_data_args_t *data_args = bpf_map_lookup_elem(&socket_data_args, &id);
if (data_args) {
- process_write_data(ctx, id, data_args, bytes_count, SOCK_DATA_DIRECTION_INGRESS, false, SOCKET_OPTS_TYPE_RECVFROM);
+ process_write_data(ctx, id, data_args, bytes_count, SOCK_DATA_DIRECTION_INGRESS, false, SOCKET_OPTS_TYPE_RECVFROM, false);
}
bpf_map_delete_elem(&socket_data_args, &id);
return 0;
@@ -944,7 +954,7 @@ int sys_recvmsg_ret(struct pt_regs* ctx) {
// socket data
struct sock_data_args_t *data_args = bpf_map_lookup_elem(&socket_data_args, &id);
if (data_args) {
- process_write_data(ctx, id, data_args, bytes_count, SOCK_DATA_DIRECTION_INGRESS, true, SOCKET_OPTS_TYPE_RECVMSG);
+ process_write_data(ctx, id, data_args, bytes_count, SOCK_DATA_DIRECTION_INGRESS, true, SOCKET_OPTS_TYPE_RECVMSG, false);
}
bpf_map_delete_elem(&socket_data_args, &id);
return 0;
@@ -1000,7 +1010,7 @@ int sys_recvmmsg_ret(struct pt_regs* ctx) {
if (data_args) {
__u32 bytes_count;
BPF_PROBE_READ_VAR1(bytes_count, data_args->msg_len);
- process_write_data(ctx, id, data_args, bytes_count, SOCK_DATA_DIRECTION_INGRESS, true, SOCKET_OPTS_TYPE_RECVMMSG);
+ process_write_data(ctx, id, data_args, bytes_count, SOCK_DATA_DIRECTION_INGRESS, true, SOCKET_OPTS_TYPE_RECVMMSG, false);
}
bpf_map_delete_elem(&socket_data_args, &id);
return 0;
@@ -1109,4 +1119,6 @@ int tcp_drop(struct pt_regs *ctx) {
struct sock *s = (void *)PT_REGS_PARM1(ctx);
send_socket_exception_operation_event(ctx, SOCKET_EXCEPTION_OPERATION_TYPE_DROP, s);
return 0;
-}
\ No newline at end of file
+}
+
+#include "openssl.c"
\ No newline at end of file
diff --git a/bpf/profiling/network/openssl.c b/bpf/profiling/network/openssl.c
new file mode 100644
index 0000000..3bd81ed
--- /dev/null
+++ b/bpf/profiling/network/openssl.c
@@ -0,0 +1,116 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "openssl.h"
+
+// Hands the result of a probed OpenSSL read/write to process_write_data.
+// The return register of the probed function is used as the byte count; the
+// call is made with vecs=false and ssl=true.
+static __inline void process_openssl_data(struct pt_regs* ctx, __u64 id, __u32 data_direction, struct sock_data_args_t* args, __u32 func_name) {
+    // narrowed to int before being passed on, so a negative return stays negative
+    const int ret_bytes = PT_REGS_RC(ctx);
+    process_write_data(ctx, id, args, ret_bytes, data_direction, false, func_name, true);
+}
+
+// Resolves the fd behind an OpenSSL SSL structure by following
+// ssl -> BIO pointer -> fd field, using the offsets registered for the process.
+//   tgid: key used to look the offsets up in openssl_fd_symaddr_finder
+//   read: true selects bio_read_offset, false selects bio_write_offset
+//   ssl:  address of the SSL structure (first argument of the probed call)
+// Returns the value read from the fd field, or -1 when no offsets are
+// registered for tgid, the BIO pointer is NULL, or a memory read fails.
+static int get_fd_symaddr(__u32 tgid, bool read, void* ssl) {
+    struct openssl_fd_symaddr* addr = get_openssl_fd_symaddr(tgid);
+    if (addr == NULL) {
+        return -1;
+    }
+    __u32 bio_offset = read ? addr->bio_read_offset : addr->bio_write_offset;
+    __u32 fd_offset = addr->fd_offset;
+
+    void *bio = NULL;
+    if (bpf_probe_read(&bio, sizeof(bio), ssl + bio_offset) != 0 || bio == NULL) {
+        return -1;
+    }
+    // initialised so the result never depends on an unwritten stack slot
+    __u32 fd = 0;
+    if (bpf_probe_read(&fd, sizeof(fd), bio + fd_offset) != 0) {
+        return -1;
+    }
+
+    return fd;
+}
+
+// Returns the fd resolved by get_fd_symaddr when it is greater than 2,
+// otherwise -1 (presumably to exclude the standard streams 0-2 — confirm).
+static int get_fd(uint32_t tgid, bool read, void* ssl) {
+    const int candidate = get_fd_symaddr(tgid, read, ssl);
+    return candidate > 2 ? candidate : -1;
+}
+
+// Entry probe for the SSL write path: stores the resolved fd and the buffer
+// pointer (second argument) for openssl_write_ret and flags the connection as
+// SSL. Does nothing when no usable fd can be resolved.
+SEC("uprobe/ssl_write")
+int openssl_write(struct pt_regs* ctx) {
+    __u64 id = bpf_get_current_pid_tgid();
+    __u32 tgid = id >> 32;
+
+    void* ssl = (void*)PT_REGS_PARM1(ctx);
+    // keep the fd signed: get_fd reports failure as -1, which an unsigned
+    // variable would turn into a large value that is never below zero
+    int fd = get_fd(tgid, false, ssl);
+    if (fd < 0) {
+        return 0;
+    }
+
+    char* buf = (char*)PT_REGS_PARM2(ctx);
+    struct sock_data_args_t data_args = {};
+    data_args.fd = fd;
+    data_args.buf = buf;
+    // keyed by pid_tgid so the return probe can pick the arguments up again
+    bpf_map_update_elem(&openssl_sock_data_args, &id, &data_args, 0);
+
+    set_conn_as_ssl(ctx, tgid, fd, SOCKET_OPTS_TYPE_SSL_WRITE);
+    return 0;
+}
+
+// Return probe for the SSL write path: when entry arguments were stored for
+// the current pid_tgid they are reported as egress data; the stored entry is
+// deleted in every case.
+SEC("uretprobe/ssl_write")
+int openssl_write_ret(struct pt_regs* ctx) {
+    __u64 pid_tgid = bpf_get_current_pid_tgid();
+    struct sock_data_args_t *saved = bpf_map_lookup_elem(&openssl_sock_data_args, &pid_tgid);
+
+    if (saved != NULL) {
+        process_openssl_data(ctx, pid_tgid, SOCK_DATA_DIRECTION_EGRESS, saved, SOCKET_OPTS_TYPE_SSL_WRITE);
+    }
+
+    bpf_map_delete_elem(&openssl_sock_data_args, &pid_tgid);
+    return 0;
+}
+
+// Entry probe for the SSL read path: stores the resolved fd and the buffer
+// pointer (second argument) for openssl_read_ret and flags the connection as
+// SSL. Does nothing when no usable fd can be resolved.
+SEC("uprobe/ssl_read")
+int openssl_read(struct pt_regs* ctx) {
+    __u64 id = bpf_get_current_pid_tgid();
+    __u32 tgid = id >> 32;
+
+    void* ssl = (void*)PT_REGS_PARM1(ctx);
+    // keep the fd signed: get_fd reports failure as -1, which an unsigned
+    // variable would turn into a large value that is never below zero
+    int fd = get_fd(tgid, true, ssl);
+    if (fd < 0) {
+        return 0;
+    }
+
+    char* buf = (char*)PT_REGS_PARM2(ctx);
+    struct sock_data_args_t data_args = {};
+    data_args.fd = fd;
+    data_args.buf = buf;
+    // keyed by pid_tgid so the return probe can pick the arguments up again
+    bpf_map_update_elem(&openssl_sock_data_args, &id, &data_args, 0);
+
+    // report the read operation type, matching what openssl_read_ret passes on
+    set_conn_as_ssl(ctx, tgid, fd, SOCKET_OPTS_TYPE_SSL_READ);
+    return 0;
+}
+
+// Return probe for the SSL read path: when entry arguments were stored for
+// the current pid_tgid they are reported as ingress data; the stored entry is
+// deleted in every case.
+SEC("uretprobe/ssl_read")
+int openssl_read_ret(struct pt_regs* ctx) {
+    __u64 pid_tgid = bpf_get_current_pid_tgid();
+    struct sock_data_args_t *saved = bpf_map_lookup_elem(&openssl_sock_data_args, &pid_tgid);
+
+    if (saved != NULL) {
+        process_openssl_data(ctx, pid_tgid, SOCK_DATA_DIRECTION_INGRESS, saved, SOCKET_OPTS_TYPE_SSL_READ);
+    }
+
+    bpf_map_delete_elem(&openssl_sock_data_args, &pid_tgid);
+    return 0;
+}
\ No newline at end of file
diff --git a/bpf/profiling/network/openssl.h b/bpf/profiling/network/openssl.h
new file mode 100644
index 0000000..11536be
--- /dev/null
+++ b/bpf/profiling/network/openssl.h
@@ -0,0 +1,43 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Offsets used to walk from an SSL structure to the fd stored in its BIO.
+// One entry per process is kept in openssl_fd_symaddr_finder.
+struct openssl_fd_symaddr {
+    // offset, inside the SSL structure, of the BIO pointer used for reads
+    __u32 bio_read_offset;
+    // offset, inside the SSL structure, of the BIO pointer used for writes
+    __u32 bio_write_offset;
+    // offset, inside the BIO structure, of the fd value
+    __u32 fd_offset;
+};
+
+// Hash map from tgid (__u32) to the OpenSSL offsets registered for that
+// process; read through get_openssl_fd_symaddr.
+struct {
+    __uint(type, BPF_MAP_TYPE_HASH);
+    __uint(max_entries, 10000);
+    __type(key, __u32);
+    __type(value, struct openssl_fd_symaddr);
+} openssl_fd_symaddr_finder SEC(".maps");
+// Looks up the OpenSSL offsets registered for tgid and returns the result of
+// the map lookup directly (NULL when there is no entry).
+static __inline struct openssl_fd_symaddr* get_openssl_fd_symaddr(__u32 tgid) {
+    return bpf_map_lookup_elem(&openssl_fd_symaddr_finder, &tgid);
+}
+
+// Arguments captured at the entry of an OpenSSL read or write call, keyed by
+// the bpf_get_current_pid_tgid() value; the entry probes insert the record and
+// the matching return probes read and delete it.
+struct {
+    __uint(type, BPF_MAP_TYPE_HASH);
+    __uint(max_entries, 10000);
+    __type(key, __u64);
+    __type(value, struct sock_data_args_t);
+} openssl_sock_data_args SEC(".maps");
\ No newline at end of file
diff --git a/bpf/profiling/network/sock_stats.h b/bpf/profiling/network/sock_stats.h
index a597116..018bc69 100644
--- a/bpf/profiling/network/sock_stats.h
+++ b/bpf/profiling/network/sock_stats.h
@@ -59,6 +59,8 @@ struct active_connection_t {
__u64 prev_count;
char prev_buf[4];
__u32 prepend_length_header;
+ // current connection is ssl
+ __u32 ssl;
// connect event already send
__u32 connect_event_send;
@@ -136,6 +138,10 @@ struct socket_close_event_t {
__u32 sockfd;
// the type of role in current connection
__u32 role;
+ // the protocol type of the connection
+ __u32 protocol;
+ // the connection is ssl
+ __u32 ssl;
__u32 fix;
// socket type
diff --git a/pkg/profiling/task/network/analyzer.go b/pkg/profiling/task/network/analyzer.go
index e92a574..581da98 100644
--- a/pkg/profiling/task/network/analyzer.go
+++ b/pkg/profiling/task/network/analyzer.go
@@ -192,6 +192,12 @@ func (t *TrafficAnalyzer) generateOrCombineTraffic(traffic *ProcessTraffic, con
if traffic.ConnectionRole == ConnectionRoleUnknown && con.Role != ConnectionRoleUnknown {
traffic.ConnectionRole = con.Role
}
+ if traffic.Protocol == ConnectionProtocolUnknown && con.Protocol != ConnectionProtocolUnknown {
+ traffic.Protocol = con.Protocol
+ }
+ if !traffic.IsSSL && con.IsSSL {
+ traffic.IsSSL = true
+ }
if remotePid != 0 {
traffic.RemotePid = remotePid
diff --git a/pkg/profiling/task/network/context.go b/pkg/profiling/task/network/context.go
index 451496f..cba0ce2 100644
--- a/pkg/profiling/task/network/context.go
+++ b/pkg/profiling/task/network/context.go
@@ -42,7 +42,8 @@ import (
type Context struct {
processes map[int32][]api.ProcessInterface
- bpf *bpfObjects // current bpf programs
+ bpf *bpfObjects // current bpf programs
+ linker *Linker
// standard syscall connections
activeConnections cmap.ConcurrentMap // current activeConnections connections
@@ -79,6 +80,8 @@ type ConnectionContext struct {
SocketFD uint32
LocalProcesses []api.ProcessInterface
ConnectionClosed bool
+ Protocol ConnectionProtocol
+ IsSSL bool
// socket metadata
Role ConnectionRole
@@ -124,21 +127,22 @@ func NewContext() *Context {
}
}
-func (c *Context) Init(bpf *bpfObjects) {
+func (c *Context) Init(bpf *bpfObjects, linker *Linker) {
c.bpf = bpf
+ c.linker = linker
}
-func (c *Context) RegisterAllHandlers(linker *Linker) {
+func (c *Context) RegisterAllHandlers() {
// socket connect
- linker.ReadEventAsync(c.bpf.SocketConnectionEventQueue, c.handleSocketConnectEvent, func() interface{} {
+ c.linker.ReadEventAsync(c.bpf.SocketConnectionEventQueue, c.handleSocketConnectEvent, func() interface{} {
return &SocketConnectEvent{}
})
// socket close
- linker.ReadEventAsync(c.bpf.SocketCloseEventQueue, c.handleSocketCloseEvent, func() interface{} {
+ c.linker.ReadEventAsync(c.bpf.SocketCloseEventQueue, c.handleSocketCloseEvent, func() interface{} {
return &SocketCloseEvent{}
})
// socket retransmit
- linker.ReadEventAsync(c.bpf.SocketExceptionOperationEventQueue, c.handleSocketExceptionOperationEvent, func() interface{} {
+ c.linker.ReadEventAsync(c.bpf.SocketExceptionOperationEventQueue, c.handleSocketExceptionOperationEvent, func() interface{} {
return &SocketExceptionOperationEvent{}
})
}
@@ -277,10 +281,11 @@ type ActiveConnectionInBPF struct {
WriteRTTExeTime uint64
// Protocol analyze context
- Protocol uint32
+ Protocol ConnectionProtocol
ProtocolPrevCount uint64
ProtocolPrevBuf [4]byte
ProtocolPrependHeader uint32
+ IsSSL uint32
// the connect event is already sent
ConnectEventIsSent uint32
@@ -323,6 +328,12 @@ func (c *Context) fillConnectionMetrics(ccs []*ConnectionContext) {
if cc.Role == ConnectionRoleUnknown && activeConnection.Role != ConnectionRoleUnknown {
cc.Role = activeConnection.Role
}
+ if cc.Protocol == ConnectionProtocolUnknown && activeConnection.Protocol != ConnectionProtocolUnknown {
+ cc.Protocol = activeConnection.Protocol
+ }
+ if !cc.IsSSL && activeConnection.IsSSL == 1 {
+ cc.IsSSL = true
+ }
// update the role
cc.WriteCounter.UpdateToCurrent(activeConnection.WriteBytes, activeConnection.WriteCount, activeConnection.WriteExeTime)
@@ -453,6 +464,8 @@ type SocketCloseEvent struct {
Pid uint32
SocketFD uint32
Role ConnectionRole
+ Protocol ConnectionProtocol
+ IsSSL uint32
Fix uint32
SocketFamily uint32
@@ -614,6 +627,12 @@ func (c *Context) combineClosedConnection(active *ConnectionContext, closed *Soc
if active.Role == ConnectionRoleUnknown && closed.Role != ConnectionRoleUnknown {
active.Role = closed.Role
}
+ if active.Protocol == ConnectionProtocolUnknown && closed.Protocol != ConnectionProtocolUnknown {
+ active.Protocol = closed.Protocol
+ }
+ if !active.IsSSL && closed.IsSSL == 1 {
+ active.IsSSL = true
+ }
active.WriteCounter.UpdateToCurrent(closed.WriteBytes, closed.WriteCount, closed.WriteExeTime)
active.ReadCounter.UpdateToCurrent(closed.ReadBytes, closed.ReadCount, closed.ReadExeTime)
@@ -646,9 +665,16 @@ func (c *Context) AddProcesses(processes []api.ProcessInterface) error {
c.processes[pid] = append(c.processes[pid], p)
+ // add to the process let it could be monitored
if err1 := c.bpf.ProcessMonitorControl.Update(uint32(pid), uint32(1), ebpf.UpdateAny); err1 != nil {
err = multierror.Append(err, err1)
}
+
+ // add process ssl config
+ if err1 := addSSLProcess(int(pid), c.bpf, c.linker); err1 != nil {
+ err = multierror.Append(err, err1)
+ }
+
log.Debugf("add monitor process, pid: %d", pid)
}
return err
diff --git a/pkg/profiling/task/network/enums.go b/pkg/profiling/task/network/enums.go
index 6f20e0a..2f5981f 100644
--- a/pkg/profiling/task/network/enums.go
+++ b/pkg/profiling/task/network/enums.go
@@ -17,6 +17,11 @@
package network
+// label strings that are returned from more than one place in this file
+const (
+	unknown = "unknown"
+	http    = "http"
+)
+
// ConnectionRole represents the role of the current process is the connection
// whether it's a server or a client, if it's not trigger the connection/accept request, then it's unknown
type ConnectionRole uint32
@@ -34,7 +39,7 @@ func (r ConnectionRole) String() string {
case ConnectionRoleServer:
return "server"
default:
- return "unknown"
+ return unknown
}
}
@@ -71,3 +76,51 @@ const (
SocketExceptionOperationRetransmit SocketExceptionOperationType = 1
SocketExceptionOperationDrop SocketExceptionOperationType = 2
)
+
+// ConnectionProtocol identifies the protocol detected on a connection.
+// It is a uint32 so it can be used directly as the protocol field of the
+// structures decoded from BPF (ActiveConnectionInBPF, SocketCloseEvent).
+type ConnectionProtocol uint32
+
+// NOTE(review): the numeric values presumably mirror the protocol constants
+// written by the BPF protocol analyzer — confirm before renumbering.
+const (
+	ConnectionProtocolUnknown ConnectionProtocol = 0
+	ConnectionProtocolHTTP    ConnectionProtocol = 1
+	ConnectionProtocolHTTP2   ConnectionProtocol = 2
+	ConnectionProtocolMySQL   ConnectionProtocol = 3
+	ConnectionProtocolCQL     ConnectionProtocol = 4
+	ConnectionProtocolPGSQL   ConnectionProtocol = 5
+	ConnectionProtocolDNS     ConnectionProtocol = 6
+	ConnectionProtocolRedis   ConnectionProtocol = 7
+	ConnectionProtocolNATS    ConnectionProtocol = 8
+	ConnectionProtocolMongo   ConnectionProtocol = 9
+	ConnectionProtocolKafka   ConnectionProtocol = 10
+	ConnectionProtocolMux     ConnectionProtocol = 11
+)
+
+// String returns the lowercase label of the protocol. Values without a
+// dedicated label are reported as "unknown".
+func (c ConnectionProtocol) String() string {
+	switch c {
+	case ConnectionProtocolUnknown:
+		return unknown
+	case ConnectionProtocolHTTP, ConnectionProtocolHTTP2:
+		// NOTE(review): HTTP/2 shares the "http" label — presumably intentional
+		return http
+	case ConnectionProtocolMySQL:
+		return "mysql"
+	case ConnectionProtocolCQL:
+		return "cql"
+	case ConnectionProtocolPGSQL:
+		return "pgsql"
+	case ConnectionProtocolDNS:
+		return "dns"
+	case ConnectionProtocolRedis:
+		return "redis"
+	case ConnectionProtocolNATS:
+		return "nats"
+	case ConnectionProtocolMongo:
+		return "mongo"
+	case ConnectionProtocolKafka:
+		return "kafka"
+	case ConnectionProtocolMux:
+		// the constant is named Mux; "mutex" was a mislabel
+		return "mux"
+	default:
+		return unknown
+	}
+}
diff --git a/pkg/profiling/task/network/linker.go b/pkg/profiling/task/network/linker.go
index 859930a..46015bb 100644
--- a/pkg/profiling/task/network/linker.go
+++ b/pkg/profiling/task/network/linker.go
@@ -77,18 +77,28 @@ type Linker struct {
closeOnce sync.Once
}
+// UProbeExeFile wraps a file opened through Linker.OpenUProbeExeFile so that
+// uprobes and uretprobes can be attached to its symbols.
+type UProbeExeFile struct {
+	// path given to OpenUProbeExeFile; used in log and error messages
+	addr string
+	// false when the file could not be opened; attach calls then do nothing
+	found bool
+	// Linker that receives the created links and any attach errors
+	liker *Linker
+	// handle returned by link.OpenExecutable
+	realFile *link.Executable
+}
+
func (m *Linker) AddLink(linkF LinkFunc, p *ebpf.Program, trySymbolNames ...string) {
var lk link.Link
var err error
+ var realSym string
for _, n := range trySymbolNames {
lk, err = linkF(n, p)
if err == nil {
+ realSym = n
break
}
}
if err != nil {
m.errors = multierror.Append(m.errors, fmt.Errorf("open %s error: %v", trySymbolNames, err))
} else {
+ log.Debugf("attach to the kprobe: %s", realSym)
m.closers = append(m.closers, lk)
}
}
@@ -153,6 +163,56 @@ func (m *Linker) ReadEventAsync(emap *ebpf.Map, reader RingBufferReader, dataSup
}()
}
+// OpenUProbeExeFile opens the file at path for uprobe attachment.
+// If opening fails, the error is appended to the Linker's errors and a
+// placeholder with found=false is returned, so the result is never nil.
+func (m *Linker) OpenUProbeExeFile(path string) *UProbeExeFile {
+	exeFile := &UProbeExeFile{}
+
+	executable, err := link.OpenExecutable(path)
+	if err != nil {
+		m.errors = multierror.Append(m.errors, fmt.Errorf("cannot found the execute file: %s, error: %v", path, err))
+		return exeFile
+	}
+
+	exeFile.found = true
+	exeFile.addr = path
+	exeFile.liker = m
+	exeFile.realFile = executable
+	return exeFile
+}
+
+// AddLink attaches enter to the entry of symbol and exit to its return, both
+// restricted to pid, in that order.
+func (m *UProbeExeFile) AddLink(symbol string, enter, exit *ebpf.Program, pid int) {
+	probes := []struct {
+		atEntry bool
+		prog    *ebpf.Program
+	}{
+		{atEntry: true, prog: enter},
+		{atEntry: false, prog: exit},
+	}
+	for _, probe := range probes {
+		m.AddLinkWithType(symbol, probe.atEntry, probe.prog, pid)
+	}
+}
+
+// AddLinkWithType attaches p to symbol for the given pid: as a uprobe when
+// enter is true, as a uretprobe otherwise. It does nothing when the file was
+// not opened. A successful link is registered with the owning Linker for
+// closing; a failure is appended to the Linker's errors.
+func (m *UProbeExeFile) AddLinkWithType(symbol string, enter bool, p *ebpf.Program, pid int) {
+	if !m.found {
+		return
+	}
+
+	// choose the attach function together with the label used in messages
+	attach, probeType := m.realFile.Uretprobe, "exit"
+	if enter {
+		attach, probeType = m.realFile.Uprobe, "enter"
+	}
+
+	lk, err := attach(symbol, p, &link.UprobeOptions{PID: pid})
+	if err == nil {
+		log.Debugf("attach to the uprobe, file: %s, symbol: %s, type: %s, pid: %d", m.addr, symbol, probeType, pid)
+		m.liker.closers = append(m.liker.closers, lk)
+		return
+	}
+	m.liker.errors = multierror.Append(m.liker.errors, fmt.Errorf("file: %s, symbol: %s, type: %s, pid: %d, error: %v",
+		m.addr, symbol, probeType, pid, err))
+}
+
func (m *Linker) HasError() error {
return m.errors
}
diff --git a/pkg/profiling/task/network/metrics.go b/pkg/profiling/task/network/metrics.go
index 191a0b8..388be01 100644
--- a/pkg/profiling/task/network/metrics.go
+++ b/pkg/profiling/task/network/metrics.go
@@ -191,6 +191,10 @@ type ProcessTraffic struct {
// current connection role of local process
ConnectionRole ConnectionRole
+ // the protocol of the connection
+ Protocol ConnectionProtocol
+ // current connection is SSL
+ IsSSL bool
// remote process/address information
RemoteIP string
@@ -338,6 +342,10 @@ func (r *ProcessTraffic) buildBasicMeterLabels(local api.ProcessInterface) (Conn
labels = r.appendRemoteAddrssInfo(labels, curRole.Revert().String(), local)
labels = r.appendMeterValue(labels, "side", curRole.String())
+
+ // protocol and ssl
+ labels = r.appendMeterValue(labels, "protocol", r.Protocol.String())
+ labels = r.appendMeterValue(labels, "is_ssl", fmt.Sprintf("%t", r.IsSSL))
return curRole, labels
}
diff --git a/pkg/profiling/task/network/runner.go b/pkg/profiling/task/network/runner.go
index 8821903..7581ade 100644
--- a/pkg/profiling/task/network/runner.go
+++ b/pkg/profiling/task/network/runner.go
@@ -96,14 +96,14 @@ func (r *Runner) Start(ctx context.Context, processes []api.ProcessInterface) er
return err
}
r.bpf = &objs
- r.bpfContext.Init(&objs)
+ r.bpfContext.Init(&objs, r.linker)
if err := r.bpfContext.AddProcesses(processes); err != nil {
return err
}
// register all handlers
- r.bpfContext.RegisterAllHandlers(r.linker)
+ r.bpfContext.RegisterAllHandlers()
r.bpfContext.StartSocketAddressParser(r.ctx)
// sock opts
@@ -238,8 +238,8 @@ func (r *Runner) logTheMetricsConnections(traffices []*ProcessTraffic) {
traffic.RemoteIP, traffic.RemotePort, traffic.RemotePid)
}
side := traffic.ConnectionRole.String()
- log.Debugf("connection analyze result: %s : %s -> %s, read: %d bytes/%d, write: %d bytes/%d",
- side, localInfo, remoteInfo, traffic.WriteCounter.Bytes, traffic.WriteCounter.Count,
+ log.Debugf("connection analyze result: %s : %s -> %s, protocol: %s, is SSL: %t, read: %d bytes/%d, write: %d bytes/%d",
+ side, localInfo, remoteInfo, traffic.Protocol.String(), traffic.IsSSL, traffic.WriteCounter.Bytes, traffic.WriteCounter.Count,
traffic.ReadCounter.Bytes, traffic.ReadCounter.Count)
}
}
diff --git a/pkg/profiling/task/network/ssl.go b/pkg/profiling/task/network/ssl.go
new file mode 100644
index 0000000..a69782b
--- /dev/null
+++ b/pkg/profiling/task/network/ssl.go
@@ -0,0 +1,155 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package network
+
+import (
+ "fmt"
+ "os/exec"
+ "regexp"
+ "strconv"
+ "strings"
+
+ "github.com/apache/skywalking-rover/pkg/tools/profiling"
+
+ "github.com/apache/skywalking-rover/pkg/tools"
+
+ "github.com/apache/skywalking-rover/pkg/tools/path"
+)
+
+// openSSLVersionRegex captures Major, Minor and Fix from a line that starts
+// with "OpenSSL", whitespace and a <digit>.<digit>.<digits> version. The
+// trailing `\w+` requires at least one more word character after the Fix
+// digits (for example a patch letter).
+// NOTE(review): a version followed directly by a space therefore does not
+// appear to match — confirm whether letterless releases must be recognised.
+var openSSLVersionRegex = regexp.MustCompile(`^OpenSSL\s+(?P<Major>\d)\.(?P<Minor>\d)\.(?P<Fix>\d+)\w+`)
+
+// OpenSSLFdSymAddrConfigInBPF is the value written into the BPF map
+// OpensslFdSymaddrFinder for a process. Its three uint32 fields follow the
+// same order as struct openssl_fd_symaddr on the BPF side, so the field order
+// must not change.
+type OpenSSLFdSymAddrConfigInBPF struct {
+	// offset of the read BIO pointer inside the SSL structure
+	BIOReadOffset uint32
+	// offset of the write BIO pointer inside the SSL structure
+	BIOWriteOffset uint32
+	// offset of the fd value inside the BIO structure
+	FDOffset uint32
+}
+
+// addSSLProcess reads the modules loaded by pid and hands them to the OpenSSL
+// handler. A failure to read the modules is wrapped with the pid; any error
+// from the OpenSSL handler is returned unchanged.
+func addSSLProcess(pid int, bpf *bpfObjects, linker *Linker) error {
+	modules, err := tools.ProcessModules(int32(pid))
+	if err != nil {
+		return fmt.Errorf("read process modules error: %d, error: %v", pid, err)
+	}
+
+	// openssl process
+	return processOpenSSLProcess(pid, bpf, linker, modules)
+}
+
+// processOpenSSLProcess enables OpenSSL monitoring for pid when the process
+// has both libcrypto.so and libssl.so among its modules.
+//
+// It returns nil when neither library is present, an error when only one of
+// them is found or the version config cannot be built or stored, and
+// otherwise whatever linker.HasError() reports after the SSL_write and
+// SSL_read probes have been attached.
+func processOpenSSLProcess(pid int, bpf *bpfObjects, linker *Linker, modules []*profiling.Module) error {
+	const (
+		cryptoLib = "libcrypto.so"
+		sslLib    = "libssl.so"
+	)
+
+	found, err := findProcessModules(modules, cryptoLib, sslLib)
+	if err != nil {
+		return err
+	}
+	// the openssl not exists, so ignore
+	if len(found) == 0 {
+		return nil
+	}
+
+	// empty string when the library was not matched
+	pathOf := func(name string) string {
+		if mod := found[name]; mod != nil {
+			return mod.Path
+		}
+		return ""
+	}
+	cryptoPath, sslPath := pathOf(cryptoLib), pathOf(sslLib)
+	if cryptoPath == "" || sslPath == "" {
+		return fmt.Errorf("the OpenSSL library not complete, libcrypto: %s, libssl: %s", cryptoPath, sslPath)
+	}
+
+	// derive the offsets from the libcrypto version and publish them to BPF
+	conf, err := buildSSLSymAddrConfig(cryptoPath)
+	if err != nil {
+		return err
+	}
+	if err = bpf.OpensslFdSymaddrFinder.Put(uint32(pid), conf); err != nil {
+		return err
+	}
+
+	// attach the entry/return probes to libssl
+	sslFile := linker.OpenUProbeExeFile(sslPath)
+	sslFile.AddLink("SSL_write", bpf.OpensslWrite, bpf.OpensslWriteRet, pid)
+	sslFile.AddLink("SSL_read", bpf.OpensslRead, bpf.OpensslReadRet, pid)
+	return linker.HasError()
+}
+
+// findProcessModules returns, for each entry of moduleNames that is a
+// substring of some module's Name, the matching module keyed by that entry
+// (a later match overwrites an earlier one). It fails as soon as a matching
+// module's Path does not exist.
+func findProcessModules(modules []*profiling.Module, moduleNames ...string) (map[string]*profiling.Module, error) {
+	matched := make(map[string]*profiling.Module)
+	for _, mod := range modules {
+		for _, wanted := range moduleNames {
+			if !strings.Contains(mod.Name, wanted) {
+				continue
+			}
+			if !path.Exists(mod.Path) {
+				return nil, fmt.Errorf("the module path not exists, path: %s", mod.Path)
+			}
+			matched[wanted] = mod
+		}
+	}
+	return matched, nil
+}
+
+// buildSSLSymAddrConfig derives the structure offsets needed by the BPF side
+// from the OpenSSL version embedded in the libcrypto file at libcryptoPath.
+//
+// The version is taken from the first line of the external "strings"
+// command's output that matches openSSLVersionRegex. Versions 1.0.x, 1.1.0
+// and 1.1.1 are accepted. An error is returned when "strings" fails, when no
+// version line is found, when the version cannot be parsed, or when it is
+// outside the supported range.
+func buildSSLSymAddrConfig(libcryptoPath string) (*OpenSSLFdSymAddrConfigInBPF, error) {
+	// using "strings" command to query the symbol in the libcrypto library
+	result, err := exec.Command("strings", libcryptoPath).Output()
+	if err != nil {
+		return nil, err
+	}
+	for _, p := range strings.Split(string(result), "\n") {
+		submatch := openSSLVersionRegex.FindStringSubmatch(p)
+		if len(submatch) != 4 {
+			continue
+		}
+		major, minor, fix := submatch[1], submatch[2], submatch[3]
+		log.Debugf("found the libcrypto.so version: %s.%s.%s", major, minor, fix)
+
+		// the regex only admits digits, but an over-long fix part can still overflow
+		majorVal, majorErr := strconv.Atoi(major)
+		minorVal, minorErr := strconv.Atoi(minor)
+		fixVal, fixErr := strconv.Atoi(fix)
+		if majorErr != nil || minorErr != nil || fixErr != nil {
+			return nil, fmt.Errorf("the version of the libcrypto could not be parsed: %s.%s.%s", major, minor, fix)
+		}
+
+		// max support version is 1.1.1; the fix number is only limited within
+		// the 1.1 series so that 1.0.2 is still accepted
+		if majorVal != 1 || minorVal > 1 || (minorVal == 1 && fixVal > 1) {
+			return nil, fmt.Errorf("the version of the libcrypto is not supported: %s.%s.%s", major, minor, fix)
+		}
+
+		conf := &OpenSSLFdSymAddrConfigInBPF{}
+		// bio offset
+		// https://github.com/openssl/openssl/blob/OpenSSL_1_0_0-stable/ssl/ssl.h#L1093-L1111
+		// https://github.com/openssl/openssl/blob/OpenSSL_1_1_1-stable/ssl/ssl_local.h#L1068-L1083
+		conf.BIOReadOffset = 16
+		conf.BIOWriteOffset = 24
+		// fd offset
+		if (minorVal == 0) || (minorVal == 1 && fixVal == 0) {
+			// 1.0.x || 1.1.0
+			// https://github.com/openssl/openssl/blob/OpenSSL_1_0_0-stable/crypto/bio/bio.h#L297-L306
+			conf.FDOffset = 40
+		} else {
+			// 1.1.1
+			// https://github.com/openssl/openssl/blob/OpenSSL_1_1_1-stable/crypto/bio/bio_local.h#L115-L125
+			conf.FDOffset = 48
+		}
+		log.Debugf("the libcrypto.so library symbol version config, version: %s.%s.%s, "+
+			"bio read offset: %d, bio write offset: %d, fd offset: %d",
+			major, minor, fix, conf.BIOReadOffset, conf.BIOWriteOffset, conf.FDOffset)
+		return conf, nil
+	}
+	return nil, fmt.Errorf("could not find the version of the libcrypto.so")
+}
diff --git a/pkg/tools/process.go b/pkg/tools/process.go
index b814a39..7d55855 100644
--- a/pkg/tools/process.go
+++ b/pkg/tools/process.go
@@ -27,6 +27,7 @@ import (
"github.com/apache/skywalking-rover/pkg/logger"
host2 "github.com/apache/skywalking-rover/pkg/tools/host"
+ "github.com/apache/skywalking-rover/pkg/tools/path"
"github.com/apache/skywalking-rover/pkg/tools/profiling"
)
@@ -82,6 +83,16 @@ func ProcessProfilingStat(pid int32, exePath string) (*profiling.Info, error) {
return analyzeProfilingInfo(context, pid)
}
+// ProcessModules analyzes the process identified by pid with a fresh analyze context
+// and returns only the modules of the resulting profiling info.
+// Any error reported by the analysis is returned unchanged.
+func ProcessModules(pid int32) ([]*profiling.Module, error) {
+	profilingInfo, analyzeErr := analyzeProfilingInfo(newAnalyzeContext(), pid)
+	if analyzeErr != nil {
+		return nil, analyzeErr
+	}
+	return profilingInfo.Modules, nil
+}
+
func analyzeProfilingInfo(context *analyzeContext, pid int32) (*profiling.Info, error) {
// analyze process mapping
mapFile, _ := os.Open(host2.GetFileInHost(fmt.Sprintf("/proc/%d/maps", pid)))
@@ -116,6 +127,10 @@ func analyzeProfilingInfo(context *analyzeContext, pid int32) (*profiling.Info,
continue
}
modulePath := host2.GetFileInHost(fmt.Sprintf("/proc/%d/root%s", pid, moduleName))
+ if !path.Exists(modulePath) {
+ log.Debugf("could not found the module, ignore. name: %s, path: %s", moduleName, modulePath)
+ continue
+ }
module, err = context.GetFinder(modulePath).ToModule(pid, moduleName, modulePath, []*profiling.ModuleRange{moduleRange})
if err != nil {
@@ -149,6 +164,7 @@ func isIgnoreModuleName(name string) bool {
strings.HasPrefix(name, "/memfd:") ||
strings.HasPrefix(name, "[vdso]") ||
strings.HasPrefix(name, "[vsyscall]") ||
+ strings.HasPrefix(name, "[uprobes]") ||
strings.HasSuffix(name, ".map"))
}
diff --git a/pkg/tools/profiling/go_library.go b/pkg/tools/profiling/go_library.go
index 9883672..2257b2a 100644
--- a/pkg/tools/profiling/go_library.go
+++ b/pkg/tools/profiling/go_library.go
@@ -23,8 +23,6 @@ import (
"sort"
"strings"
- "github.com/hashicorp/go-multierror"
-
"github.com/apache/skywalking-rover/pkg/tools/path"
)
@@ -54,11 +52,10 @@ func (l *GoLibrary) AnalyzeSymbols(filePath string) ([]*Symbol, error) {
defer file.Close()
// exist symbol data
- symbols, symError := file.Symbols()
- dySyms, dyError := file.DynamicSymbols()
+ symbols, _ := file.Symbols()
+ dySyms, _ := file.DynamicSymbols()
if len(symbols) == 0 && len(dySyms) == 0 {
- symError = multierror.Append(symError, dyError)
- return nil, symError
+ return nil, nil
}
symbols = append(symbols, dySyms...)