You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@skywalking.apache.org by li...@apache.org on 2023/02/22 08:03:03 UTC
[skywalking-rover] branch main updated: Support continuous profiling (#78)
This is an automated email from the ASF dual-hosted git repository.
liuhan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git
The following commit(s) were added to refs/heads/main by this push:
new c7bf0c1 Support continuous profiling (#78)
c7bf0c1 is described below
commit c7bf0c1875488db91e16a029ee2fbe0196af6cde
Author: mrproliu <74...@qq.com>
AuthorDate: Wed Feb 22 16:02:57 2023 +0800
Support continuous profiling (#78)
---
CHANGES.md | 1 +
bpf/include/api.h | 13 +-
.../network => include}/protocol_analyzer.h | 24 +-
bpf/profiling/continuous/network.c | 129 +++++++
bpf/profiling/continuous/network.h | 66 ++++
.../config.go => bpf/profiling/continuous/skb.h | 34 +-
bpf/profiling/network/args.h | 1 +
bpf/profiling/network/common.h | 6 -
bpf/profiling/network/netmonitor.c | 13 +-
bpf/profiling/offcpu.c | 4 +-
configs/rover_configs.yaml | 15 +-
docs/en/setup/configuration/profiling.md | 71 +++-
go.mod | 4 +-
go.sum | 13 +-
pkg/process/api.go | 2 +
pkg/process/api/process.go | 4 +
pkg/process/finders/context.go | 6 +
pkg/process/finders/manager.go | 4 +
pkg/process/finders/storage.go | 12 +
pkg/process/module.go | 4 +
pkg/profiling/config.go | 6 +-
pkg/profiling/continuous/base/checker.go | 68 ++++
pkg/profiling/{ => continuous/base}/config.go | 21 +-
pkg/profiling/continuous/base/metrics.go | 104 ++++++
pkg/profiling/continuous/base/policy.go | 79 +++++
pkg/profiling/continuous/base/trigger.go | 42 +++
pkg/profiling/continuous/base/windows.go | 249 ++++++++++++++
.../continuous/checker/bpf/network/http1.go | 135 ++++++++
.../continuous/checker/bpf/network/network.go | 187 +++++++++++
.../continuous/checker/bpf/network/reader.go | 77 +++++
pkg/profiling/continuous/checker/common/causes.go | 116 +++++++
pkg/profiling/continuous/checker/common/checker.go | 87 +++++
.../continuous/checker/common/http_checker.go | 250 ++++++++++++++
.../continuous/checker/common/process_checker.go | 152 +++++++++
.../continuous/checker/common/system_checker.go | 138 ++++++++
.../continuous/checker/network_error_rate.go | 73 ++++
.../continuous/checker/network_response_time.go | 64 ++++
pkg/profiling/continuous/checker/process_cpu.go | 53 +++
pkg/profiling/continuous/checker/process_thread.go | 47 +++
pkg/profiling/continuous/checker/system_load.go | 51 +++
pkg/profiling/continuous/checkers.go | 369 +++++++++++++++++++++
pkg/profiling/continuous/manager.go | 70 ++++
pkg/profiling/continuous/trigger/common.go | 161 +++++++++
pkg/profiling/continuous/trigger/network.go | 120 +++++++
pkg/profiling/continuous/trigger/offcpu.go | 47 +++
pkg/profiling/continuous/trigger/oncpu.go | 47 +++
pkg/profiling/continuous/triggers.go | 128 +++++++
pkg/profiling/manager.go | 111 ++-----
pkg/profiling/task/base/task.go | 18 +
pkg/profiling/task/context.go | 8 +
pkg/profiling/task/manager.go | 129 +++++--
pkg/profiling/task/network/bpf/bpf.go | 4 +-
pkg/profiling/task/network/ssl.go | 9 +-
pkg/profiling/task/oncpu/runner.go | 3 +
.../task/network/bpf => tools/btf}/linker.go | 5 +-
.../profiling/network/base/rover_configs.yaml | 15 +-
test/e2e/cases/profiling/network/envoy/kind.yaml | 4 +
57 files changed, 3484 insertions(+), 189 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 41d3be3..6955adb 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -9,6 +9,7 @@ Release Notes.
* Add the syscall level event to the trace.
* Support OpenSSL 3.0.x.
* Optimized the data structure in BPF.
+* Support continuous profiling.
#### Bug Fixes
* Fix HTTP method name in protocol analyzer
diff --git a/bpf/include/api.h b/bpf/include/api.h
index a136dc7..aac5cca 100644
--- a/bpf/include/api.h
+++ b/bpf/include/api.h
@@ -27,11 +27,22 @@
#include <bpf/bpf_tracing.h>
#include <bpf/bpf_core_read.h>
-#define _(P) \
+#define _KERNEL(P) \
({ \
typeof(P) val; \
bpf_probe_read_kernel(&val, sizeof(val), &(P)); \
val; \
})
+// Evaluates to a copy of P obtained through bpf_probe_read (statement expression).
+// P must be an lvalue, since its address is taken for the read.
+#define _(P) \
+ ({ \
+ typeof(P) val; \
+ bpf_probe_read(&val, sizeof(val), &(P)); \
+ val; \
+ })
+
+// Local boolean type for the BPF programs: true is 1 and false is 0.
+typedef enum
+{
+ true=1, false=0
+} bool;
#endif
\ No newline at end of file
diff --git a/bpf/profiling/network/protocol_analyzer.h b/bpf/include/protocol_analyzer.h
similarity index 87%
rename from bpf/profiling/network/protocol_analyzer.h
rename to bpf/include/protocol_analyzer.h
index 3f22d90..737e557 100644
--- a/bpf/profiling/network/protocol_analyzer.h
+++ b/bpf/include/protocol_analyzer.h
@@ -15,8 +15,6 @@
// specific language governing permissions and limitations
// under the License.
-#include "common.h"
-
#pragma once
#define CONNECTION_PROTOCOL_UNKNOWN 0
@@ -77,16 +75,16 @@ static __inline __u32 infer_http1_message(const char* buf, size_t count) {
// HTTP 2.x
// frame format: https://www.rfc-editor.org/rfc/rfc7540.html#section-4.1
static __inline __u32 infer_http2_message(const char* buf, size_t count) {
- static const uint8_t kFrameBasicSize = 0x9; // including Length, Type, Flags, Reserved, Stream Identity
- static const uint8_t kFrameTypeHeader = 0x1; // the type of the frame: https://www.rfc-editor.org/rfc/rfc7540.html#section-6.2
- static const uint8_t kFrameLoopCount = 5;
+ static const __u8 kFrameBasicSize = 0x9; // including Length, Type, Flags, Reserved, Stream Identity
+ static const __u8 kFrameTypeHeader = 0x1; // the type of the frame: https://www.rfc-editor.org/rfc/rfc7540.html#section-6.2
+ static const __u8 kFrameLoopCount = 5;
- static const uint8_t kStaticTableMaxSize = 61;// https://www.rfc-editor.org/rfc/rfc7541#appendix-A
- static const uint8_t kStaticTableAuth = 1;
- static const uint8_t kStaticTableGet = 2;
- static const uint8_t kStaticTablePost = 3;
- static const uint8_t kStaticTablePath1 = 4;
- static const uint8_t kStaticTablePath2 = 5;
+ static const __u8 kStaticTableMaxSize = 61;// https://www.rfc-editor.org/rfc/rfc7541#appendix-A
+ static const __u8 kStaticTableAuth = 1;
+ static const __u8 kStaticTableGet = 2;
+ static const __u8 kStaticTablePost = 3;
+ static const __u8 kStaticTablePath1 = 4;
+ static const __u8 kStaticTablePath2 = 5;
// the buffer size must bigger than basic frame size
if (count < kFrameBasicSize) {
@@ -154,7 +152,7 @@ static __inline __u32 infer_http2_message(const char* buf, size_t count) {
return CONNECTION_MESSAGE_TYPE_UNKNOWN;
}
-static __inline __u32 analyze_protocol(char *buf, __u32 count, struct active_connection_t *conn_info) {
+static __inline __u32 analyze_protocol(char *buf, __u32 count, __u8 *protocol_ref) {
__u32 protocol = CONNECTION_PROTOCOL_UNKNOWN, type = CONNECTION_MESSAGE_TYPE_UNKNOWN;
// support http 1.x and 2.x
@@ -165,7 +163,7 @@ static __inline __u32 analyze_protocol(char *buf, __u32 count, struct active_con
}
if (protocol != CONNECTION_PROTOCOL_UNKNOWN) {
- conn_info->protocol = protocol;
+ *protocol_ref = protocol;
}
return type;
diff --git a/bpf/profiling/continuous/network.c b/bpf/profiling/continuous/network.c
new file mode 100644
index 0000000..cd0a603
--- /dev/null
+++ b/bpf/profiling/continuous/network.c
@@ -0,0 +1,129 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <stddef.h>
+#include <stdlib.h>
+#include <string.h>
+#include <linux/sched.h>
+#include <linux/bpf.h>
+#include <linux/ptrace.h>
+#include <asm/errno.h>
+#include <asm/socket.h>
+#include <linux/netfilter_ipv4.h>
+#include <linux/tcp.h>
+#include <bpf/bpf_endian.h>
+#include <bpf/bpf_helpers.h>
+#include <bpf/bpf_tracing.h>
+#include <bpf/bpf_core_read.h>
+#include "network.h"
+#include "protocol_analyzer.h"
+
+char __license[] SEC("license") = "Dual MIT/GPL";
+
+// Decides whether data flowing through `sock` should be inspected for the
+// calling task. `id` is the value of bpf_get_current_pid_tgid(); its upper
+// 32 bits are used as the tgid.
+static __always_inline bool socket_should_trace(__u64 id, struct sock *sock) {
+    // only processes enabled in the monitor control map are traced
+    if (!tgid_should_trace((__u32)(id >> 32))) {
+        return false;
+    }
+
+    // the accept backlog is used to tell whether the socket is server side
+    int backlog = 0;
+    if (bpf_core_read(&backlog, sizeof(backlog), &sock->sk_max_ack_backlog) != 0) {
+        // the backlog could not be read: keep tracing the socket
+        return true;
+    }
+    return backlog != 0 ? true : false;
+}
+
+// Samples the beginning of the first iovec of `msg`, tries to recognize the
+// application protocol and, when one is recognized, emits the sample to user
+// space through socket_buffer_send_queue.
+//   ctx:         probe context, forwarded to bpf_perf_event_output
+//   id:          value of bpf_get_current_pid_tgid(); the upper 32 bits are reported as pid
+//   channel_ref: opaque reference copied into the event (the callers pass the sock pointer)
+//   msg:         message header of the intercepted send/receive call
+static __always_inline void process_data(struct pt_regs *ctx, __u64 id, void *channel_ref, struct msghdr *msg) {
+    // The read length is bounded with a bit mask, so the bound has to be a
+    // contiguous low-bit mask (2^n - 1). Masking with
+    // MAX_PROTOCOL_SOCKET_READ_LENGTH (158 = 0b10011110) dropped bits 0, 5 and 6
+    // of the length, e.g. a 100 bytes payload was read as 4 bytes.
+    // 0x7f (127) still fits into reader->buffer.
+    enum { kReadLengthMask = 0x7f };
+
+    const struct iovec *iovec = _KERNEL(msg->msg_iter.iov);
+    struct iovec iov = {};
+    bpf_probe_read(&iov, sizeof(iov), iovec);
+    char *buf = (char *)iov.iov_base;
+    __u64 size = iov.iov_len;
+    if (size == 0) {
+        return;
+    }
+    if (size > kReadLengthMask) {
+        size = kReadLengthMask;
+    }
+
+    __u32 kZero = 0;
+    struct socket_buffer_reader_t *reader = bpf_map_lookup_elem(&socket_buffer_reader_map, &kZero);
+    if (reader == NULL) {
+        return;
+    }
+    // keeps the bound visible in the generated byte code;
+    // the literal must stay equal to kReadLengthMask
+    asm volatile("%[size] &= 0x7f;\n" ::[size] "+r"(size) :);
+    bpf_probe_read(&reader->buffer, size & kReadLengthMask, buf);
+
+    // analyze_protocol only writes the protocol when it recognizes one,
+    // so the variable needs a defined starting value
+    __u8 protocol = CONNECTION_PROTOCOL_UNKNOWN;
+    __u32 direction = analyze_protocol(reader->buffer, size & kReadLengthMask, &protocol);
+    if (protocol == CONNECTION_PROTOCOL_UNKNOWN) {
+        return;
+    }
+
+    reader->timestamp = bpf_ktime_get_ns();
+    reader->channel_ref = channel_ref;
+    reader->pid = (__u32)(id >> 32);
+    reader->protocol = protocol;
+    reader->direction = (__u8)direction;
+    reader->size = size & kReadLengthMask;
+    bpf_perf_event_output(ctx, &socket_buffer_send_queue, BPF_F_CURRENT_CPU, reader, sizeof(*reader));
+}
+
+// kprobe on tcp_sendmsg: inspects the outgoing message of traced sockets.
+SEC("kprobe/tcp_sendmsg")
+int tcp_sendmsg(struct pt_regs *ctx) {
+    struct sock *sk = (void *)PT_REGS_PARM1(ctx);
+    __u64 pid_tgid = bpf_get_current_pid_tgid();
+    if (socket_should_trace(pid_tgid, sk) != false) {
+        // the second probe argument is the message header being sent
+        struct msghdr *header = (void *)PT_REGS_PARM2(ctx);
+        process_data(ctx, pid_tgid, sk, header);
+    }
+    return 0;
+}
+
+// kprobe on tcp_recvmsg: remembers the socket and message header of the call,
+// keyed by pid_tgid, so the kretprobe can inspect the data afterwards.
+SEC("kprobe/tcp_recvmsg")
+int tcp_recvmsg(struct pt_regs *ctx) {
+    struct sock *sk = (void *)PT_REGS_PARM1(ctx);
+    __u64 pid_tgid = bpf_get_current_pid_tgid();
+    if (!socket_should_trace(pid_tgid, sk)) {
+        return 0;
+    }
+
+    struct recv_msg_args pending = {
+        .sock = sk,
+        .msg = (void *)PT_REGS_PARM2(ctx),
+    };
+    bpf_map_update_elem(&receiving_args, &pid_tgid, &pending, 0);
+    return 0;
+}
+
+// kretprobe on tcp_recvmsg: when the call returned a positive byte count,
+// inspects the message saved by the entry probe, then drops the saved entry.
+SEC("kretprobe/tcp_recvmsg")
+int ret_tcp_recvmsg(struct pt_regs *ctx) {
+    __u64 pid_tgid = bpf_get_current_pid_tgid();
+    struct recv_msg_args *saved = bpf_map_lookup_elem(&receiving_args, &pid_tgid);
+    int received = PT_REGS_RC(ctx);
+
+    if (saved != NULL && received > 0) {
+        process_data(ctx, pid_tgid, saved->sock, saved->msg);
+    }
+
+    // the entry is removed on every return, traced or not
+    bpf_map_delete_elem(&receiving_args, &pid_tgid);
+    return 0;
+}
\ No newline at end of file
diff --git a/bpf/profiling/continuous/network.h b/bpf/profiling/continuous/network.h
new file mode 100644
index 0000000..b8c4dd3
--- /dev/null
+++ b/bpf/profiling/continuous/network.h
@@ -0,0 +1,66 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "api.h"
+#include "skb.h"
+
+#define MAX_PROTOCOL_SOCKET_READ_LENGTH 158
+
+// Hash map keyed by tgid; tgid_should_trace treats the value 1 as "trace this process".
+struct {
+ __uint(type, BPF_MAP_TYPE_HASH);
+ __uint(max_entries, 10000);
+ __type(key, __u32);
+ __type(value, __u32);
+} process_monitor_control SEC(".maps");
+// Reports whether the process `tgid` is enabled in process_monitor_control.
+static __inline bool tgid_should_trace(__u32 tgid) {
+    __u32 *enabled = bpf_map_lookup_elem(&process_monitor_control, &tgid);
+    // a missing entry, or any value other than 1, means "do not trace"
+    if (enabled && *enabled == 1) {
+        return true;
+    }
+    return false;
+}
+
+// Arguments of a tcp_recvmsg call, saved at entry for use in the return probe.
+struct recv_msg_args {
+ struct sock* sock;
+ struct msghdr* msg;
+};
+
+// In-flight tcp_recvmsg calls: key is the bpf_get_current_pid_tgid() value,
+// written by the kprobe and removed by the kretprobe.
+struct {
+ __uint(type, BPF_MAP_TYPE_HASH);
+ __uint(max_entries, 10000);
+ __type(key, __u64);
+ __type(value, struct recv_msg_args);
+} receiving_args SEC(".maps");
+
+// Event emitted to user space for each message whose protocol was recognized.
+struct socket_buffer_reader_t {
+ __u64 timestamp; // bpf_ktime_get_ns() taken when the event is filled
+ void *channel_ref; // reference passed by the probe (the sock pointer in the visible callers)
+ __u8 protocol; // protocol reported by analyze_protocol
+ __u8 direction; // return value of analyze_protocol, narrowed to one byte
+ __u16 size; // number of bytes requested for the copy into buffer
+ __u32 pid; // upper 32 bits of the pid_tgid value
+ char buffer[MAX_PROTOCOL_SOCKET_READ_LENGTH + 1]; // leading bytes of the message payload
+};
+// Single-entry per-CPU array; process_data looks up key 0 and fills the entry
+// as the event it then sends.
+struct {
+ __uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
+ __type(key, __u32);
+ __type(value, struct socket_buffer_reader_t);
+ __uint(max_entries, 1);
+} socket_buffer_reader_map SEC(".maps");
+// Perf event array that process_data writes socket_buffer_reader_t events to.
+struct {
+ __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
+} socket_buffer_send_queue SEC(".maps");
\ No newline at end of file
diff --git a/pkg/profiling/config.go b/bpf/profiling/continuous/skb.h
similarity index 59%
copy from pkg/profiling/config.go
copy to bpf/profiling/continuous/skb.h
index 6bd9aa4..ed1a9ed 100644
--- a/pkg/profiling/config.go
+++ b/bpf/profiling/continuous/skb.h
@@ -15,18 +15,28 @@
// specific language governing permissions and limitations
// under the License.
-package profiling
+// Local declaration of struct iovec: base address and length of one buffer.
+// NOTE(review): presumably relies on preserve_access_index for field relocation
+// against the running kernel — confirm against the build setup.
+struct iovec {
+ void *iov_base;
+ __u64 iov_len;
+} __attribute__((preserve_access_index));
-import (
- "github.com/apache/skywalking-rover/pkg/module"
- "github.com/apache/skywalking-rover/pkg/profiling/task/base"
-)
+// Local declaration of struct iov_iter; process_data reads the `iov` member.
+// NOTE(review): looks like a partial view of the kernel structure — verify
+// that the declared members match the kernels that must be supported.
+struct iov_iter {
+ __u64 count;
+ union {
+ const struct iovec *iov;
+ const struct kvec *kvec;
+ const struct bio_vec *bvec;
+ struct xarray *xarray;
+ struct pipe_inode_info *pipe;
+ };
+} __attribute__((preserve_access_index));
-type Config struct {
- module.Config `mapstructure:",squash"`
+// Local declaration of struct msghdr; process_data reads msg_iter.iov from it.
+struct msghdr {
+ struct iov_iter msg_iter; /* data */
+ unsigned int msg_flags;
+} __attribute__((preserve_access_index));
- CheckInterval string `mapstructure:"check_interval"` // Check the profiling task interval
- FlushInterval string `mapstructure:"flush_interval"` // Flush profiling data interval
-
- TaskConfig *base.TaskConfig `mapstructure:"task"` // Profiling task config
-}
+// Local declaration of struct sock; socket_should_trace reads sk_max_ack_backlog.
+struct sock {
+ struct socket *sk_socket;
+ __u32 sk_max_ack_backlog;
+} __attribute__((preserve_access_index));
\ No newline at end of file
diff --git a/bpf/profiling/network/args.h b/bpf/profiling/network/args.h
index 9d7c90b..f61b134 100644
--- a/bpf/profiling/network/args.h
+++ b/bpf/profiling/network/args.h
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+#include "api.h"
#include "socket.h"
#pragma once
diff --git a/bpf/profiling/network/common.h b/bpf/profiling/network/common.h
index cbb918a..3fa197d 100644
--- a/bpf/profiling/network/common.h
+++ b/bpf/profiling/network/common.h
@@ -15,9 +15,3 @@
// specific language governing permissions and limitations
// under the License.
-#pragma once
-
-typedef enum
-{
- true=1, false=0
-} bool;
diff --git a/bpf/profiling/network/netmonitor.c b/bpf/profiling/network/netmonitor.c
index 78d1bd2..13abc00 100644
--- a/bpf/profiling/network/netmonitor.c
+++ b/bpf/profiling/network/netmonitor.c
@@ -40,13 +40,6 @@
char __license[] SEC("license") = "Dual MIT/GPL";
-#define _(P) \
- ({ \
- typeof(P) val; \
- bpf_probe_read(&val, sizeof(val), &(P)); \
- val; \
- })
-
#define SOCKET_UPLOAD_CHUNK_LIMIT 12
static __inline bool family_should_trace(const __u32 family) {
@@ -115,6 +108,10 @@ static __always_inline void submit_new_connection(struct pt_regs* ctx, __u32 fun
con.socket_family = AF_UNKNOWN;
}
+ if (con.remote_port == 53) {
+ bpf_printk("found the remote port is 53");
+ }
+
// save to the active connection map
__u64 conid = gen_tgid_fd(tgid, fd);
struct socket_connect_event_t *event = create_socket_connect_event();
@@ -438,7 +435,7 @@ static __always_inline void process_write_data(struct pt_regs *ctx, __u64 id, st
if ((conn->role == CONNECTION_ROLE_TYPE_UNKNOWN || conn->protocol == 0) && conn->ssl == ssl) {
struct socket_buffer_reader_t *buf_reader = read_socket_data(args, bytes_count);
if (buf_reader != NULL) {
- msg_type = analyze_protocol(buf_reader->buffer, buf_reader->data_len, conn);
+ msg_type = analyze_protocol(buf_reader->buffer, buf_reader->data_len, &conn->protocol);
// if send request data to remote address or receive response data from remote address
// then, recognized current connection is client
if ((msg_type == CONNECTION_MESSAGE_TYPE_REQUEST && data_direction == SOCK_DATA_DIRECTION_EGRESS) ||
diff --git a/bpf/profiling/offcpu.c b/bpf/profiling/offcpu.c
index b1a7ff1..c0af649 100644
--- a/bpf/profiling/offcpu.c
+++ b/bpf/profiling/offcpu.c
@@ -29,8 +29,8 @@ int do_finish_task_switch(struct pt_regs *ctx) {
__u64 ts, *tsp;
struct task_struct *prev = (void *) PT_REGS_PARM1(ctx);
- pid = _(prev->pid);
- tgid = _(prev->tgid);
+ pid = _KERNEL(prev->pid);
+ tgid = _KERNEL(prev->tgid);
// in kernel, tgid means the process id
// monitor_pid is the same concept with process id
diff --git a/configs/rover_configs.yaml b/configs/rover_configs.yaml
index 695c567..f9af6c9 100644
--- a/configs/rover_configs.yaml
+++ b/configs/rover_configs.yaml
@@ -138,4 +138,17 @@ profiling:
# The default body encoding when sampling the request
default_request_encoding: ${ROVER_PROFILING_TASK_NETWORK_PROTOCOL_ANALYZE_SAMPLING_HTTP_DEFAULT_REQUEST_ENCODING:UTF-8}
# The default body encoding when sampling the response
- default_response_encoding: ${ROVER_PROFILING_TASK_NETWORK_PROTOCOL_ANALYZE_SAMPLING_HTTP_DEFAULT_RESPONSE_ENCODING:UTF-8}
\ No newline at end of file
+ default_response_encoding: ${ROVER_PROFILING_TASK_NETWORK_PROTOCOL_ANALYZE_SAMPLING_HTTP_DEFAULT_RESPONSE_ENCODING:UTF-8}
+ # continuous profiling config
+ continuous:
+ # continuous related meters prefix name
+ meter_prefix: ${ROVER_PROFILING_CONTINUOUS_METER_PREFIX:rover_con_p}
+ # The interval of fetch metrics from the system, such as Process CPU, System Load, etc.
+ fetch_interval: ${ROVER_PROFILING_CONTINUOUS_FETCH_INTERVAL:1s}
+ # The interval for checking whether the metrics reach the thresholds
+ check_interval: ${ROVER_PROFILING_CONTINUOUS_CHECK_INTERVAL:5s}
+ trigger:
+ # the duration of the profiling task
+ execute_duration: ${ROVER_PROFILING_CONTINUOUS_TRIGGER_EXECUTE_DURATION:10m}
+ # the minimal duration between the execution of the same profiling task
+ silence_duration: ${ROVER_PROFILING_CONTINUOUS_TRIGGER_SILENCE_DURATION:20m}
\ No newline at end of file
diff --git a/docs/en/setup/configuration/profiling.md b/docs/en/setup/configuration/profiling.md
index fe107ee..fc4263e 100644
--- a/docs/en/setup/configuration/profiling.md
+++ b/docs/en/setup/configuration/profiling.md
@@ -5,19 +5,24 @@ and send the snapshot to the backend server.
## Configuration
-| Name | Default | Environment Key | Description |
-|---------------------------------------------------------------------------------|-------------|---------------------------------------------------------------------------------------|---------------------------------------------------------------------|
-| profiling.active | true | ROVER_PROFILING_ACTIVE | Is active the process profiling. |
-| profiling.check_interval | 10s | ROVER_PROFILING_CHECK_INTERVAL | Check the profiling task interval. |
-| profiling.flush_interval | 5s | ROVER_PROFILING_FLUSH_INTERVAL | Combine existing profiling data and report to the backend interval. |
-| profiling.task.on_cpu.dump_period | 9ms | ROVER_PROFILING_TASK_ON_CPU_DUMP_PERIOD | The profiling stack dump period. |
-| profiling.task.network.report_interval | 2s | ROVER_PROFILING_TASK_NETWORK_TOPOLOGY_REPORT_INTERVAL | The interval of send metrics to the backend. |
-| profiling.task.network.meter_prefix | rover_net_p | ROVER_PROFILING_TASK_NETWORK_TOPOLOGY_METER_PREFIX | The prefix of network profiling metrics name. |
-| profiling.task.network.protocol_analyze.per_cpu_buffer | 400KB | ROVER_PROFILING_TASK_NETWORK_PROTOCOL_ANALYZE_PER_CPU_BUFFER | The size of socket data buffer on each CPU. |
-| profiling.task.network.protocol_analyze.parallels | 2 | ROVER_PROFILING_TASK_NETWORK_PROTOCOL_ANALYZE_PARALLELS | The count of parallel protocol analyzer. |
-| profiling.task.network.protocol_analyze.queue_size | 5000 | ROVER_PROFILING_TASK_NETWORK_PROTOCOL_ANALYZE_QUEUE_SIZE | The size of per paralleled analyzer queue. |
-| profiling.task.network.protocol_analyze.sampling.http.default_request_encoding | UTF-8 | ROVER_PROFILING_TASK_NETWORK_PROTOCOL_ANALYZE_SAMPLING_HTTP_DEFAULT_REQUEST_ENCODING | The default body encoding when sampling the request. |
-| profiling.task.network.protocol_analyze.sampling.http.default_response_encoding | UTF-8 | ROVER_PROFILING_TASK_NETWORK_PROTOCOL_ANALYZE_SAMPLING_HTTP_DEFAULT_RESPONSE_ENCODING | The default body encoding when sampling the response. |
+| Name | Default | Environment Key | Description |
+|---------------------------------------------------------------------------------|-------------|---------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------|
+| profiling.active | true | ROVER_PROFILING_ACTIVE | Is active the process profiling. |
+| profiling.check_interval | 10s | ROVER_PROFILING_CHECK_INTERVAL | Check the profiling task interval. |
+| profiling.flush_interval | 5s | ROVER_PROFILING_FLUSH_INTERVAL | Combine existing profiling data and report to the backend interval. |
+| profiling.task.on_cpu.dump_period | 9ms | ROVER_PROFILING_TASK_ON_CPU_DUMP_PERIOD | The profiling stack dump period. |
+| profiling.task.network.report_interval | 2s | ROVER_PROFILING_TASK_NETWORK_TOPOLOGY_REPORT_INTERVAL | The interval of send metrics to the backend. |
+| profiling.task.network.meter_prefix | rover_net_p | ROVER_PROFILING_TASK_NETWORK_TOPOLOGY_METER_PREFIX | The prefix of network profiling metrics name. |
+| profiling.task.network.protocol_analyze.per_cpu_buffer | 400KB | ROVER_PROFILING_TASK_NETWORK_PROTOCOL_ANALYZE_PER_CPU_BUFFER | The size of socket data buffer on each CPU. |
+| profiling.task.network.protocol_analyze.parallels | 2 | ROVER_PROFILING_TASK_NETWORK_PROTOCOL_ANALYZE_PARALLELS | The count of parallel protocol analyzer. |
+| profiling.task.network.protocol_analyze.queue_size | 5000 | ROVER_PROFILING_TASK_NETWORK_PROTOCOL_ANALYZE_QUEUE_SIZE | The size of per paralleled analyzer queue. |
+| profiling.task.network.protocol_analyze.sampling.http.default_request_encoding | UTF-8 | ROVER_PROFILING_TASK_NETWORK_PROTOCOL_ANALYZE_SAMPLING_HTTP_DEFAULT_REQUEST_ENCODING | The default body encoding when sampling the request. |
+| profiling.task.network.protocol_analyze.sampling.http.default_response_encoding | UTF-8 | ROVER_PROFILING_TASK_NETWORK_PROTOCOL_ANALYZE_SAMPLING_HTTP_DEFAULT_RESPONSE_ENCODING | The default body encoding when sampling the response. |
+| profiling.continuous.meter_prefix | rover_con_p | ROVER_PROFILING_CONTINUOUS_METER_PREFIX | The continuous related meters prefix name. |
+| profiling.continuous.fetch_interval | 1s | ROVER_PROFILING_CONTINUOUS_FETCH_INTERVAL | The interval for fetching metrics from the system, such as Process CPU, System Load, etc. |
+| profiling.continuous.check_interval | 5s | ROVER_PROFILING_CONTINUOUS_CHECK_INTERVAL | The interval for checking whether the metrics reach the thresholds. |
+| profiling.continuous.trigger.execute_duration | 10m | ROVER_PROFILING_CONTINUOUS_TRIGGER_EXECUTE_DURATION | The duration of the profiling task. |
+| profiling.continuous.trigger.silence_duration | 20m | ROVER_PROFILING_CONTINUOUS_TRIGGER_SILENCE_DURATION | The minimal duration between the execution of the same profiling task. |
## Profiling Type
@@ -118,3 +123,43 @@ Based on the above two data types, the following metrics are provided.
| HTTP Request Sampling | Complete information about the HTTP request, it's only reported when it matches slow/4xx/5xx traces. |
| HTTP Response Sampling | Complete information about the HTTP response, it's only reported when it matches slow/4xx/5xx traces. |
| Syscall xxx | The methods to use when the process invoke with the network-related syscall method. It's only reported when it matches slow/4xx/5xx traces. |
+
+## Continuous Profiling
+
+The continuous profiling feature monitors low-power target process information, including process CPU usage and network requests, based on configuration passed from the backend.
+When a threshold is met, it automatically initiates a profiling task(on/off CPU, Network) to provide more detailed analysis.
+
+### Monitor Type
+
+#### System Load
+
+Monitor the average system load for the last minute, which is equivalent to using the first value of the `load average` in the `uptime` command.
+
+#### Process CPU
+
+The target process utilizes a certain percentage of the CPU on the current host.
+
+#### Process Thread Count
+
+The real-time number of threads in the target process.
+
+#### Network
+
+Network monitoring uses eBPF technology to collect real-time performance data of the current process responding to requests. Requests sent upstream are not monitored by the system.
+
+Currently, network monitoring supports parsing of the HTTP/1.x protocol and supports the following types of monitoring:
+
+1. `Error Rate`: The percentage of network requests that are erroneous; for example, responses with HTTP status codes in the range `[500, 600)` are considered errors.
+2. `Avg Response Time`: Average response time(ms) for specified URI.
+
+### Metrics
+
+Rover periodically sends the collected monitoring data to the backend using the `Native Meter Protocol`.
+
+| Name | Unit | Description |
+|------------------------|----------|---------------------------------------------------------------------------|
+| process_cpu | (0-100)% | The CPU usage percent |
+| process_thread_count | count | The thread count of process |
+| system_load | count | The average system load for the last minute; every process has the same value |
+| http_error_rate | (0-100)% | The network request error rate percentage |
+| http_avg_response_time | ms | The network average response duration |
\ No newline at end of file
diff --git a/go.mod b/go.mod
index eec7452..8696dea 100644
--- a/go.mod
+++ b/go.mod
@@ -25,7 +25,7 @@ require (
k8s.io/apimachinery v0.23.5
k8s.io/client-go v0.23.5
k8s.io/utils v0.0.0-20211116205334-6203023598ed
- skywalking.apache.org/repo/goapi v0.0.0-20221019074310-53ebda305187
+ skywalking.apache.org/repo/goapi v0.0.0-20230221054914-eeacbf544e9c
)
require (
@@ -58,7 +58,7 @@ require (
github.com/yusufpapurcu/wmi v1.2.2 // indirect
golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8 // indirect
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect
- golang.org/x/text v0.3.7 // indirect
+ golang.org/x/text v0.3.8 // indirect
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20211208223120-3a66f561d7aa // indirect
diff --git a/go.sum b/go.sum
index 6f7c0ed..78b8e05 100644
--- a/go.sum
+++ b/go.sum
@@ -445,6 +445,7 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de
github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
+github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
github.com/yusufpapurcu/wmi v1.2.2 h1:KBNDSne4vP5mbSWnJbO+51IMOXJB67QiYCSBrubbPRg=
github.com/yusufpapurcu/wmi v1.2.2/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
github.com/zekroTJA/timedmap v1.4.0 h1:NIkLScX6kMzkzFP7kCIkkgKYdooAJ1itkMbJODX2WPU=
@@ -476,6 +477,7 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210817164053-32db794688a5/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
+golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
@@ -512,6 +514,7 @@ golang.org/x/mod v0.4.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.5.0/go.mod h1:5OXOZSfqPIIbmVBIIKWRFfZjPR0E5r58TLhUjH0a2Ro=
+golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
@@ -588,6 +591,7 @@ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
@@ -662,6 +666,7 @@ golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20211124211545-fe61309f8881/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211205182925-97ca703d548d/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.3.0 h1:w8ZOecv6NaNa/zC8944JTU3vz4u6Lagfk4RPQxv92NQ=
golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
@@ -677,8 +682,9 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
-golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
+golang.org/x/text v0.3.8 h1:nAL+RVCQ9uMn3vJZbV+MRnydTJFPf8qqY42YiA6MrqY=
+golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
@@ -740,6 +746,7 @@ golang.org/x/tools v0.1.2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.3/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.4/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
+golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
@@ -956,5 +963,5 @@ sigs.k8s.io/structured-merge-diff/v4 v4.2.1 h1:bKCqE9GvQ5tiVHn5rfn1r+yao3aLQEaLz
sigs.k8s.io/structured-merge-diff/v4 v4.2.1/go.mod h1:j/nl6xW8vLS49O8YvXW1ocPhZawJtm+Yrr7PPRQ0Vg4=
sigs.k8s.io/yaml v1.2.0 h1:kr/MCeFWJWTwyaHoR9c8EjH9OumOmoF9YGiZd7lFm/Q=
sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc=
-skywalking.apache.org/repo/goapi v0.0.0-20221019074310-53ebda305187 h1:6JgAg9aohcHd72VplZUGycZgCNo6iQrz735nmtOTCnE=
-skywalking.apache.org/repo/goapi v0.0.0-20221019074310-53ebda305187/go.mod h1:lxmYWY1uAP5SLVKNymAyDzn7KG6dhPWN+pYHmyt+0vo=
+skywalking.apache.org/repo/goapi v0.0.0-20230221054914-eeacbf544e9c h1:UUcBWaN9cUdtYqdj9ssIcL/BuHD4jT17smPgfpZTcVg=
+skywalking.apache.org/repo/goapi v0.0.0-20230221054914-eeacbf544e9c/go.mod h1:BS5LRvsAMmZn8YIW9n0+8eiJhC9zVn663fDr5t+cL40=
diff --git a/pkg/process/api.go b/pkg/process/api.go
index db51cd9..9f7bdb3 100644
--- a/pkg/process/api.go
+++ b/pkg/process/api.go
@@ -24,6 +24,8 @@ type Operator interface {
FindProcessByID(processID string) api.ProcessInterface
// FindProcessByPID get all processes with difference entity through process PID
FindProcessByPID(pid int32) []api.ProcessInterface
+ // FindAllRegisteredProcesses find all registered processes
+ FindAllRegisteredProcesses() []api.ProcessInterface
// AddListener add new process listener
AddListener(listener api.ProcessListener)
// DeleteListener delete the process listener
diff --git a/pkg/process/api/process.go b/pkg/process/api/process.go
index bde8bda..fca705e 100644
--- a/pkg/process/api/process.go
+++ b/pkg/process/api/process.go
@@ -21,6 +21,8 @@ import (
"encoding/json"
"fmt"
+ "github.com/shirou/gopsutil/process"
+
"github.com/apache/skywalking-rover/pkg/tools/profiling"
)
@@ -54,6 +56,8 @@ type ProcessInterface interface {
ProfilingStat() *profiling.Info
// ExeName get execute file name
ExeName() (string, error)
+ // OriginalProcess from process library
+ OriginalProcess() *process.Process
// PortIsExpose check the port is exposed
PortIsExpose(port int) bool
diff --git a/pkg/process/finders/context.go b/pkg/process/finders/context.go
index d544e17..b1760a4 100644
--- a/pkg/process/finders/context.go
+++ b/pkg/process/finders/context.go
@@ -21,6 +21,8 @@ import (
"github.com/apache/skywalking-rover/pkg/process/api"
"github.com/apache/skywalking-rover/pkg/process/finders/base"
"github.com/apache/skywalking-rover/pkg/tools/profiling"
+
+ "github.com/shirou/gopsutil/process"
)
type ProcessUploadStatus int8
@@ -82,6 +84,10 @@ func (p *ProcessContext) ExeName() (string, error) {
return p.exeName, nil
}
+// OriginalProcess returns the gopsutil process handle by delegating to the
+// detected process held in this context.
+func (p *ProcessContext) OriginalProcess() *process.Process {
+ return p.detectProcess.OriginalProcess()
+}
+
func (p *ProcessContext) PortIsExpose(port int) bool {
return p.exposedPorts[port]
}
diff --git a/pkg/process/finders/manager.go b/pkg/process/finders/manager.go
index 3b0cb08..8c59368 100644
--- a/pkg/process/finders/manager.go
+++ b/pkg/process/finders/manager.go
@@ -133,6 +133,10 @@ func (m *ProcessManager) FindProcessByPID(pid int32) []api.ProcessInterface {
return m.storage.FindProcessByPID(pid)
}
+// FindAllRegisteredProcesses delegates to the process storage and returns its result unchanged.
+func (m *ProcessManager) FindAllRegisteredProcesses() []api.ProcessInterface {
+ return m.storage.FindAllRegisteredProcesses()
+}
+
func (m *ProcessManager) AddListener(listener api.ProcessListener) {
m.storage.AddListener(listener)
}
diff --git a/pkg/process/finders/storage.go b/pkg/process/finders/storage.go
index 128f02e..f382f2f 100644
--- a/pkg/process/finders/storage.go
+++ b/pkg/process/finders/storage.go
@@ -318,6 +318,18 @@ func (s *ProcessStorage) GetAllProcesses() []api.ProcessInterface {
return result
}
+// FindAllRegisteredProcesses collects, across every process list held by the storage,
+// the processes whose syncStatus equals ReportSuccess.
+// The returned slice is never nil; it is empty when nothing matches.
+func (s *ProcessStorage) FindAllRegisteredProcesses() []api.ProcessInterface {
+	registered := make([]api.ProcessInterface, 0)
+	for _, finderProcesses := range s.processes {
+		for _, candidate := range finderProcesses {
+			if candidate.syncStatus != ReportSuccess {
+				continue
+			}
+			registered = append(registered, candidate)
+		}
+	}
+	return registered
+}
+
func (s *ProcessStorage) FindProcessByID(processID string) api.ProcessInterface {
for _, finderProcesses := range s.processes {
for _, p := range finderProcesses {
diff --git a/pkg/process/module.go b/pkg/process/module.go
index 04af227..769880e 100644
--- a/pkg/process/module.go
+++ b/pkg/process/module.go
@@ -83,6 +83,10 @@ func (m *Module) FindProcessByID(processID string) api.ProcessInterface {
return m.manager.FindProcessByID(processID)
}
+// FindAllRegisteredProcesses delegates to the process manager and returns its result unchanged.
+func (m *Module) FindAllRegisteredProcesses() []api.ProcessInterface {
+ return m.manager.FindAllRegisteredProcesses()
+}
+
func (m *Module) FindProcessByPID(pid int32) []api.ProcessInterface {
return m.manager.FindProcessByPID(pid)
}
diff --git a/pkg/profiling/config.go b/pkg/profiling/config.go
index 6bd9aa4..13ea66d 100644
--- a/pkg/profiling/config.go
+++ b/pkg/profiling/config.go
@@ -19,7 +19,8 @@ package profiling
import (
"github.com/apache/skywalking-rover/pkg/module"
- "github.com/apache/skywalking-rover/pkg/profiling/task/base"
+ continuousBase "github.com/apache/skywalking-rover/pkg/profiling/continuous/base"
+ taskBase "github.com/apache/skywalking-rover/pkg/profiling/task/base"
)
type Config struct {
@@ -28,5 +29,6 @@ type Config struct {
CheckInterval string `mapstructure:"check_interval"` // Check the profiling task interval
FlushInterval string `mapstructure:"flush_interval"` // Flush profiling data interval
- TaskConfig *base.TaskConfig `mapstructure:"task"` // Profiling task config
+ TaskConfig *taskBase.TaskConfig `mapstructure:"task"` // Profiling task config
+ ContinuousConfig *continuousBase.ContinuousConfig `mapstructure:"continuous"` // Continuous profiling config
}
diff --git a/pkg/profiling/continuous/base/checker.go b/pkg/profiling/continuous/base/checker.go
new file mode 100644
index 0000000..4aa21c4
--- /dev/null
+++ b/pkg/profiling/continuous/base/checker.go
@@ -0,0 +1,68 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package base
+
+import (
+ "github.com/apache/skywalking-rover/pkg/process/api"
+
+ v3 "skywalking.apache.org/repo/goapi/collect/ebpf/profiling/v3"
+)
+
+// CheckType is the string identifier of a check; it is used as the key of Policy.Items.
+type CheckType string
+
+// Check type identifiers declared by this package.
+const (
+ CheckTypeProcessCPU CheckType = "PROCESS_CPU"
+ CheckTypeProcessThreadCount CheckType = "PROCESS_THREAD_COUNT"
+ CheckTypeSystemLoad CheckType = "SYSTEM_LOAD"
+ CheckTypeHTTPErrorRate CheckType = "HTTP_ERROR_RATE"
+ CheckTypeHTTPAvgResponseTime CheckType = "HTTP_AVG_RESPONSE_TIME"
+)
+
+// Checker is the contract implemented by each continuous profiling checker.
+type Checker interface {
+ // Init initializes the checker with the basic configuration.
+ Init(config *ContinuousConfig) error
+ // SyncPolicies hands the checker all service policies together with their processes.
+ SyncPolicies(policies []*SyncPolicyWithProcesses)
+ // Fetch fetches the data the policies need and saves it into the time windows.
+ Fetch() error
+ // Check reports which policies have reached their thresholds.
+ // For each policy/process pair, implementations should call CheckContext.ShouldCheck
+ // to decide whether the pair should be checked or skipped.
+ Check(ctx CheckContext, metricsAppender *MetricsAppender) []ThresholdCause
+ // Close closes the checker.
+ Close() error
+}
+
+// CheckContext lets a Checker ask whether a process/policy pair is currently eligible for checking.
+type CheckContext interface {
+ // ShouldCheck reports whether the process should be checked against the policy item or ignored.
+ // Usually, if a profiling task is already running for the process, the check should be ignored.
+ ShouldCheck(p api.ProcessInterface, item *PolicyItem) bool
+}
+
+// SyncPolicyWithProcesses pairs one policy with the processes it applies to.
+type SyncPolicyWithProcesses struct {
+ Policy *Policy
+ // Processes is keyed by a string; presumably the process ID - TODO confirm against the caller.
+ Processes map[string]api.ProcessInterface
+}
+
+// ThresholdCause describes one process that went over the threshold of a policy item.
+type ThresholdCause interface {
+ // Process returns the process that is over the threshold.
+ Process() api.ProcessInterface
+ // FromPolicy returns the policy item this cause originates from.
+ FromPolicy() *PolicyItem
+ // GenerateTransferCause builds the cause message that is transmitted to the backend.
+ GenerateTransferCause() *v3.ContinuousProfilingCause
+}
diff --git a/pkg/profiling/config.go b/pkg/profiling/continuous/base/config.go
similarity index 55%
copy from pkg/profiling/config.go
copy to pkg/profiling/continuous/base/config.go
index 6bd9aa4..bbb45db 100644
--- a/pkg/profiling/config.go
+++ b/pkg/profiling/continuous/base/config.go
@@ -15,18 +15,17 @@
// specific language governing permissions and limitations
// under the License.
-package profiling
+package base
-import (
- "github.com/apache/skywalking-rover/pkg/module"
- "github.com/apache/skywalking-rover/pkg/profiling/task/base"
-)
+type ContinuousConfig struct {
+ MeterPrefix string `mapstructure:"meter_prefix"` // continuous related meters prefix name
-type Config struct {
- module.Config `mapstructure:",squash"`
-
- CheckInterval string `mapstructure:"check_interval"` // Check the profiling task interval
- FlushInterval string `mapstructure:"flush_interval"` // Flush profiling data interval
+ FetchInterval string `mapstructure:"fetch_interval"` // The interval of fetch metrics from the system
+ CheckInterval string `mapstructure:"check_interval"` // The interval of check metrics is reach the thresholds
+ Trigger TriggerConfig `mapstructure:"trigger"`
+}
- TaskConfig *base.TaskConfig `mapstructure:"task"` // Profiling task config
+type TriggerConfig struct {
+ ExecuteDuration string `mapstructure:"execute_duration"` // the duration of the profiling task
+ SilenceDuration string `mapstructure:"silence_duration"` // the duration between the execution of the same profiling task.
}
diff --git a/pkg/profiling/continuous/base/metrics.go b/pkg/profiling/continuous/base/metrics.go
new file mode 100644
index 0000000..6b48015
--- /dev/null
+++ b/pkg/profiling/continuous/base/metrics.go
@@ -0,0 +1,104 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package base
+
+import (
+ "context"
+ "time"
+
+ "github.com/apache/skywalking-rover/pkg/process/api"
+
+ v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+)
+
+// MetricsAppender accumulates meter data, grouped by service instance, until Flush sends it.
+type MetricsAppender struct {
+ prefix string // prepended (with "_") to every meter name by formatName
+ timestamp int64 // millisecond timestamp taken when the appender was created; stamped on every meter
+ meters map[serviceInstanceMetadata][]*v3.MeterData // split by service instance
+}
+
+// NewMetricsAppender creates an empty appender whose meters are all named with the
+// given prefix and stamped with the creation time in Unix milliseconds.
+func NewMetricsAppender(prefix string) *MetricsAppender {
+	appender := new(MetricsAppender)
+	appender.prefix = prefix
+	appender.timestamp = time.Now().UnixMilli()
+	appender.meters = make(map[serviceInstanceMetadata][]*v3.MeterData)
+	return appender
+}
+
+// AppendProcessSingleValue records one single-value meter for the given process.
+//
+// The meter carries every entry of labels plus a "process_name" label taken from the
+// process entity, is named through formatName, and is stored under the service/instance
+// of the process so that Flush can send it per service instance.
+func (m *MetricsAppender) AppendProcessSingleValue(name string, p api.ProcessInterface, labels map[string]string, value float64) {
+	meterLabels := make([]*v3.Label, 0)
+	for labelName, labelValue := range labels {
+		meterLabels = append(meterLabels, &v3.Label{Name: labelName, Value: labelValue})
+	}
+	meterLabels = append(meterLabels, &v3.Label{Name: "process_name", Value: p.Entity().ProcessName})
+
+	key := serviceInstanceMetadata{
+		service:  p.Entity().ServiceName,
+		instance: p.Entity().InstanceName,
+	}
+	meter := &v3.MeterData{
+		Service:         p.Entity().ServiceName,
+		ServiceInstance: p.Entity().InstanceName,
+		Timestamp:       m.timestamp,
+		Metric: &v3.MeterData_SingleValue{
+			SingleValue: &v3.MeterSingleValue{
+				Name:   m.formatName(name),
+				Labels: meterLabels,
+				Value:  value,
+			},
+		},
+	}
+	m.meters[key] = append(m.meters[key], meter)
+}
+
+// Flush sends every accumulated meter to the backend over one CollectBatch stream,
+// one MeterDataCollection per service instance.
+//
+// It returns nil without opening a stream when nothing was appended. When a send fails
+// the stream is still closed (the close error is ignored) and the send error is returned.
+func (m *MetricsAppender) Flush(ctx context.Context, client v3.MeterReportServiceClient) error {
+	if len(m.meters) == 0 {
+		return nil
+	}
+
+	stream, err := client.CollectBatch(ctx)
+	if err != nil {
+		return err
+	}
+
+	for _, instanceMeters := range m.meters {
+		sendErr := stream.Send(&v3.MeterDataCollection{MeterData: instanceMeters})
+		if sendErr == nil {
+			continue
+		}
+		// the send error is the one worth reporting, so the close result is dropped
+		_ = m.closeSteam(stream)
+		return sendErr
+	}
+	return m.closeSteam(stream)
+}
+
+// closeSteam closes the batch stream, discards the response and returns the close error, if any.
+func (m *MetricsAppender) closeSteam(batch v3.MeterReportService_CollectBatchClient) error {
+	_, closeErr := batch.CloseAndRecv()
+	return closeErr
+}
+
+// serviceInstanceMetadata is the map key used by MetricsAppender to group meters per service instance.
+type serviceInstanceMetadata struct {
+ service string
+ instance string
+}
+
+// formatName returns the meter name as "<prefix>_<name>".
+// NOTE(review): an empty prefix yields a name starting with "_"; confirm the prefix is always configured.
+func (m *MetricsAppender) formatName(name string) string {
+ return m.prefix + "_" + name
+}
diff --git a/pkg/profiling/continuous/base/policy.go b/pkg/profiling/continuous/base/policy.go
new file mode 100644
index 0000000..9868590
--- /dev/null
+++ b/pkg/profiling/continuous/base/policy.go
@@ -0,0 +1,79 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package base
+
+import (
+ "reflect"
+ "regexp"
+
+ "github.com/apache/skywalking-rover/pkg/process/api"
+)
+
+// Policy groups the check items that share one target profiling type.
+type Policy struct {
+ TargetProfilingType TargetProfilingType
+ // Items holds one policy item per check type.
+ Items map[CheckType]*PolicyItem
+
+ // ServicePolicy points back to the service policy; presumably the owner of this policy - TODO confirm.
+ ServicePolicy *ServicePolicy
+}
+
+// PolicyItem is the configuration of a single check inside a Policy.
+type PolicyItem struct {
+ // Threshold is kept as a string; its interpretation is up to the checker that reads it.
+ Threshold string
+ // Period is the number of time window slots this item needs; TimeWindows is sized by the largest Period.
+ Period int
+ // Count is the number of matching slots TimeWindows.MatchRule requires to report a match.
+ Count int
+ // URIList and URIRegex form the URI filter compared by SameURIFilter.
+ URIList []string
+ URIRegex string
+
+ // Policy points back to a policy; presumably the one containing this item - TODO confirm.
+ Policy *Policy
+}
+
+// SameURIFilter reports whether p and other describe the same URI filter.
+//
+// Two filters are the same only when BOTH parts agree: the URI regex must be equal and
+// the URI lists must be equal (a nil list and an empty list are treated as equal).
+// Previously a match on only one part was enough, so items with the same regex but
+// different URI lists (or the same list but different regexes) were reported as
+// sharing a filter.
+func (p *PolicyItem) SameURIFilter(other *PolicyItem) bool {
+	if p.URIRegex != other.URIRegex {
+		return false
+	}
+	if len(p.URIList) == 0 && len(other.URIList) == 0 {
+		// reflect.DeepEqual distinguishes nil from empty slices, so handle "no list" explicitly
+		return true
+	}
+	return reflect.DeepEqual(p.URIList, other.URIList)
+}
+
+// ServicePolicy holds the policies of one service together with its processes.
+type ServicePolicy struct {
+ Service string
+ // UUID is an identifier of the policy set; presumably used to detect changes - TODO confirm.
+ UUID string
+
+ Policies []*Policy
+ // Processes is keyed by a string; presumably the process ID - TODO confirm.
+ Processes map[string]api.ProcessInterface
+}
+
+// URIChecker holds a URI filter as fixed URIs plus compiled regular expressions.
+type URIChecker struct {
+ FixedURIList []string
+ URIRegexList []*regexp.Regexp
+}
+
+// URIResponseDurationTimeoutRate describes a response-timeout threshold.
+type URIResponseDurationTimeoutRate struct {
+ Timeout int // how long (in ms) a response may take before it is considered a timeout
+ Rate int // response timeout rate
+}
+
+// TargetProfilingType is the string identifier of the profiling type a policy targets.
+type TargetProfilingType string
+
+// Target profiling type identifiers declared by this package.
+const (
+ TargetProfilingTypeOnCPU TargetProfilingType = "ON_CPU"
+ TargetProfilingTypeOffCPU TargetProfilingType = "OFF_CPU"
+ TargetProfilingTypeNetwork TargetProfilingType = "NETWORK"
+)
diff --git a/pkg/profiling/continuous/base/trigger.go b/pkg/profiling/continuous/base/trigger.go
new file mode 100644
index 0000000..0ae3f0d
--- /dev/null
+++ b/pkg/profiling/continuous/base/trigger.go
@@ -0,0 +1,42 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package base
+
+import (
+ "github.com/apache/skywalking-rover/pkg/module"
+ "github.com/apache/skywalking-rover/pkg/process/api"
+ "github.com/apache/skywalking-rover/pkg/profiling/task"
+ taskBase "github.com/apache/skywalking-rover/pkg/profiling/task/base"
+
+ v3 "skywalking.apache.org/repo/goapi/collect/ebpf/profiling/v3"
+)
+
+// TriggerReporter is handed to Trigger.TriggerTasks so that a trigger can report the processes to profile.
+type TriggerReporter interface {
+ // ReportProcesses reports causeProcess together with the processes to profile and the threshold causes.
+ // taskSetter and reportSetter are callbacks that receive the profiling task and the report respectively;
+ // presumably they let the trigger customize both before they are used - TODO confirm against the implementation.
+ ReportProcesses(causeProcess api.ProcessInterface, profilingProcesses []api.ProcessInterface, cases []ThresholdCause,
+ taskSetter func(task *taskBase.ProfilingTask),
+ reportSetter func(report *v3.ContinuousProfilingReport)) (*task.Context, error)
+}
+
+// Trigger is the contract of a profiling task trigger.
+type Trigger interface {
+ // Init initializes the trigger.
+ Init(moduleMgr *module.Manager, conf *ContinuousConfig) error
+ // ShouldTrigger reports whether a task should be triggered for the process.
+ ShouldTrigger(p api.ProcessInterface) bool
+ // TriggerTasks generates and executes tasks for the causes whose policy may be triggered.
+ // NOTE(review): the meaning of the returned int is not visible here; presumably the number of tasks triggered.
+ TriggerTasks(reporter TriggerReporter, causes []ThresholdCause) int
+}
diff --git a/pkg/profiling/continuous/base/windows.go b/pkg/profiling/continuous/base/windows.go
new file mode 100644
index 0000000..c400a64
--- /dev/null
+++ b/pkg/profiling/continuous/base/windows.go
@@ -0,0 +1,249 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package base
+
+import (
+ "container/list"
+ "sync"
+ "time"
+)
+
+// WindowData is the data of one slot inside TimeWindows.
+// D is the type of the values accepted into the slot, R the type of the computed result.
+type WindowData[D any, R any] interface {
+ // Reset clears the data content.
+ Reset()
+ // Accept adds one value to the slot.
+ Accept(data D)
+ // Get calculates and returns the result of the slot.
+ Get() R
+}
+
+// LatestWindowData is a slot that keeps only the most recently accepted value.
+type LatestWindowData[D comparable] struct {
+	Value D
+}
+
+// NewLatestWindowData creates an empty LatestWindowData slot.
+func NewLatestWindowData[D comparable]() WindowData[D, D] {
+	return new(LatestWindowData[D])
+}
+
+// Reset restores the slot to the zero value of D.
+func (t *LatestWindowData[D]) Reset() {
+	t.Value = *new(D)
+}
+
+// Accept overwrites the stored value with data.
+func (t *LatestWindowData[D]) Accept(data D) {
+	t.Value = data
+}
+
+// Get returns the stored value.
+func (t *LatestWindowData[D]) Get() D {
+	return t.Value
+}
+
+// TimeWindows is a list of per-second slots; Add places a value into the slot selected by
+// its distance in whole seconds from endTime.
+type TimeWindows[V any, R any] struct {
+ data *list.List // the slots; every element value is a *windowDataWrapper
+ endTime *time.Time // the time of the newest slot; nil until the first Add
+ windowLocker sync.RWMutex // guards data
+ windowGenerator func() WindowData[V, R] // creates a new (wrapped) slot, used when scaling up
+
+ lastFlushedElement *list.Element // the element up to which data has been returned by the Flush* methods
+ lastWriteElement *list.Element // the element most recently written by appendDataToSlot
+}
+
+func NewTimeWindows[V any, R any](items []*PolicyItem, generator func() WindowData[V, R]) *TimeWindows[V, R] {
+ var maxPeriod int
+ for _, i := range items {
+ if i.Period > maxPeriod {
+ maxPeriod = i.Period
+ }
+ }
+ generatorWrapper := func() WindowData[V, R] {
+ return newWindowDataWrapper[V, R](generator)
+ }
+
+ window := &TimeWindows[V, R]{
+ data: list.New(),
+ windowGenerator: generatorWrapper,
+ }
+
+ for i := 0; i < maxPeriod; i++ {
+ window.data.PushFront(generatorWrapper())
+ }
+ return window
+}
+
+// MatchRule runs check against the result of every slot in the window (from the back of the
+// list to the front) and reports whether at least policy.Count slots matched.
+// lastMatch is the result of the last slot visited that matched, i.e. the matching slot
+// closest to the front of the list; it is the zero value of R when nothing matched.
+//
+// NOTE(review): policy.Period is not consulted here - all slots are inspected, even when the
+// window was sized by another item with a larger Period.
+// NOTE(review): slots that never received data are also passed to check (through Get).
+func (t *TimeWindows[V, R]) MatchRule(policy *PolicyItem, check func(slot R) bool) (lastMatch R, isMatch bool) {
+ t.windowLocker.RLock()
+ defer t.windowLocker.RUnlock()
+
+ needsCount := policy.Count
+ matchedCount := 0
+ for e := t.data.Back(); e != nil; e = e.Prev() {
+ getVal := e.Value.(*windowDataWrapper[V, R]).Get()
+ if check(getVal) {
+ matchedCount++
+ lastMatch = getVal
+ }
+ }
+ return lastMatch, matchedCount >= needsCount
+}
+
+// ScalePeriod resizes the window so that it has one slot per unit of the largest Period
+// found in items.
+//
+// Add/appendDataToSlot address slots relative to the back of the list: the back is the
+// newest second (endTime) and the front the oldest one. Resizing therefore happens at the
+// FRONT: growing adds empty slots on the oldest side and shrinking drops the oldest
+// slots, so the data already collected keeps its age. (Resizing at the back would throw
+// away the freshest data when shrinking and shift the age of every slot when growing.)
+func (t *TimeWindows[V, R]) ScalePeriod(items []*PolicyItem) {
+	var maxPeriod int
+	for _, i := range items {
+		if i.Period > maxPeriod {
+			maxPeriod = i.Period
+		}
+	}
+
+	t.windowLocker.Lock()
+	defer t.windowLocker.Unlock()
+
+	diff := maxPeriod - t.data.Len()
+	switch {
+	case diff > 0:
+		// need to scale up: add empty slots older than everything collected so far
+		for i := 0; i < diff; i++ {
+			t.data.PushFront(t.windowGenerator())
+		}
+	case diff < 0:
+		// need to scale down: drop the oldest slots
+		for i := 0; i < -diff; i++ {
+			t.data.Remove(t.data.Front())
+		}
+	}
+}
+
+// Add stores val into the slot that corresponds to tm.
+// The first call fixes endTime to tm. A tm newer than endTime first moves the window
+// forward (moveTo); a tm older than endTime is written into an older slot, or ignored when
+// it is older than the window can hold.
+// NOTE(review): endTime and the list length are read here without holding windowLocker.
+func (t *TimeWindows[D, R]) Add(tm time.Time, val D) {
+ if t.endTime == nil {
+ t.endTime = &tm
+ }
+
+ // how many whole seconds tm lies before the end of the window (negative: tm is newer)
+ second := int(t.endTime.Sub(tm).Seconds())
+ if second < 0 {
+ t.moveTo(tm)
+ second = 0
+ }
+
+ // second == Len is let through here; it produces index -1, which appendDataToSlot ignores
+ if second > t.data.Len() {
+ // add the older data, ignore it
+ return
+ }
+
+ // second 0 maps to the last index (the back of the list), larger values to smaller indexes
+ t.appendDataToSlot(t.data.Len()-second-1, val)
+}
+
+// FlushLastWriteData returns the result of the most recently written slot and marks it as
+// flushed. The boolean is false (with the zero value of R) when nothing was written yet or
+// the most recently written slot has already been flushed.
+func (t *TimeWindows[D, R]) FlushLastWriteData() (R, bool) {
+	if latest := t.lastWriteElement; latest != nil && latest != t.lastFlushedElement {
+		t.lastFlushedElement = latest
+		return latest.Value.(*windowDataWrapper[D, R]).Get(), true
+	}
+	var zero R
+	return zero, false
+}
+
+// FlushMultipleWriteData returns the results of all slots holding data between the most
+// recently written slot (inclusive) and the last flushed slot (exclusive), walking towards
+// the front of the list, and then marks the most recently written slot as flushed.
+// It returns (nil, false) when nothing was written yet or nothing new was written since the
+// previous flush.
+func (t *TimeWindows[D, R]) FlushMultipleWriteData() ([]R, bool) {
+	if t.lastWriteElement == nil || t.lastWriteElement == t.lastFlushedElement {
+		return nil, false
+	}
+
+	pending := make([]R, 0)
+	for cur := t.lastWriteElement; cur != nil && cur != t.lastFlushedElement; cur = cur.Prev() {
+		slot := cur.Value.(*windowDataWrapper[D, R])
+		if !slot.hasData {
+			continue
+		}
+		pending = append(pending, slot.Get())
+	}
+	t.lastFlushedElement = t.lastWriteElement
+	return pending, true
+}
+
+// moveTo advances the window so that its newest slot corresponds to tm.
+//
+// Add/appendDataToSlot address slots relative to the back of the list: the back is the
+// newest second (endTime) and the front the oldest one. Advancing by N seconds therefore
+// has to recycle the N OLDEST slots (front) as new, empty newest slots (back), which ages
+// every remaining slot by N seconds. The previous implementation rotated in the opposite
+// direction, which reset the slot written one second ago on every advance and so never
+// retained more than the latest second of data.
+//
+// A tm that is not newer than endTime leaves the window untouched. When tm is further
+// ahead than the window is long, every slot is simply reset.
+func (t *TimeWindows[D, R]) moveTo(tm time.Time) {
+	t.windowLocker.Lock()
+	defer t.windowLocker.Unlock()
+
+	addSeconds := int(tm.Sub(*t.endTime).Seconds())
+	if addSeconds <= 0 {
+		// same second or older
+		return
+	} else if addSeconds > t.data.Len() {
+		// out of second count: nothing collected so far is still inside the window
+		for e := t.data.Front(); e != nil; e = e.Next() {
+			e.Value.(*windowDataWrapper[D, R]).Reset()
+		}
+	} else {
+		for i := 0; i < addSeconds; i++ {
+			// recycle the oldest slot (front) as the new, empty newest slot (back);
+			// PushBack creates a fresh element, so a recycled slot is never mistaken
+			// for the last written/flushed element
+			oldest := t.data.Remove(t.data.Front()).(*windowDataWrapper[D, R])
+			oldest.Reset()
+			t.data.PushBack(oldest)
+		}
+	}
+	t.endTime = &tm
+}
+
+// appendDataToSlot adds data to the slot at the given index (0 is the front of the list,
+// Len-1 the back) and remembers that slot as the last written one.
+// An index outside [0, Len) is ignored.
+//
+// Two defects of the previous version are fixed here:
+//   - index 0 is a valid slot (the oldest one) and is no longer rejected;
+//   - the slot and lastWriteElement are mutated, so the exclusive lock is taken instead
+//     of the read lock, which did not protect against concurrent readers (MatchRule).
+func (t *TimeWindows[V, R]) appendDataToSlot(index int, data V) {
+	t.windowLocker.Lock()
+	defer t.windowLocker.Unlock()
+
+	dataLen := t.data.Len()
+	if index < 0 || index >= dataLen {
+		return
+	}
+
+	// walk from whichever end of the list is closer to the requested index
+	var element *list.Element
+	if index < (dataLen >> 1) {
+		element = t.data.Front()
+		for i := 0; i < index; i++ {
+			element = element.Next()
+		}
+	} else {
+		element = t.data.Back()
+		for i := dataLen - 1; i > index; i-- {
+			element = element.Prev()
+		}
+	}
+
+	element.Value.(*windowDataWrapper[V, R]).Accept(data)
+	t.lastWriteElement = element
+}
+
+// windowDataWrapper decorates a WindowData slot with a flag telling whether the slot has
+// accepted any value since it was created or last reset.
+type windowDataWrapper[D any, R any] struct {
+ WindowData[D, R]
+ hasData bool
+}
+
+// newWindowDataWrapper wraps a freshly generated slot; the wrapper starts without data.
+func newWindowDataWrapper[D any, R any](generator func() WindowData[D, R]) *windowDataWrapper[D, R] {
+ return &windowDataWrapper[D, R]{
+ WindowData: generator(),
+ hasData: false,
+ }
+}
+
+// Reset clears the has-data flag and resets the wrapped slot.
+func (t *windowDataWrapper[D, R]) Reset() {
+ t.hasData = false
+ t.WindowData.Reset()
+}
+
+// Accept sets the has-data flag and forwards the value to the wrapped slot.
+func (t *windowDataWrapper[D, R]) Accept(data D) {
+ t.hasData = true
+ t.WindowData.Accept(data)
+}
diff --git a/pkg/profiling/continuous/checker/bpf/network/http1.go b/pkg/profiling/continuous/checker/bpf/network/http1.go
new file mode 100644
index 0000000..77ba21e
--- /dev/null
+++ b/pkg/profiling/continuous/checker/bpf/network/http1.go
@@ -0,0 +1,135 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package network
+
+import (
+ "fmt"
+ "strconv"
+ "strings"
+ "time"
+
+ "github.com/apache/skywalking-rover/pkg/tools/host"
+)
+
+// HTTP1BufferEvent is one HTTP/1 request paired with its response, as assembled by
+// HTTP1Analyzer.HandleBufferEvent.
+type HTTP1BufferEvent struct {
+ pid uint32 // copied from the request buffer
+ requestURI string // the URI parsed from the request buffer
+ requestTime uint64 // timestamp of the request buffer
+ statusCode int // the status code parsed from the response buffer
+ responseTime uint64 // timestamp of the response buffer
+}
+
+// HTTP1Analyzer pairs HTTP/1 request buffers with their response buffers.
+type HTTP1Analyzer struct {
+ // channelEvents holds, per buffer ChannelRef, the request that is still waiting for its response.
+ channelEvents map[uint64]*HTTP1BufferEvent
+}
+
+// NewHTTP1Analyzer creates an analyzer without any pending request.
+func NewHTTP1Analyzer() *HTTP1Analyzer {
+ return &HTTP1Analyzer{
+ channelEvents: make(map[uint64]*HTTP1BufferEvent),
+ }
+}
+
+// HandleBufferEvent consumes one network buffer and returns a completed event once a
+// response has been matched with the request previously seen on the same channel.
+//
+// A request buffer is parsed for its URI and remembered under buffer.ChannelRef; nil is
+// returned. Any other buffer is treated as the response: the pending request of the
+// channel is removed and, when the status code can be parsed, returned as a completed
+// event. nil is returned when there is no pending request or the buffer cannot be parsed.
+//
+// The pending request is removed with delete instead of being overwritten with nil, so
+// the map no longer keeps one dead entry per channel ever seen.
+func (h *HTTP1Analyzer) HandleBufferEvent(buffer *networkBufferInBPF) BufferEvent {
+	if buffer.Direction == BufferDirectionRequest {
+		event := &HTTP1BufferEvent{}
+		event.pid = buffer.Pid
+		event.requestTime = buffer.Timestamp
+		uri, err := h.analyzeRequestURI(buffer)
+		if err != nil {
+			log.Warnf("cannot fount the request uri from content: %s, reason: %v", buffer.Buffer[0:buffer.Size], err)
+			return nil
+		}
+		event.requestURI = uri
+		h.channelEvents[buffer.ChannelRef] = event
+		return nil
+	}
+
+	event := h.channelEvents[buffer.ChannelRef]
+
+	// cannot find the last request event
+	if event == nil {
+		return nil
+	}
+	// the request is consumed by this response: drop the entry so the map does not grow forever
+	delete(h.channelEvents, buffer.ChannelRef)
+
+	code, err := h.analyzeResponseStatus(buffer)
+	if err != nil {
+		log.Warnf("failure to parse the response status code: content: %s, reason: %v", buffer.Buffer[0:buffer.Size], err)
+		return nil
+	}
+	event.statusCode = code
+	event.responseTime = buffer.Timestamp
+	return event
+}
+
+// analyzeRequestURI extracts the request URI (without query string) from an HTTP/1
+// request buffer: the text between the first space and the next '?' or space.
+// An error is returned when the buffer has no leading token followed by a space, when
+// nothing follows that space, or when the URI is not terminated inside the buffer.
+func (h *HTTP1Analyzer) analyzeRequestURI(buffer *networkBufferInBPF) (string, error) {
+	payload := string(buffer.Buffer[0:buffer.Size])
+
+	methodEnd := strings.Index(payload, " ")
+	if methodEnd <= 0 {
+		return "", fmt.Errorf("the reuquest buffer is not validate")
+	}
+	if len(payload) <= methodEnd+1 {
+		return "", fmt.Errorf("current package data data is not enough")
+	}
+
+	afterMethod := payload[methodEnd+1:]
+	// the URI ends at the query string or at the space before the protocol version
+	if uriEnd := strings.IndexAny(afterMethod, "? "); uriEnd >= 0 {
+		return afterMethod[:uriEnd], nil
+	}
+	return "", fmt.Errorf("cannot found the request URI")
+}
+
+// analyzeResponseStatus extracts the status code from an HTTP/1 response buffer, i.e. the
+// number that follows the first space of the status line ("HTTP/1.1 200 OK").
+//
+// The status code ends at the next space, at the end of the line, or at the end of the
+// buffer. Previously a buffer without a second space made strings.Index return -1, which
+// was used as a slice bound and panicked; a status line without a reason phrase
+// ("HTTP/1.1 200\r\n") could not be parsed either.
+//
+// An error is returned when the buffer has no leading token followed by a space, when
+// nothing follows that space, or when the status code is not a number.
+func (h *HTTP1Analyzer) analyzeResponseStatus(buffer *networkBufferInBPF) (int, error) {
+	bufferData := string(buffer.Buffer[0:buffer.Size])
+	firstSpace := strings.Index(bufferData, " ")
+	if firstSpace <= 0 {
+		return 0, fmt.Errorf("the reuquest buffer is not validate")
+	}
+	if len(bufferData) <= firstSpace+1 {
+		return 0, fmt.Errorf("current package data data is not enough")
+	}
+
+	afterVersion := bufferData[firstSpace+1:]
+	codeEnd := strings.IndexAny(afterVersion, " \r\n")
+	if codeEnd < 0 {
+		// the buffer ends right after the status code
+		codeEnd = len(afterVersion)
+	}
+	i, err := strconv.ParseInt(afterVersion[:codeEnd], 10, 32)
+	if err != nil {
+		return 0, fmt.Errorf("parse response status error")
+	}
+	return int(i), nil
+}
+
+// StartTime returns the request timestamp converted through host.Time.
+func (h *HTTP1BufferEvent) StartTime() time.Time {
+ return host.Time(h.requestTime)
+}
+
+// Pid returns the process ID recorded from the request buffer.
+func (h *HTTP1BufferEvent) Pid() int32 {
+ return int32(h.pid)
+}
+
+// RequestURI returns the URI parsed from the request buffer.
+func (h *HTTP1BufferEvent) RequestURI() string {
+ return h.requestURI
+}
+
+// IsResponseError reports whether the response status code is 500 or above.
+func (h *HTTP1BufferEvent) IsResponseError() bool {
+ return h.statusCode >= 500
+}
+
+// Duration returns the difference between the response and request timestamps as a time.Duration.
+// NOTE(review): this assumes the timestamps are in nanoseconds - confirm against the BPF program.
+// NOTE(review): the subtraction is unsigned and wraps around if responseTime < requestTime.
+func (h *HTTP1BufferEvent) Duration() time.Duration {
+ return time.Duration(h.responseTime - h.requestTime)
+}
diff --git a/pkg/profiling/continuous/checker/bpf/network/network.go b/pkg/profiling/continuous/checker/bpf/network/network.go
new file mode 100644
index 0000000..4bcda76
--- /dev/null
+++ b/pkg/profiling/continuous/checker/bpf/network/network.go
@@ -0,0 +1,187 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package network
+
+import (
+ "sync"
+ "time"
+
+ "github.com/hashicorp/go-multierror"
+
+ "github.com/cilium/ebpf"
+ "github.com/cilium/ebpf/link"
+
+ "github.com/apache/skywalking-rover/pkg/logger"
+ "github.com/apache/skywalking-rover/pkg/tools/btf"
+)
+
+// $BPF_CLANG and $BPF_CFLAGS are set by the Makefile.
+// nolint
+//go:generate go run github.com/cilium/ebpf/cmd/bpf2go -no-global-types -target bpfel -cc $BPF_CLANG -cflags $BPF_CFLAGS bpf $REPO_ROOT/bpf/profiling/continuous/network.c -- -I$REPO_ROOT/bpf/include -D__TARGET_ARCH_x86
+
+var (
+ log = logger.GetLogger("profiling", "continuous", "checker", "network", "bpf")
+
+ // locker is held while the BPF objects are loaded or closed
+ locker sync.Mutex
+ // bpf and bpfLinker are the loaded BPF objects and their attached links
+ bpf *bpfObjects
+ bpfLinker *btf.Linker
+ // monitoringProcesses keeps, for each pid, the set of sources which asked to monitor it
+ monitoringProcesses = make(map[int32]map[string]bool)
+ // notifiers receive every analyzed buffer event
+ notifiers []EventNotify
+)
+
+// BufferEvent is the analyzed result handed over to the EventNotify listeners.
+type BufferEvent interface {
+ // Pid is the id of the process the event belongs to
+ Pid() int32
+ // RequestURI is the URI of the request
+ RequestURI() string
+ // IsResponseError reports whether the response is treated as an error
+ IsResponseError() bool
+ // Duration is the time between the request and the response
+ Duration() time.Duration
+ // StartTime is the time of the request
+ StartTime() time.Time
+}
+
+// EventNotify is implemented by listeners registered through AddEventNotify.
+type EventNotify interface {
+ // ReceiveBufferEvent is called for each analyzed buffer event
+ ReceiveBufferEvent(event BufferEvent)
+}
+
+// AddEventNotify appends the listener to the package level notifier list.
+// NOTE(review): the list is modified without taking the locker, so this
+// presumably has to be called before events are delivered — confirm.
+func AddEventNotify(notify EventNotify) {
+ notifiers = append(notifiers, notify)
+}
+
+// AddWatchProcess registers the process for monitoring on behalf of the "from" source.
+// The BPF program is started on demand before the process is registered.
+func AddWatchProcess(pid int32, from string) error {
+ if startErr := startBPFIfNeed(); startErr != nil {
+ return startErr
+ }
+
+ // the monitor control map must accept the pid before it is cached locally
+ updateErr := bpf.ProcessMonitorControl.Update(uint32(pid), uint32(1), ebpf.UpdateAny)
+ if updateErr != nil {
+ // the registration failed, so check whether the BPF is still needed at all
+ _ = shutdownBPFIfNoProcesses()
+ return updateErr
+ }
+
+ // remember which source asked for this pid
+ sources := monitoringProcesses[pid]
+ if sources == nil {
+ sources = map[string]bool{}
+ monitoringProcesses[pid] = sources
+ }
+ sources[from] = true
+ return nil
+}
+
+// RemoveWatchProcess drops the "from" source from the monitoring of the process.
+// When no source is left for the pid it is removed from the monitor control map,
+// and when no process is left at all the BPF is stopped on return.
+func RemoveWatchProcess(pid int32, from string) error {
+ defer func() {
+ shutdownErr := shutdownBPFIfNoProcesses()
+ if shutdownErr != nil {
+ log.Warnf("shutdown the BPF failure: %v", shutdownErr)
+ }
+ }()
+
+ sources := monitoringProcesses[pid]
+ if sources == nil {
+ return nil
+ }
+ delete(sources, from)
+ if len(sources) != 0 {
+ // another source still needs this process
+ return nil
+ }
+
+ // nobody is interested in the process anymore
+ delete(monitoringProcesses, pid)
+ return bpf.ProcessMonitorControl.Delete(uint32(pid))
+}
+
+// ForceShutdownBPF delegates to shutdownBPF.
+// NOTE(review): despite the name, shutdownBPF returns without closing anything
+// while monitoringProcesses is not empty — confirm this is intended.
+func ForceShutdownBPF() error {
+ return shutdownBPF()
+}
+
+// startBPFIfNeed loads and attaches the BPF program unless it is already loaded.
+// It holds the locker for the whole call. On any failure the package level
+// bpf/bpfLinker references are not left pointing at a usable program.
+func startBPFIfNeed() error {
+ locker.Lock()
+ defer locker.Unlock()
+
+ // already started
+ if bpf != nil {
+ return nil
+ }
+
+ bpf = &bpfObjects{}
+ if err := loadBpfObjects(bpf, btf.GetEBPFCollectionOptionsIfNeed()); err != nil {
+ // NOTE(review): bpf stays non-nil on this path, so a later call returns
+ // early at the "already started" check — confirm this is intended
+ return err
+ }
+ // attach the probes for sending and receiving TCP messages
+ bpfLinker = btf.NewLinker()
+ bpfLinker.AddLink(link.Kprobe, map[string]*ebpf.Program{"tcp_sendmsg": bpf.TcpSendmsg})
+ bpfLinker.AddLink(link.Kprobe, map[string]*ebpf.Program{"tcp_recvmsg": bpf.TcpRecvmsg})
+ bpfLinker.AddLink(link.Kretprobe, map[string]*ebpf.Program{"tcp_recvmsg": bpf.RetTcpRecvmsg})
+
+ // every analyzed event is forwarded to all registered notifiers
+ reader := newNetworkBufferReader(func(event BufferEvent) {
+ for _, n := range notifiers {
+ n.ReceiveBufferEvent(event)
+ }
+ })
+ bpfLinker.ReadEventAsync(bpf.SocketBufferSendQueue, reader.Read, reader.BufferDataBPFSupplier)
+
+ // the linker collects the errors of the calls above, release everything on failure
+ if err := bpfLinker.HasError(); err != nil {
+ _ = bpfLinker.Close()
+ _ = bpf.Close()
+ bpf = nil
+ bpfLinker = nil
+ return err
+ }
+ return nil
+}
+
+// shutdownBPFIfNoProcesses stops the BPF program once the last monitored process is gone,
+// it does nothing while at least one process is still being monitored.
+func shutdownBPFIfNoProcesses() error {
+ if len(monitoringProcesses) == 0 {
+ return shutdownBPF()
+ }
+ return nil
+}
+
+// shutdownBPF closes the linker and the BPF objects and resets the package level
+// references. It holds the locker for the whole call and is a no-op when the BPF
+// is not started or when a process is still being monitored.
+// All close errors are collected and returned together.
+func shutdownBPF() error {
+ locker.Lock()
+ defer locker.Unlock()
+
+ if bpf == nil {
+ return nil
+ }
+ // if still contains monitoring process then the BPF should not be stopped
+ if len(monitoringProcesses) != 0 {
+ return nil
+ }
+
+ var err error
+ if bpfLinker != nil {
+ if e := bpfLinker.Close(); e != nil {
+ err = multierror.Append(err, e)
+ }
+ }
+ if e := bpf.Close(); e != nil {
+ err = multierror.Append(err, e)
+ }
+ // reset both references, same as the failure path of startBPFIfNeed, so that
+ // no closed linker stays reachable after the shutdown
+ bpf = nil
+ bpfLinker = nil
+ return err
+}
diff --git a/pkg/profiling/continuous/checker/bpf/network/reader.go b/pkg/profiling/continuous/checker/bpf/network/reader.go
new file mode 100644
index 0000000..7530e5c
--- /dev/null
+++ b/pkg/profiling/continuous/checker/bpf/network/reader.go
@@ -0,0 +1,77 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package network
+
+import (
+ "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
+)
+
+// BufferDirection is the type of the Direction field of networkBufferInBPF.
+type BufferDirection uint8
+
+// protocolAnalyzers holds the analyzer responsible for each supported protocol
+var protocolAnalyzers map[base.ConnectionProtocol]ProtocolAnalyzer
+
+// Both directions are declared with the explicit type: a value without a type
+// inside a var block would become a plain int instead of a BufferDirection.
+var (
+ BufferDirectionRequest BufferDirection = 1
+ BufferDirectionResponse BufferDirection = 2
+)
+
+// init registers the analyzers of all supported protocols, currently only HTTP/1.
+func init() {
+ protocolAnalyzers = make(map[base.ConnectionProtocol]ProtocolAnalyzer)
+ protocolAnalyzers[base.ConnectionProtocolHTTP] = NewHTTP1Analyzer()
+}
+
+// networkBufferInBPF is the event object created by BufferDataBPFSupplier and
+// handed to the protocol analyzers.
+// NOTE(review): the field order and sizes presumably have to match the event
+// struct declared on the BPF side — confirm before changing anything here.
+type networkBufferInBPF struct {
+ Timestamp uint64
+ ChannelRef uint64
+ // Protocol selects the analyzer which handles the buffer
+ Protocol base.ConnectionProtocol
+ Direction BufferDirection
+ // Size is the count of bytes of Buffer read by the analyzers
+ Size uint16
+ Pid uint32
+ Buffer [159]byte
+}
+
+// networkBufferReader analyzes the buffers it reads and reports the resulting events.
+type networkBufferReader struct {
+ // notifyFunc is called with every event produced by an analyzer
+ notifyFunc func(event BufferEvent)
+}
+
+// newNetworkBufferReader creates a reader which reports its events to the notify function.
+func newNetworkBufferReader(notify func(event BufferEvent)) *networkBufferReader {
+ return &networkBufferReader{
+ notifyFunc: notify,
+ }
+}
+
+// Read passes the buffer to the analyzer registered for its protocol and
+// notifies about the event when the analyzer produced one.
+// Buffers of a protocol without analyzer are ignored.
+func (n *networkBufferReader) Read(data interface{}) {
+ bpfBuffer := data.(*networkBufferInBPF)
+ protocolAnalyzer := protocolAnalyzers[bpfBuffer.Protocol]
+ if protocolAnalyzer == nil {
+ return
+ }
+
+ if analyzed := protocolAnalyzer.HandleBufferEvent(bpfBuffer); analyzed != nil {
+ n.notifyFunc(analyzed)
+ }
+}
+
+// BufferDataBPFSupplier creates a new empty buffer object on every call,
+// Read expects to receive objects of this type.
+func (n *networkBufferReader) BufferDataBPFSupplier() interface{} {
+ return &networkBufferInBPF{}
+}
+
+// ProtocolAnalyzer analyzes the buffers of a single protocol.
+type ProtocolAnalyzer interface {
+ // HandleBufferEvent handles the buffer, a nil result means there is no
+ // event to notify about
+ HandleBufferEvent(buffer *networkBufferInBPF) BufferEvent
+}
diff --git a/pkg/profiling/continuous/checker/common/causes.go b/pkg/profiling/continuous/checker/common/causes.go
new file mode 100644
index 0000000..8955569
--- /dev/null
+++ b/pkg/profiling/continuous/checker/common/causes.go
@@ -0,0 +1,116 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package common
+
+import (
+ "github.com/apache/skywalking-rover/pkg/process/api"
+ "github.com/apache/skywalking-rover/pkg/profiling/continuous/base"
+
+ v3 "skywalking.apache.org/repo/goapi/collect/ebpf/profiling/v3"
+)
+
+// SingleValueCause describes a threshold which was reached by a single value.
+type SingleValueCause struct {
+ // process is the process the cause belongs to
+ process api.ProcessInterface
+ // policy is the policy item which has been matched
+ policy *base.PolicyItem
+ causeType v3.ContinuousProfilingCauseType
+ // threshold is the configured limit, current is the value which matched it
+ threshold, current float64
+}
+
+// NewSingleValueCause creates the cause of the process for the policy item,
+// keeping the configured threshold and the current value.
+func NewSingleValueCause(p api.ProcessInterface, policyItem *base.PolicyItem, causeType v3.ContinuousProfilingCauseType,
+ threshold, current float64) *SingleValueCause {
+ cause := new(SingleValueCause)
+ cause.process = p
+ cause.policy = policyItem
+ cause.causeType = causeType
+ cause.threshold = threshold
+ cause.current = current
+ return cause
+}
+
+// Process returns the process the cause belongs to.
+func (p *SingleValueCause) Process() api.ProcessInterface {
+ return p.process
+}
+
+// FromPolicy returns the policy item which produced the cause.
+func (p *SingleValueCause) FromPolicy() *base.PolicyItem {
+ return p.policy
+}
+
+// GenerateTransferCause converts the cause to the protocol message,
+// carrying the threshold and the current value as a single value cause.
+func (p *SingleValueCause) GenerateTransferCause() *v3.ContinuousProfilingCause {
+ singleValue := &v3.ContinuousProfilingSingleValueCause{
+ Threshold: p.threshold,
+ Current: p.current,
+ }
+ transfer := &v3.ContinuousProfilingCause{Type: p.causeType}
+ transfer.Cause = &v3.ContinuousProfilingCause_SingleValue{SingleValue: singleValue}
+ return transfer
+}
+
+// URICause describes a threshold which was reached for an URI.
+type URICause struct {
+ // IsRegex decides whether URI is reported as a regex or as a path
+ IsRegex bool
+ URI string
+
+ // process is the process the cause belongs to
+ process api.ProcessInterface
+ // policy is the policy item which has been matched
+ policy *base.PolicyItem
+ causeType v3.ContinuousProfilingCauseType
+ // threshold is the configured limit, current is the value which matched it
+ threshold, current float64
+}
+
+// NewURICause creates the cause of the process for the policy item and the URI,
+// isRegex tells whether the uri is a regex or a plain path.
+func NewURICause(p api.ProcessInterface, isRegex bool, uri string, policyItem *base.PolicyItem, causeType v3.ContinuousProfilingCauseType,
+ threshold, current float64) *URICause {
+ cause := new(URICause)
+ cause.IsRegex = isRegex
+ cause.URI = uri
+ cause.process = p
+ cause.policy = policyItem
+ cause.causeType = causeType
+ cause.threshold = threshold
+ cause.current = current
+ return cause
+}
+
+// Process returns the process the cause belongs to.
+func (p *URICause) Process() api.ProcessInterface {
+ return p.process
+}
+
+// FromPolicy returns the policy item which produced the cause.
+func (p *URICause) FromPolicy() *base.PolicyItem {
+ return p.policy
+}
+
+// GenerateTransferCause converts the cause to the protocol message,
+// the URI is transferred as a regex or as a path depending on IsRegex.
+func (p *URICause) GenerateTransferCause() *v3.ContinuousProfilingCause {
+ transferURI := &v3.ContinuousProfilingURICause{
+ Threshold: p.threshold,
+ Current: p.current,
+ // the path is the default, it is replaced below for a regex
+ Uri: &v3.ContinuousProfilingURICause_Path{Path: p.URI},
+ }
+ if p.IsRegex {
+ transferURI.Uri = &v3.ContinuousProfilingURICause_Regex{Regex: p.URI}
+ }
+
+ transfer := &v3.ContinuousProfilingCause{Type: p.causeType}
+ transfer.Cause = &v3.ContinuousProfilingCause_Uri{Uri: transferURI}
+ return transfer
+}
diff --git a/pkg/profiling/continuous/checker/common/checker.go b/pkg/profiling/continuous/checker/common/checker.go
new file mode 100644
index 0000000..1993f2e
--- /dev/null
+++ b/pkg/profiling/continuous/checker/common/checker.go
@@ -0,0 +1,87 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package common
+
+import (
+ "github.com/apache/skywalking-rover/pkg/logger"
+ "github.com/apache/skywalking-rover/pkg/process/api"
+ "github.com/apache/skywalking-rover/pkg/profiling/continuous/base"
+
+ "reflect"
+)
+
+var log = logger.GetLogger("profiling", "continuous", "checker", "common")
+
+// ProcessInfo is the constraint of the per process information kept by the
+// BaseChecker, it does not require any method.
+type ProcessInfo interface {
+}
+
+// numbers is the constraint of the value types supported by the value based checkers.
+type numbers interface {
+ int | int32 | int64 | float64
+}
+
+// BaseChecker contains the process with them self info
+// Key: process pid
+// Value: the process information, usually contains the time windows and policy items
+type BaseChecker[Info ProcessInfo] struct {
+ // InfoGenerator builds the information of a process from its policy items,
+ // "older" is the information kept for the same pid before the synchronization
+ InfoGenerator func(p api.ProcessInterface, older Info, items []*base.PolicyItem) Info
+ // PidWithInfos is replaced as a whole by each SyncPolicies call
+ PidWithInfos map[int32]Info
+}
+
+// NewBaseChecker creates a checker without any process, the generator is used
+// by SyncPolicies to build the information of each process.
+func NewBaseChecker[Info ProcessInfo](generator func(p api.ProcessInterface, older Info, items []*base.PolicyItem) Info) *BaseChecker[Info] {
+ return &BaseChecker[Info]{
+ InfoGenerator: generator,
+ PidWithInfos: make(map[int32]Info),
+ }
+}
+
+// SyncPolicies rebuilds PidWithInfos from the policies.
+// getEnabledItem selects the policy item this checker is interested in, a nil
+// result skips the whole policy. notify is optional: it is called with
+// isDelete=false for a pid without previous information and with isDelete=true
+// for a pid which is not part of the new result anymore.
+func (c *BaseChecker[Info]) SyncPolicies(policies []*base.SyncPolicyWithProcesses,
+ getEnabledItem func(items map[base.CheckType]*base.PolicyItem) *base.PolicyItem,
+ notify func(key int32, isDelete bool)) {
+ // collect the enabled policy items of each process
+ pidWithPolicyItems := make(map[api.ProcessInterface][]*base.PolicyItem)
+ for _, processWithPolicies := range policies {
+ item := getEnabledItem(processWithPolicies.Policy.Items)
+ if item == nil {
+ continue
+ }
+
+ // create or get the existing windows
+ // and add the policy into the update
+ for _, p := range processWithPolicies.Processes {
+ pidWithPolicyItems[p] = append(pidWithPolicyItems[p], item)
+ }
+ }
+
+ // generate the new process info
+ result := make(map[int32]Info)
+ for p, items := range pidWithPolicyItems {
+ pid := p.Pid()
+ // a zero value means the pid was unknown before this synchronization
+ if existing := c.PidWithInfos[pid]; reflect.ValueOf(existing).IsZero() && notify != nil {
+ notify(pid, false)
+ }
+ // the previous information is passed on so the generator can reuse it
+ result[pid] = c.InfoGenerator(p, c.PidWithInfos[pid], items)
+ }
+
+ // if the pid is not exist in the new policies, then notify to delete
+ for key := range c.PidWithInfos {
+ if newResult := result[key]; reflect.ValueOf(newResult).IsZero() && notify != nil {
+ notify(key, true)
+ }
+ }
+
+ c.PidWithInfos = result
+}
diff --git a/pkg/profiling/continuous/checker/common/http_checker.go b/pkg/profiling/continuous/checker/common/http_checker.go
new file mode 100644
index 0000000..383f5ca
--- /dev/null
+++ b/pkg/profiling/continuous/checker/common/http_checker.go
@@ -0,0 +1,250 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package common
+
+import (
+ "regexp"
+ "strings"
+
+ "github.com/apache/skywalking-rover/pkg/process/api"
+ "github.com/apache/skywalking-rover/pkg/profiling/continuous/base"
+ "github.com/apache/skywalking-rover/pkg/profiling/continuous/checker/bpf/network"
+
+ v3 "skywalking.apache.org/repo/goapi/collect/ebpf/profiling/v3"
+)
+
+// HTTPBasedChecker checks thresholds on values calculated from the HTTP buffer
+// events of the monitored processes.
+type HTTPBasedChecker[Data base.WindowData[network.BufferEvent, float64]] struct {
+ *BaseChecker[*HTTPBasedCheckerProcessInfo]
+
+ // CheckType selects the policy item handled by this checker
+ CheckType base.CheckType
+ // CauseType is the type of the causes reported by this checker
+ CauseType v3.ContinuousProfilingCauseType
+ // ThresholdGenerate parses the threshold configured in the policy item
+ ThresholdGenerate func(val string) (float64, error)
+}
+
+// NewHTTPBasedChecker creates the checker and registers it as a listener of the
+// network buffer events.
+// thresholdGenerator parses the threshold of a policy item, dataGenerator
+// creates the data holder of the time windows and causeType is the type of the
+// reported causes.
+//
+// For each policy item of a process the time windows are taken over from the
+// previous synchronization when the older item has the same URI filter,
+// otherwise they are created: one per URI of the URI list plus the default one,
+// which is used for the regex and for the policies without any URI filter.
+func NewHTTPBasedChecker[Data base.WindowData[network.BufferEvent, float64]](checkType base.CheckType,
+ thresholdGenerator func(val string) (float64, error), dataGenerator func() base.WindowData[network.BufferEvent, float64],
+ causeType v3.ContinuousProfilingCauseType) *HTTPBasedChecker[Data] {
+ checker := &HTTPBasedChecker[Data]{
+ CheckType: checkType,
+ ThresholdGenerate: thresholdGenerator,
+ CauseType: causeType,
+ }
+ checker.BaseChecker = NewBaseChecker[*HTTPBasedCheckerProcessInfo](
+ func(p api.ProcessInterface, older *HTTPBasedCheckerProcessInfo, items []*base.PolicyItem) *HTTPBasedCheckerProcessInfo {
+ result := &HTTPBasedCheckerProcessInfo{
+ Process: p,
+ PolicyWithWindows: make(map[*base.PolicyItem]*HTTPBasedCheckerPolicyItemWindows),
+ }
+ for _, item := range items {
+ // the threshold has been validated by SyncPolicies already
+ val, _ := thresholdGenerator(item.Threshold)
+ policyInfo := &HTTPBasedCheckerPolicyItemWindows{
+ threshold: val,
+ }
+
+ timeWindowsUpdated := false
+ if older != nil {
+ for olderItem, olderInfo := range older.PolicyWithWindows {
+ if !olderItem.SameURIFilter(item) {
+ continue
+ }
+ // reading from the older policy info: the complete URI filter state
+ // must be carried over, a regex policy without its regex would
+ // accept every URI
+ for _, w := range olderInfo.uriWithTimeWindows {
+ w.ScalePeriod([]*base.PolicyItem{item})
+ }
+ policyInfo.uriWithTimeWindows = olderInfo.uriWithTimeWindows
+ policyInfo.uriRegex = olderInfo.uriRegex
+ policyInfo.defaultTimeWindows = olderInfo.defaultTimeWindows
+ if policyInfo.defaultTimeWindows != nil {
+ policyInfo.defaultTimeWindows.ScalePeriod([]*base.PolicyItem{item})
+ } else {
+ // keep the same shape as a newly created policy info,
+ // which always owns the default windows
+ policyInfo.defaultTimeWindows = base.NewTimeWindows[network.BufferEvent, float64](
+ []*base.PolicyItem{item}, func() base.WindowData[network.BufferEvent, float64] {
+ return dataGenerator()
+ })
+ }
+ timeWindowsUpdated = true
+ break
+ }
+ }
+
+ if timeWindowsUpdated {
+ result.PolicyWithWindows[item] = policyInfo
+ continue
+ }
+
+ // otherwise, create the time windows
+ if len(item.URIList) > 0 {
+ uriWithWindows := make(map[string]*base.TimeWindows[network.BufferEvent, float64])
+ for _, uri := range item.URIList {
+ uriWithWindows[uri] = base.NewTimeWindows[network.BufferEvent, float64](
+ []*base.PolicyItem{item}, func() base.WindowData[network.BufferEvent, float64] {
+ return dataGenerator()
+ })
+ }
+ policyInfo.uriWithTimeWindows = uriWithWindows
+ } else if item.URIRegex != "" {
+ regex, err := regexp.Compile(item.URIRegex)
+ if err != nil {
+ log.Warnf("error to compile the URI regex for policy, ignore this policy. regex: %s, error: %v", item.URIRegex, err)
+ continue
+ }
+ policyInfo.uriRegex = regex
+ }
+ policyInfo.defaultTimeWindows = base.NewTimeWindows[network.BufferEvent, float64](
+ []*base.PolicyItem{item}, func() base.WindowData[network.BufferEvent, float64] {
+ return dataGenerator()
+ })
+
+ result.PolicyWithWindows[item] = policyInfo
+ }
+ return result
+ })
+ network.AddEventNotify(checker)
+ return checker
+}
+
+// SyncPolicies rebuilds the process information from the policies.
+// A policy is ignored when it has no item for the check type of this checker
+// or when the threshold of the item cannot be parsed. Processes which are new
+// are added to the network monitoring and processes which disappeared are
+// removed from it, failures of both are logged only.
+func (n *HTTPBasedChecker[Data]) SyncPolicies(policies []*base.SyncPolicyWithProcesses) {
+ n.BaseChecker.SyncPolicies(policies, func(items map[base.CheckType]*base.PolicyItem) *base.PolicyItem {
+ item := items[n.CheckType]
+ if item == nil {
+ return nil
+ }
+ _, err := n.ThresholdGenerate(item.Threshold)
+ if err != nil {
+ // report the real reason together with the configured value
+ log.Warnf("failure to parse the %s threshold: %v, error: %v", n.CheckType, item.Threshold, err)
+ return nil
+ }
+ return item
+ }, func(pid int32, isDelete bool) {
+ // notify to the listener
+ var err error
+ defer func() {
+ if err != nil {
+ log.Warnf("process the pid monitoring failure, pid: %d, is delete: %t, error: %v", pid, isDelete, err)
+ }
+ }()
+ if isDelete {
+ err = network.RemoveWatchProcess(pid, string(n.CheckType))
+ return
+ }
+
+ err = network.AddWatchProcess(pid, string(n.CheckType))
+ })
+}
+
+func (n *HTTPBasedChecker[Data]) ReceiveBufferEvent(event network.BufferEvent) {
+ info := n.PidWithInfos[event.Pid()]
+ if info == nil {
+ return
+ }
+
+ for _, policyInfo := range info.PolicyWithWindows {
+ var matchesWindows *base.TimeWindows[network.BufferEvent, float64]
+ // match with the regex or URI list
+ if len(policyInfo.uriWithTimeWindows) > 0 {
+ for uri, windows := range policyInfo.uriWithTimeWindows {
+ if event.RequestURI() == uri {
+ matchesWindows = windows
+ }
+ }
+ if matchesWindows == nil {
+ continue
+ }
+ } else if policyInfo.uriRegex != nil && !policyInfo.uriRegex.MatchString(event.RequestURI()) {
+ continue
+ }
+
+ if matchesWindows == nil {
+ matchesWindows = policyInfo.defaultTimeWindows
+ }
+
+ matchesWindows.Add(event.StartTime(), event)
+ }
+}
+
+// Fetch does nothing: the data of this checker arrives through ReceiveBufferEvent.
+func (n *HTTPBasedChecker[Data]) Fetch() error {
+ return nil
+}
+
+// Close asks the network package to shut down the BPF program.
+func (n *HTTPBasedChecker[Data]) Close() error {
+ return network.ForceShutdownBPF()
+}
+
+// Check flushes the metrics of all time windows and returns a cause for every
+// time window which reaches the threshold of its policy item.
+// The metrics are always flushed, the thresholds are only evaluated when the
+// context allows checking the policy item for the process.
+func (n *HTTPBasedChecker[Data]) Check(ctx base.CheckContext, metricsAppender *base.MetricsAppender) []base.ThresholdCause {
+ causes := make([]base.ThresholdCause, 0)
+ for _, pidPolicies := range n.PidWithInfos {
+ for item, itemInfo := range pidPolicies.PolicyWithWindows {
+ // the regex is reported as the URI of the default windows
+ globalURI := ""
+ if itemInfo.uriRegex != nil {
+ globalURI = itemInfo.uriRegex.String()
+ }
+ for uri, windows := range itemInfo.uriWithTimeWindows {
+ n.flushMetrics(uri, windows, pidPolicies.Process, metricsAppender)
+ }
+ if itemInfo.defaultTimeWindows != nil {
+ n.flushMetrics(globalURI, itemInfo.defaultTimeWindows, pidPolicies.Process, metricsAppender)
+ }
+ if !ctx.ShouldCheck(pidPolicies.Process, item) {
+ continue
+ }
+
+ // url list checker
+ for uri, window := range itemInfo.uriWithTimeWindows {
+ if lastMatch, isMatch := window.MatchRule(item, func(val float64) bool {
+ return val >= itemInfo.threshold
+ }); isMatch {
+ causes = append(causes, NewURICause(pidPolicies.Process, false, uri, item,
+ n.CauseType, itemInfo.threshold, lastMatch))
+ }
+ }
+
+ // regex or global
+ // the default windows are optional, same as for the flush above,
+ // so never evaluate the rule through a nil pointer
+ if itemInfo.defaultTimeWindows == nil {
+ continue
+ }
+ if lastMatch, isMatch := itemInfo.defaultTimeWindows.MatchRule(item, func(val float64) bool {
+ return val >= itemInfo.threshold
+ }); isMatch {
+ causes = append(causes, NewURICause(pidPolicies.Process, itemInfo.uriRegex != nil, globalURI, item,
+ n.CauseType, itemInfo.threshold, lastMatch))
+ }
+ }
+ }
+ return causes
+}
+
+// flushMetrics appends the not yet flushed values of the windows as metrics of
+// the process, labeled with the uri. An empty uri is reported as "global".
+func (n *HTTPBasedChecker[Data]) flushMetrics(uri string, windows *base.TimeWindows[network.BufferEvent, float64],
+ process api.ProcessInterface, metricsAppender *base.MetricsAppender) {
+ if uri == "" {
+ uri = "global"
+ }
+ values, updated := windows.FlushMultipleWriteData()
+ if !updated {
+ return
+ }
+ // flush each slot data
+ for _, value := range values {
+ labels := map[string]string{
+ "uri": uri,
+ }
+ metricsAppender.AppendProcessSingleValue(strings.ToLower(string(n.CheckType)), process, labels, value)
+ }
+}
+
+// HTTPBasedCheckerProcessInfo is the information the HTTP based checker keeps for a process.
+type HTTPBasedCheckerProcessInfo struct {
+ Process api.ProcessInterface
+ // PolicyWithWindows holds the time windows of each policy item of the process
+ PolicyWithWindows map[*base.PolicyItem]*HTTPBasedCheckerPolicyItemWindows
+}
+
+// HTTPBasedCheckerPolicyItemWindows holds the time windows of a single policy item.
+type HTTPBasedCheckerPolicyItemWindows struct {
+ // uriWithTimeWindows holds one time windows per URI of the URI list of the policy item
+ uriWithTimeWindows map[string]*base.TimeWindows[network.BufferEvent, float64]
+
+ // uriRegex is the compiled URI regex of the policy item, nil without regex
+ uriRegex *regexp.Regexp
+ // defaultTimeWindows receives the events of the policies with a regex or without any URI filter
+ defaultTimeWindows *base.TimeWindows[network.BufferEvent, float64]
+
+ // threshold is the parsed threshold of the policy item
+ threshold float64
+}
diff --git a/pkg/profiling/continuous/checker/common/process_checker.go b/pkg/profiling/continuous/checker/common/process_checker.go
new file mode 100644
index 0000000..ad1f42e
--- /dev/null
+++ b/pkg/profiling/continuous/checker/common/process_checker.go
@@ -0,0 +1,152 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package common
+
+import (
+ "fmt"
+ "strings"
+ "time"
+
+ "github.com/hashicorp/go-multierror"
+
+ "github.com/apache/skywalking-rover/pkg/process/api"
+ "github.com/apache/skywalking-rover/pkg/profiling/continuous/base"
+
+ v3 "skywalking.apache.org/repo/goapi/collect/ebpf/profiling/v3"
+)
+
+// ProcessBasedChecker checks thresholds on a value which is read for each process.
+type ProcessBasedChecker[V numbers] struct {
+ *BaseChecker[*ProcessBasedInfo[V]]
+
+ // CheckType selects the policy item handled by this checker
+ CheckType base.CheckType
+ // CauseType is the type of the causes reported by this checker
+ CauseType v3.ContinuousProfilingCauseType
+ // ThresholdGenerate parses the threshold configured in the policy item
+ ThresholdGenerate func(val string) (V, error)
+ // DataGenerate reads the current value of the process
+ DataGenerate func(process api.ProcessInterface) (V, error)
+}
+
+func NewProcessBasedChecker[V numbers](checkType base.CheckType, thresholdGenerator func(val string) (V, error),
+ dataGenerator func(p api.ProcessInterface) (V, error), causeType v3.ContinuousProfilingCauseType) *ProcessBasedChecker[V] {
+ checker := &ProcessBasedChecker[V]{
+ CheckType: checkType,
+ CauseType: causeType,
+ ThresholdGenerate: thresholdGenerator,
+ DataGenerate: dataGenerator,
+ }
+ checker.BaseChecker = NewBaseChecker[*ProcessBasedInfo[V]](
+ func(p api.ProcessInterface, older *ProcessBasedInfo[V], items []*base.PolicyItem) *ProcessBasedInfo[V] {
+ var win *base.TimeWindows[V, V]
+ if older != nil {
+ win = older.Windows
+ older.Windows.ScalePeriod(items)
+ } else {
+ win = base.NewTimeWindows[V, V](items, func() base.WindowData[V, V] {
+ return base.NewLatestWindowData[V]()
+ })
+ }
+ policies := make([]*ProcessBasedPolicy[V], 0)
+ for _, i := range items {
+ threshold, _ := thresholdGenerator(i.Threshold)
+ policies = append(policies, &ProcessBasedPolicy[V]{
+ Threshold: threshold,
+ Policy: i,
+ })
+ }
+ return &ProcessBasedInfo[V]{
+ Process: p,
+ Windows: win,
+ Policies: policies,
+ }
+ })
+ return checker
+}
+
+// SyncPolicies rebuilds the process information from the policies.
+// A policy is ignored when it has no item for the check type of this checker
+// or when the threshold of the item cannot be parsed.
+func (r *ProcessBasedChecker[V]) SyncPolicies(policies []*base.SyncPolicyWithProcesses) {
+ selectItem := func(items map[base.CheckType]*base.PolicyItem) *base.PolicyItem {
+ candidate := items[r.CheckType]
+ if candidate == nil {
+ return nil
+ }
+ if _, parseErr := r.ThresholdGenerate(candidate.Threshold); parseErr != nil {
+ log.Warnf("failure to parse the %s threshold: %v, error: %v", r.CheckType, candidate.Threshold, parseErr)
+ return nil
+ }
+ return candidate
+ }
+ r.BaseChecker.SyncPolicies(policies, selectItem, nil)
+}
+
+// Fetch reads the current value of every process and adds it to its time
+// windows, all values of one call share the same timestamp.
+// A process whose value cannot be read is skipped, the errors of all skipped
+// processes are returned together.
+func (r *ProcessBasedChecker[V]) Fetch() error {
+ if len(r.PidWithInfos) == 0 {
+ return nil
+ }
+
+ var failures error
+ fetchTime := time.Now()
+ for _, processInfo := range r.PidWithInfos {
+ current, err := r.DataGenerate(processInfo.Process)
+ if err == nil {
+ processInfo.Windows.Add(fetchTime, current)
+ continue
+ }
+ failures = multierror.Append(failures, fmt.Errorf("get the process %s failure, pid: %d, error: %v",
+ r.CheckType, processInfo.Process.Pid(), err))
+ }
+ return failures
+}
+
+// Check flushes the latest value of each process as metric and returns a cause
+// for every policy whose threshold is reached by the time windows of the process.
+// It returns nil when no process is known to the checker.
+func (r *ProcessBasedChecker[V]) Check(ctx base.CheckContext, metricsAppender *base.MetricsAppender) []base.ThresholdCause {
+ if len(r.PidWithInfos) == 0 {
+ return nil
+ }
+
+ causes := make([]base.ThresholdCause, 0)
+ for _, info := range r.PidWithInfos {
+ for _, threshold := range info.Policies {
+ // the metric is flushed even if the policy must not be checked right now
+ // NOTE(review): the flush runs once per policy of the process, later
+ // rounds presumably find no new data — confirm in FlushLastWriteData
+ if data, hasData := info.Windows.FlushLastWriteData(); hasData {
+ metricsAppender.AppendProcessSingleValue(strings.ToLower(string(r.CheckType)), info.Process, nil, float64(data))
+ }
+ if !ctx.ShouldCheck(info.Process, threshold.Policy) {
+ continue
+ }
+ // check is reach the threshold
+ if lastMatch, enable := info.Windows.MatchRule(threshold.Policy, func(val V) bool {
+ return val >= threshold.Threshold
+ }); enable {
+ causes = append(causes,
+ NewSingleValueCause(info.Process, threshold.Policy, r.CauseType, float64(threshold.Threshold), float64(lastMatch)))
+ }
+ }
+ }
+ return causes
+}
+
+// Close has nothing to release for this checker.
+func (r *ProcessBasedChecker[V]) Close() error {
+ return nil
+}
+
+// ProcessBasedInfo is the information the process based checker keeps for a process.
+type ProcessBasedInfo[V numbers] struct {
+ Process api.ProcessInterface
+ // Windows holds the fetched values of the process
+ Windows *base.TimeWindows[V, V]
+ // Policies are the policy items of the process with their parsed thresholds
+ Policies []*ProcessBasedPolicy[V]
+}
+
+// ProcessBasedPolicy is a policy item together with its parsed threshold.
+type ProcessBasedPolicy[V numbers] struct {
+ Threshold V
+ Policy *base.PolicyItem
+}
diff --git a/pkg/profiling/continuous/checker/common/system_checker.go b/pkg/profiling/continuous/checker/common/system_checker.go
new file mode 100644
index 0000000..5cc4e60
--- /dev/null
+++ b/pkg/profiling/continuous/checker/common/system_checker.go
@@ -0,0 +1,138 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package common
+
+import (
+ "fmt"
+ "strings"
+ "time"
+
+ "github.com/apache/skywalking-rover/pkg/process/api"
+ "github.com/apache/skywalking-rover/pkg/profiling/continuous/base"
+
+ v3 "skywalking.apache.org/repo/goapi/collect/ebpf/profiling/v3"
+)
+
+// SystemBasedChecker checks thresholds on a single value which is shared by all processes.
+type SystemBasedChecker[V numbers] struct {
+ // CheckType selects the policy item handled by this checker
+ CheckType base.CheckType
+ // CauseType is the type of the causes reported by this checker
+ CauseType v3.ContinuousProfilingCauseType
+ // ThresholdGenerate parses the threshold configured in the policy item
+ ThresholdGenerate func(val string) (V, error)
+ // DataGenerate reads the current value
+ DataGenerate func() (V, error)
+ // GlobalWindows holds the fetched values, it is shared by all policies
+ GlobalWindows *base.TimeWindows[V, V]
+
+ // Policies is replaced as a whole by each SyncPolicies call
+ Policies []*SystemBasedPolicy[V]
+}
+
+// NewSystemBasedChecker creates the checker for the check type with empty
+// global time windows and without any policy.
+// thresholdGenerator parses the threshold of a policy item, dataGenerator reads
+// the current value and causeType is the type of the reported causes.
+func NewSystemBasedChecker[V numbers](checkType base.CheckType, thresholdGenerator func(val string) (V, error),
+ dataGenerator func() (V, error), causeType v3.ContinuousProfilingCauseType) *SystemBasedChecker[V] {
+ globalWindows := base.NewTimeWindows[V, V](nil, func() base.WindowData[V, V] {
+ return base.NewLatestWindowData[V]()
+ })
+ return &SystemBasedChecker[V]{
+ CheckType: checkType,
+ CauseType: causeType,
+ ThresholdGenerate: thresholdGenerator,
+ DataGenerate: dataGenerator,
+ GlobalWindows: globalWindows,
+ }
+}
+
+// SyncPolicies replaces the policies of the checker and scales the period of
+// the global windows to the accepted policy items.
+// A policy is ignored when it has no item for the check type of this checker
+// or when the threshold of the item cannot be parsed.
+func (s *SystemBasedChecker[V]) SyncPolicies(policies []*base.SyncPolicyWithProcesses) {
+ accepted := make([]*SystemBasedPolicy[V], 0)
+ acceptedItems := make([]*base.PolicyItem, 0)
+ for _, current := range policies {
+ policyItem := current.Policy.Items[s.CheckType]
+ if policyItem == nil {
+ continue
+ }
+ parsed, parseErr := s.ThresholdGenerate(policyItem.Threshold)
+ if parseErr != nil {
+ log.Warnf("failure to parse the %s threshold: %v, error: %v", s.CheckType, policyItem.Threshold, parseErr)
+ continue
+ }
+ acceptedItems = append(acceptedItems, policyItem)
+
+ // keep an own list of the processes the policy applies to
+ policyProcesses := make([]api.ProcessInterface, 0)
+ for _, process := range current.Processes {
+ policyProcesses = append(policyProcesses, process)
+ }
+ accepted = append(accepted, &SystemBasedPolicy[V]{
+ Threshold: parsed,
+ Policy: policyItem,
+ Processes: policyProcesses,
+ })
+ }
+
+ s.Policies = accepted
+ s.GlobalWindows.ScalePeriod(acceptedItems)
+}
+
+// Fetch reads the current value and adds it to the global windows,
+// nothing is read while the checker has no policy.
+func (s *SystemBasedChecker[V]) Fetch() error {
+ if len(s.Policies) == 0 {
+ return nil
+ }
+
+ current, fetchErr := s.DataGenerate()
+ if fetchErr == nil {
+ s.GlobalWindows.Add(time.Now(), current)
+ return nil
+ }
+ return fmt.Errorf("get the system %s error: %v", s.CheckType, fetchErr)
+}
+
+// Check flushes the latest value as metric of every process of every policy
+// and returns a cause for each process of each policy whose threshold is
+// reached by the global windows. It returns nil when the checker has no policy.
+func (s *SystemBasedChecker[V]) Check(ctx base.CheckContext, metricsAppender *base.MetricsAppender) []base.ThresholdCause {
+ if len(s.Policies) == 0 {
+ return nil
+ }
+
+ causes := make([]base.ThresholdCause, 0)
+ // the value is flushed once and then reported for all processes
+ data, hasData := s.GlobalWindows.FlushLastWriteData()
+
+ for _, policy := range s.Policies {
+ if hasData {
+ for _, p := range policy.Processes {
+ metricsAppender.AppendProcessSingleValue(strings.ToLower(string(s.CheckType)), p, nil, float64(data))
+ }
+ }
+ // the rule is evaluated once per policy, as the windows are shared by all processes
+ lastMatch, isMatch := s.GlobalWindows.MatchRule(policy.Policy, func(val V) bool {
+ return val >= policy.Threshold
+ })
+ if !isMatch {
+ continue
+ }
+
+ // only the processes which may be checked right now get a cause
+ for _, p := range policy.Processes {
+ if !ctx.ShouldCheck(p, policy.Policy) {
+ continue
+ }
+
+ causes = append(causes, NewSingleValueCause(p, policy.Policy, s.CauseType, float64(policy.Threshold), float64(lastMatch)))
+ }
+ }
+
+ return causes
+}
+
+// Close has nothing to release for this checker.
+func (s *SystemBasedChecker[V]) Close() error {
+ return nil
+}
+
+// SystemBasedPolicy is a policy item together with its parsed threshold and
+// the processes the policy applies to.
+type SystemBasedPolicy[V numbers] struct {
+ Threshold V
+ Policy *base.PolicyItem
+ Processes []api.ProcessInterface
+}
diff --git a/pkg/profiling/continuous/checker/network_error_rate.go b/pkg/profiling/continuous/checker/network_error_rate.go
new file mode 100644
index 0000000..7ba08c9
--- /dev/null
+++ b/pkg/profiling/continuous/checker/network_error_rate.go
@@ -0,0 +1,73 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package checker
+
+import (
+ "strconv"
+
+ "github.com/apache/skywalking-rover/pkg/profiling/continuous/base"
+ "github.com/apache/skywalking-rover/pkg/profiling/continuous/checker/bpf/network"
+ "github.com/apache/skywalking-rover/pkg/profiling/continuous/checker/common"
+
+ v3 "skywalking.apache.org/repo/goapi/collect/ebpf/profiling/v3"
+)
+
+// NetworkHTTPErrorRateChecker is the HTTP error rate checker. Its behavior
+// comes from the embedded HTTPBasedChecker, which is created in Init.
+type NetworkHTTPErrorRateChecker struct {
+ *common.HTTPBasedChecker[*processNetworkResponseErrorStatics]
+}
+
+// NewNetworkResponseErrorChecker returns an empty checker; the embedded
+// HTTPBasedChecker stays nil until Init is called.
+func NewNetworkResponseErrorChecker() *NetworkHTTPErrorRateChecker {
+	return new(NetworkHTTPErrorRateChecker)
+}
+
+// Init creates the embedded HTTPBasedChecker for the HTTP error rate check
+// type. Thresholds are parsed as 64-bit floats and every window slot is
+// backed by a fresh processNetworkResponseErrorStatics. It always returns nil.
+func (n *NetworkHTTPErrorRateChecker) Init(config *base.ContinuousConfig) error {
+	parseThreshold := func(val string) (float64, error) {
+		parsed, parseErr := strconv.ParseFloat(val, 64)
+		if parseErr == nil {
+			return parsed, nil
+		}
+		return 0, parseErr
+	}
+	newWindowData := func() base.WindowData[network.BufferEvent, float64] {
+		return &processNetworkResponseErrorStatics{}
+	}
+
+	n.HTTPBasedChecker = common.NewHTTPBasedChecker[*processNetworkResponseErrorStatics](
+		base.CheckTypeHTTPErrorRate, parseThreshold, newWindowData, v3.ContinuousProfilingCauseType_HTTPErrorRate)
+	return nil
+}
+
+// processNetworkResponseErrorStatics counts the accepted events and how many
+// of them are response errors.
+type processNetworkResponseErrorStatics struct {
+ // totalCount is the number of events passed to Accept since the last Reset.
+ totalCount int
+ // errorCount is the number of those events for which IsResponseError returned true.
+ errorCount int
+}
+
+// Reset clears both counters.
+func (s *processNetworkResponseErrorStatics) Reset() {
+	s.totalCount = 0
+	s.errorCount = 0
+}
+
+// Accept counts one event, and additionally counts it as an error when the
+// event reports a response error.
+func (s *processNetworkResponseErrorStatics) Accept(data network.BufferEvent) {
+	if data.IsResponseError() {
+		s.errorCount++
+	}
+	s.totalCount++
+}
+
+// Get returns the share of error events as a percentage (0-100), or 0 when
+// no event has been accepted.
+func (s *processNetworkResponseErrorStatics) Get() float64 {
+	if s.totalCount != 0 {
+		return float64(s.errorCount) / float64(s.totalCount) * 100
+	}
+	return 0
+}
diff --git a/pkg/profiling/continuous/checker/network_response_time.go b/pkg/profiling/continuous/checker/network_response_time.go
new file mode 100644
index 0000000..7dd1754
--- /dev/null
+++ b/pkg/profiling/continuous/checker/network_response_time.go
@@ -0,0 +1,64 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package checker
+
+import (
+ "strconv"
+
+ "github.com/apache/skywalking-rover/pkg/profiling/continuous/base"
+ "github.com/apache/skywalking-rover/pkg/profiling/continuous/checker/bpf/network"
+ "github.com/apache/skywalking-rover/pkg/profiling/continuous/checker/common"
+
+ v3 "skywalking.apache.org/repo/goapi/collect/ebpf/profiling/v3"
+)
+
+// NetworkHTTPAvgResponseTimeChecker is the HTTP average response time
+// checker. Its behavior comes from the embedded HTTPBasedChecker, which is
+// created in Init.
+type NetworkHTTPAvgResponseTimeChecker struct {
+ *common.HTTPBasedChecker[*processNetworkAvgResponseTimeStatics]
+}
+
+// NewNetworkAvgResponseTimeChecker returns an empty checker; the embedded
+// HTTPBasedChecker stays nil until Init is called.
+func NewNetworkAvgResponseTimeChecker() *NetworkHTTPAvgResponseTimeChecker {
+	return new(NetworkHTTPAvgResponseTimeChecker)
+}
+
+// Init creates the embedded HTTPBasedChecker for the HTTP average response
+// time check type. Thresholds are parsed as 64-bit floats and every window
+// slot is backed by a fresh processNetworkAvgResponseTimeStatics. It always
+// returns nil.
+func (n *NetworkHTTPAvgResponseTimeChecker) Init(config *base.ContinuousConfig) error {
+	parseThreshold := func(val string) (float64, error) {
+		return strconv.ParseFloat(val, 64)
+	}
+	newWindowData := func() base.WindowData[network.BufferEvent, float64] {
+		return &processNetworkAvgResponseTimeStatics{}
+	}
+
+	n.HTTPBasedChecker = common.NewHTTPBasedChecker[*processNetworkAvgResponseTimeStatics](
+		base.CheckTypeHTTPAvgResponseTime, parseThreshold, newWindowData, v3.ContinuousProfilingCauseType_HTTPAvgResponseTime)
+	return nil
+}
+
+// processNetworkAvgResponseTimeStatics accumulates the number of accepted
+// events and the sum of their durations.
+type processNetworkAvgResponseTimeStatics struct {
+ // totalCount is the number of events passed to Accept since the last Reset.
+ totalCount int
+ // totalDuration is the sum of the event durations, in whole milliseconds.
+ totalDuration int
+}
+
+// Reset clears the event count and the accumulated duration.
+func (s *processNetworkAvgResponseTimeStatics) Reset() {
+	s.totalCount = 0
+	s.totalDuration = 0
+}
+
+// Accept counts one event and adds its duration, truncated to whole
+// milliseconds, to the accumulated total.
+func (s *processNetworkAvgResponseTimeStatics) Accept(data network.BufferEvent) {
+	elapsedMillis := data.Duration().Milliseconds()
+	s.totalDuration += int(elapsedMillis)
+	s.totalCount++
+}
+
+// Get returns the average duration, in milliseconds, of the events accepted
+// since the last Reset.
+//
+// When no event has been accepted it returns 0 instead of dividing 0 by 0,
+// which would yield NaN in floating-point arithmetic. This matches the
+// behavior of processNetworkResponseErrorStatics.Get.
+func (s *processNetworkAvgResponseTimeStatics) Get() float64 {
+	if s.totalCount == 0 {
+		return 0
+	}
+	return float64(s.totalDuration) / float64(s.totalCount)
+}
diff --git a/pkg/profiling/continuous/checker/process_cpu.go b/pkg/profiling/continuous/checker/process_cpu.go
new file mode 100644
index 0000000..f0f226e
--- /dev/null
+++ b/pkg/profiling/continuous/checker/process_cpu.go
@@ -0,0 +1,53 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package checker
+
+import (
+ "strconv"
+
+ "github.com/apache/skywalking-rover/pkg/process/api"
+ "github.com/apache/skywalking-rover/pkg/profiling/continuous/base"
+ "github.com/apache/skywalking-rover/pkg/profiling/continuous/checker/common"
+
+ v3 "skywalking.apache.org/repo/goapi/collect/ebpf/profiling/v3"
+)
+
+// ProcessCPUChecker is the process CPU checker. Its behavior comes from the
+// embedded ProcessBasedChecker, which is created in Init.
+type ProcessCPUChecker struct {
+ *common.ProcessBasedChecker[float64]
+}
+
+// NewProcessCPUChecker returns an empty checker; the embedded
+// ProcessBasedChecker stays nil until Init is called.
+func NewProcessCPUChecker() *ProcessCPUChecker {
+	return new(ProcessCPUChecker)
+}
+
+// Init creates the embedded ProcessBasedChecker for the process CPU check
+// type. Thresholds are parsed as base-10 32-bit integers and converted to
+// float64; the sampled value is the process CPUPercent result multiplied by
+// 100. It always returns nil.
+func (r *ProcessCPUChecker) Init(config *base.ContinuousConfig) error {
+	parseThreshold := func(val string) (float64, error) {
+		parsed, parseErr := strconv.ParseInt(val, 10, 32)
+		if parseErr != nil {
+			return 0, parseErr
+		}
+		return float64(parsed), nil
+	}
+	readCPU := func(p api.ProcessInterface) (float64, error) {
+		usage, cpuErr := p.OriginalProcess().CPUPercent()
+		if cpuErr != nil {
+			return 0, cpuErr
+		}
+		// NOTE(review): confirm the scale returned by CPUPercent matches the threshold unit
+		return usage * 100, nil
+	}
+
+	r.ProcessBasedChecker = common.NewProcessBasedChecker(
+		base.CheckTypeProcessCPU, parseThreshold, readCPU, v3.ContinuousProfilingCauseType_ProcessCPU)
+	return nil
+}
diff --git a/pkg/profiling/continuous/checker/process_thread.go b/pkg/profiling/continuous/checker/process_thread.go
new file mode 100644
index 0000000..566abe2
--- /dev/null
+++ b/pkg/profiling/continuous/checker/process_thread.go
@@ -0,0 +1,47 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package checker
+
+import (
+ "strconv"
+
+ "github.com/apache/skywalking-rover/pkg/process/api"
+ "github.com/apache/skywalking-rover/pkg/profiling/continuous/base"
+ "github.com/apache/skywalking-rover/pkg/profiling/continuous/checker/common"
+
+ v3 "skywalking.apache.org/repo/goapi/collect/ebpf/profiling/v3"
+)
+
+// ProcessThreadCountChecker is the process thread count checker. Its
+// behavior comes from the embedded ProcessBasedChecker, which is created in Init.
+type ProcessThreadCountChecker struct {
+ *common.ProcessBasedChecker[int32]
+}
+
+// NewProcessThreadCountChecker returns an empty checker; the embedded
+// ProcessBasedChecker stays nil until Init is called.
+func NewProcessThreadCountChecker() *ProcessThreadCountChecker {
+	return new(ProcessThreadCountChecker)
+}
+
+// Init creates the embedded ProcessBasedChecker for the process thread count
+// check type. Thresholds are parsed as base-10 32-bit integers; the sampled
+// value is the thread count reported by the original process. It always
+// returns nil.
+func (t *ProcessThreadCountChecker) Init(config *base.ContinuousConfig) error {
+	parseThreshold := func(val string) (int32, error) {
+		parsed, parseErr := strconv.ParseInt(val, 10, 32)
+		return int32(parsed), parseErr
+	}
+	readThreads := func(p api.ProcessInterface) (int32, error) {
+		return p.OriginalProcess().NumThreads()
+	}
+
+	t.ProcessBasedChecker = common.NewProcessBasedChecker(
+		base.CheckTypeProcessThreadCount, parseThreshold, readThreads, v3.ContinuousProfilingCauseType_ProcessThreadCount)
+	return nil
+}
diff --git a/pkg/profiling/continuous/checker/system_load.go b/pkg/profiling/continuous/checker/system_load.go
new file mode 100644
index 0000000..f77b949
--- /dev/null
+++ b/pkg/profiling/continuous/checker/system_load.go
@@ -0,0 +1,51 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package checker
+
+import (
+ "strconv"
+
+ "github.com/shirou/gopsutil/load"
+
+ v3 "skywalking.apache.org/repo/goapi/collect/ebpf/profiling/v3"
+
+ "github.com/apache/skywalking-rover/pkg/profiling/continuous/base"
+ "github.com/apache/skywalking-rover/pkg/profiling/continuous/checker/common"
+)
+
+// SystemLoadChecker is the system load checker. Its behavior comes from the
+// embedded SystemBasedChecker, which is created in Init.
+type SystemLoadChecker struct {
+ *common.SystemBasedChecker[float64]
+}
+
+// NewSystemLoadChecker returns an empty checker; the embedded
+// SystemBasedChecker stays nil until Init is called.
+func NewSystemLoadChecker() *SystemLoadChecker {
+	return new(SystemLoadChecker)
+}
+
+// Init creates the embedded SystemBasedChecker for the system load check
+// type. Thresholds are parsed as 64-bit floats; the sampled value is the
+// Load1 field of the load average. It always returns nil.
+func (s *SystemLoadChecker) Init(config *base.ContinuousConfig) error {
+	parseThreshold := func(val string) (float64, error) {
+		return strconv.ParseFloat(val, 64)
+	}
+	readLoad := func() (float64, error) {
+		stat, loadErr := load.Avg()
+		if loadErr != nil {
+			return 0, loadErr
+		}
+		return stat.Load1, nil
+	}
+
+	s.SystemBasedChecker = common.NewSystemBasedChecker[float64](
+		base.CheckTypeSystemLoad, parseThreshold, readLoad, v3.ContinuousProfilingCauseType_SystemLoad)
+	return nil
+}
diff --git a/pkg/profiling/continuous/checkers.go b/pkg/profiling/continuous/checkers.go
new file mode 100644
index 0000000..1e4cc05
--- /dev/null
+++ b/pkg/profiling/continuous/checkers.go
@@ -0,0 +1,369 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package continuous
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "time"
+
+ "github.com/apache/skywalking-rover/pkg/core"
+ "github.com/apache/skywalking-rover/pkg/module"
+ "github.com/apache/skywalking-rover/pkg/process"
+ "github.com/apache/skywalking-rover/pkg/process/api"
+ "github.com/apache/skywalking-rover/pkg/profiling/continuous/base"
+ "github.com/apache/skywalking-rover/pkg/profiling/continuous/checker"
+
+ profilingv3 "skywalking.apache.org/repo/goapi/collect/ebpf/profiling/v3"
+ meterv3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
+
+ "github.com/hashicorp/go-multierror"
+)
+
+// checkerRegistration holds every checker known to this package; it is filled by init.
+var checkerRegistration = make([]base.Checker, 0)
+
+// init registers the built-in system, process and network checkers.
+func init() {
+	builtin := []base.Checker{
+		// system
+		checker.NewSystemLoadChecker(),
+		// process
+		checker.NewProcessCPUChecker(),
+		checker.NewProcessThreadCountChecker(),
+		// network
+		checker.NewNetworkResponseErrorChecker(),
+		checker.NewNetworkAvgResponseTimeChecker(),
+	}
+	checkerRegistration = append(checkerRegistration, builtin...)
+}
+
+// Checkers drives all registered checkers: it keeps the per-service policy
+// cache in sync with the backend, fetches data and evaluates thresholds on
+// timers, and hands matched causes over to the triggers.
+type Checkers struct {
+ // meterPrefix is passed to the metrics appender created on every check round.
+ meterPrefix string
+ // fetchDuration is the interval between two data fetches.
+ fetchDuration time.Duration
+ // checkDuration is the interval between two threshold checks.
+ checkDuration time.Duration
+ // processOperator is used to list the registered processes.
+ processOperator process.Operator
+ // triggers receives the causes found by a check round.
+ triggers *Triggers
+ // policiesCache stores the latest known policy of each service, keyed by service name.
+ policiesCache map[string]*base.ServicePolicy
+
+ // meterClient is used to flush the checker metrics.
+ meterClient meterv3.MeterReportServiceClient
+ // continuousClient is used to query the policies.
+ continuousClient profilingv3.ContinuousProfilingServiceClient
+ // ctx is used for the remote calls and ends the background loop when it is done.
+ ctx context.Context
+}
+
+func NewCheckers(ctx context.Context, moduleMgr *module.Manager, conf *base.ContinuousConfig, triggers *Triggers) (*Checkers, error) {
+ connection := moduleMgr.FindModule(core.ModuleName).(core.Operator).BackendOperator().GetConnection()
+ meterClient := meterv3.NewMeterReportServiceClient(connection)
+ continuousClient := profilingv3.NewContinuousProfilingServiceClient(connection)
+
+ if conf.MeterPrefix == "" {
+ return nil, fmt.Errorf("the continuous profiling meter prefix cannot be empty")
+ }
+
+ fetchDuration, err := time.ParseDuration(conf.FetchInterval)
+ if err != nil {
+ return nil, fmt.Errorf("fetch duration error: %v", err)
+ }
+ checkDuration, err := time.ParseDuration(conf.CheckInterval)
+ if err != nil {
+ return nil, fmt.Errorf("check duration error: %v", err)
+ }
+
+ for _, checker := range checkerRegistration {
+ if e := checker.Init(conf); e != nil {
+ err = multierror.Append(err, e)
+ }
+ }
+ if err != nil {
+ return nil, err
+ }
+
+ return &Checkers{
+ meterClient: meterClient,
+ continuousClient: continuousClient,
+ meterPrefix: conf.MeterPrefix,
+ fetchDuration: fetchDuration,
+ checkDuration: checkDuration,
+ processOperator: moduleMgr.FindModule(process.ModuleName).(process.Operator),
+ triggers: triggers,
+ policiesCache: make(map[string]*base.ServicePolicy),
+ ctx: ctx,
+ }, nil
+}
+
+// Start launches a background goroutine that drives every registered
+// checker: data is fetched on each fetch tick and thresholds are evaluated on
+// each check tick. The goroutine exits once the checker context is done.
+func (c *Checkers) Start() {
+	go func() {
+		// release both tickers whenever the loop exits, not only the check ticker
+		fetchTicker := time.NewTicker(c.fetchDuration)
+		defer fetchTicker.Stop()
+		checkTicker := time.NewTicker(c.checkDuration)
+		defer checkTicker.Stop()
+
+		for {
+			select {
+			case <-fetchTicker.C:
+				// a fetch failure is logged only, the next tick retries
+				if err := c.fetchAllData(); err != nil {
+					log.Errorf("fetch all data error: %v", err)
+				}
+			case <-checkTicker.C:
+				c.checkAllThresholds()
+			case <-c.ctx.Done():
+				return
+			}
+		}
+	}()
+}
+
+// Stop closes every registered checker and returns the combined close
+// errors, or nil when all of them closed cleanly.
+func (c *Checkers) Stop() error {
+	var combined error
+	for _, registered := range checkerRegistration {
+		closeErr := registered.Close()
+		if closeErr == nil {
+			continue
+		}
+		combined = multierror.Append(combined, closeErr)
+	}
+	return combined
+}
+
+// CheckProfilingPolicies refreshes the policy cache and, only when something
+// changed, pushes the flattened (policy, processes) pairs of all cached
+// services to every registered checker.
+func (c *Checkers) CheckProfilingPolicies() error {
+	// fetch and update the policies
+	changed, err := c.updatePolicyCache()
+	if err != nil {
+		return err
+	}
+	if !changed {
+		return nil
+	}
+
+	// flatten the cache: one entry per policy, sharing the processes of its service
+	flattened := make([]*base.SyncPolicyWithProcesses, 0)
+	for _, cached := range c.policiesCache {
+		for _, policy := range cached.Policies {
+			entry := &base.SyncPolicyWithProcesses{
+				Policy:    policy,
+				Processes: cached.Processes,
+			}
+			flattened = append(flattened, entry)
+		}
+	}
+
+	// synchronized to all checkers
+	for _, registered := range checkerRegistration {
+		registered.SyncPolicies(flattened)
+	}
+	return nil
+}
+
+// fetchAllData asks every registered checker to fetch its data. A failing
+// checker does not stop the others; all errors are combined and returned.
+func (c *Checkers) fetchAllData() error {
+	var combined error
+	for _, registered := range checkerRegistration {
+		fetchErr := registered.Fetch()
+		if fetchErr == nil {
+			continue
+		}
+		combined = multierror.Append(combined, fetchErr)
+	}
+	return combined
+}
+
+// checkAllThresholds runs one check round: it collects the causes of all
+// checkers, flushes the metrics gathered during the round (a flush failure is
+// only logged), and forwards the causes to the triggers when there are any.
+func (c *Checkers) checkAllThresholds() {
+	appender := base.NewMetricsAppender(c.meterPrefix)
+	matched := c.findAllMatchCauses(appender)
+
+	if flushErr := appender.Flush(c.ctx, c.meterClient); flushErr != nil {
+		log.Warnf("flush the checker metrics failure: %v", flushErr)
+	}
+
+	if len(matched) > 0 {
+		c.triggers.handleCauses(matched)
+	}
+}
+
+// findAllMatchCauses runs Check on every registered checker and returns the
+// concatenation of all causes; the result is empty (not nil) when nothing matched.
+func (c *Checkers) findAllMatchCauses(appender *base.MetricsAppender) []base.ThresholdCause {
+	matched := make([]base.ThresholdCause, 0)
+	for _, registered := range checkerRegistration {
+		if found := registered.Check(c, appender); len(found) > 0 {
+			matched = append(matched, found...)
+		}
+	}
+	return matched
+}
+
+// ShouldCheck reports whether the process may currently be checked for the
+// given policy item, by asking the trigger registered for the item's target
+// profiling type.
+//
+// It returns false when no trigger is registered for that profiling type.
+// The type comes from the policy delivered by the backend, so an unknown
+// value must not lead to a method call on a nil trigger.
+func (c *Checkers) ShouldCheck(p api.ProcessInterface, item *base.PolicyItem) bool {
+	trigger := triggerRegistration[item.Policy.TargetProfilingType]
+	if trigger == nil {
+		return false
+	}
+	return trigger.ShouldTrigger(p)
+}
+
+// updatePolicyCache synchronizes policiesCache with the registered processes
+// and the policies returned by the backend query.
+//
+// It returns true when the cache content changed (a policy was added or
+// replaced, the process set of a service changed, or the cache was cleared)
+// and an error when the policy query fails.
+func (c *Checkers) updatePolicyCache() (bool, error) {
+ processes := c.processOperator.FindAllRegisteredProcesses()
+ if len(processes) == 0 {
+ // no process is registered anymore: drop all cached policies, if any
+ if (len(c.policiesCache)) > 0 {
+ c.policiesCache = make(map[string]*base.ServicePolicy)
+ return true, nil
+ }
+ return false, nil
+ }
+
+ // key: service name, value: process ID -> process
+ serviceProcesses := make(map[string]map[string]api.ProcessInterface)
+
+ // get all existing service and policy UUID mapping
+ // (the UUID is empty for services that have no cached policy yet)
+ servicePolicyUUIDCache := make(map[string]string, 0)
+ for _, p := range processes {
+ serviceName := p.Entity().ServiceName
+ cachedPolicy := c.policiesCache[serviceName]
+ if cachedPolicy != nil {
+ servicePolicyUUIDCache[serviceName] = cachedPolicy.UUID
+ } else {
+ servicePolicyUUIDCache[serviceName] = ""
+ }
+
+ // build the service process
+ serviceProcessesMap := serviceProcesses[serviceName]
+ if serviceProcessesMap == nil {
+ serviceProcessesMap = make(map[string]api.ProcessInterface)
+ serviceProcesses[serviceName] = serviceProcessesMap
+ }
+ serviceProcessesMap[p.ID()] = p
+ }
+
+ policiesUpdates, err := c.queryPolicyUpdates(servicePolicyUUIDCache)
+ if err != nil {
+ return false, err
+ }
+ hasUpdate := false
+ // only the services present in the query result are examined below
+ for serviceName, policy := range policiesUpdates {
+ existingPolicy := c.policiesCache[serviceName]
+ // update cache if the service policy not exist or UUID are not same
+ if existingPolicy == nil || existingPolicy.UUID != policy.UUID {
+ existingPolicy = policy
+ c.policiesCache[serviceName] = policy
+ hasUpdate = true
+ }
+ // update the processes if they are not same
+ if !c.checkProcessesAreSame(existingPolicy.Processes, serviceProcesses[serviceName]) {
+ hasUpdate = true
+ existingPolicy.Processes = serviceProcesses[serviceName]
+ }
+ }
+ return hasUpdate, nil
+}
+
+// checkProcessesAreSame reports whether both maps hold the same process IDs
+// and every process ID maps to the same pid on both sides.
+func (c *Checkers) checkProcessesAreSame(from, target map[string]api.ProcessInterface) bool {
+	if len(from) != len(target) {
+		return false
+	}
+
+	for id, wanted := range target {
+		existing := from[id]
+		// a missing (or nil) entry, or a different pid, means the sets differ
+		if existing == nil || existing.Pid() != wanted.Pid() {
+			return false
+		}
+	}
+	return true
+}
+
+// queryPolicyUpdates sends the service name -> cached policy UUID mapping to
+// the backend and converts the response into service policies keyed by
+// service name.
+//
+// It returns an error when the remote call fails, when the response is not
+// exactly one "ContinuousProfilingPolicyQuery" command carrying a single
+// "ServiceWithPolicyJSON" argument, or when that argument is not valid JSON.
+// Policy items that fail validation are logged and skipped.
+func (c *Checkers) queryPolicyUpdates(servicePolicies map[string]string) (map[string]*base.ServicePolicy, error) {
+ queries := make([]*profilingv3.ContinuousProfilingServicePolicyQuery, 0)
+ for k, v := range servicePolicies {
+ queries = append(queries, &profilingv3.ContinuousProfilingServicePolicyQuery{
+ ServiceName: k,
+ Uuid: v,
+ })
+ }
+ policyUpdateCommands, err := c.continuousClient.QueryPolicies(c.ctx, &profilingv3.ContinuousProfilingPolicyQuery{Policies: queries})
+ if err != nil {
+ return nil, err
+ }
+
+ // the response must contain exactly one command with exactly one, well-known argument
+ if len(policyUpdateCommands.GetCommands()) != 1 ||
+ policyUpdateCommands.GetCommands()[0].GetCommand() != "ContinuousProfilingPolicyQuery" ||
+ len(policyUpdateCommands.GetCommands()[0].GetArgs()) != 1 ||
+ policyUpdateCommands.GetCommands()[0].GetArgs()[0].GetKey() != "ServiceWithPolicyJSON" {
+ return nil, fmt.Errorf("the query policy response not adapt")
+ }
+
+ // the argument value is a JSON array of QueryPolicyUpdate
+ policyJSON := policyUpdateCommands.GetCommands()[0].GetArgs()[0].GetValue()
+ updates := make([]*QueryPolicyUpdate, 0)
+ err = json.Unmarshal([]byte(policyJSON), &updates)
+ if err != nil {
+ return nil, fmt.Errorf("error to unmarshal the policy updates: %v", err)
+ }
+
+ result := make(map[string]*base.ServicePolicy)
+ for _, update := range updates {
+ servicePolicy := &base.ServicePolicy{
+ Service: update.ServiceName,
+ UUID: update.UUID,
+ }
+ // one policy per target profiling type, linked back to its service policy
+ for profilingType, checks := range update.Profiling {
+ policy := &base.Policy{
+ TargetProfilingType: profilingType,
+ Items: make(map[base.CheckType]*base.PolicyItem),
+ ServicePolicy: servicePolicy,
+ }
+
+ for checkType, item := range checks {
+ // invalid items are dropped, the remaining items of the policy are kept
+ if err := item.Validate(); err != nil {
+ log.Warnf("cannot add the policy item, service name: %s, profiling type: %s, policy type: %s, error: %v",
+ update.ServiceName, profilingType, checkType, err)
+ continue
+ }
+ // every item keeps a reference to the policy it belongs to
+ policy.Items[checkType] = &base.PolicyItem{
+ Threshold: item.Threshold,
+ Period: item.Period,
+ Count: item.Count,
+ URIList: item.URIList,
+ URIRegex: item.URIRegex,
+ Policy: policy,
+ }
+ }
+
+ servicePolicy.Policies = append(servicePolicy.Policies, policy)
+ }
+
+ result[update.ServiceName] = servicePolicy
+ }
+ return result, nil
+}
+
+// QueryPolicyUpdate is the JSON form of the policy of one service as
+// delivered by the policy query.
+type QueryPolicyUpdate struct {
+ // ServiceName is the name of the service the policy belongs to.
+ ServiceName string `json:"ServiceName"`
+ // UUID identifies the policy version; it is compared with the cached UUID to detect changes.
+ UUID string `json:"UUID"`
+ // Profiling holds the check items, grouped by target profiling type and then by check type.
+ Profiling map[base.TargetProfilingType]map[base.CheckType]*QueryPolicyUpdateItem
+}
+
+// QueryPolicyUpdateItem is the JSON form of a single check item of a policy.
+type QueryPolicyUpdateItem struct {
+ // Threshold is the raw threshold text; it must not be empty.
+ Threshold string `json:"Threshold"`
+ // Period must be greater than zero.
+ Period int `json:"Period"`
+ // Count must be greater than zero and must not exceed Period.
+ Count int `json:"Count"`
+ // URIList and URIRegex are copied unchanged into the policy item.
+ URIList []string `json:"URIList"`
+ URIRegex string `json:"URIRegex"`
+}
+
+// Validate checks that the item can be used as a policy item.
+//
+// It returns an error when the item is nil (for example a JSON null entry),
+// when the threshold is empty, when the period or the count is not positive,
+// or when the count is bigger than the period. It returns nil otherwise.
+func (p *QueryPolicyUpdateItem) Validate() error {
+	// the items are pointers decoded from JSON, so a null entry reaches here as nil
+	if p == nil {
+		return fmt.Errorf("policy item cannot be empty")
+	}
+	if p.Threshold == "" {
+		return fmt.Errorf("threshold cannot be empty")
+	}
+	if p.Period <= 0 {
+		return fmt.Errorf("period cannot smaller or equals zero")
+	}
+	if p.Count <= 0 {
+		return fmt.Errorf("count cannot smaller or equals zero")
+	}
+	if p.Count > p.Period {
+		return fmt.Errorf("count cannot be bigger than period")
+	}
+	return nil
+}
diff --git a/pkg/profiling/continuous/manager.go b/pkg/profiling/continuous/manager.go
new file mode 100644
index 0000000..7ce6939
--- /dev/null
+++ b/pkg/profiling/continuous/manager.go
@@ -0,0 +1,70 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package continuous
+
+import (
+ "context"
+
+ "github.com/apache/skywalking-rover/pkg/logger"
+ "github.com/apache/skywalking-rover/pkg/module"
+ "github.com/apache/skywalking-rover/pkg/profiling/continuous/base"
+ "github.com/apache/skywalking-rover/pkg/profiling/task"
+)
+
+// log is the logger shared by the continuous profiling package.
+var log = logger.GetLogger("profiling", "continuous")
+
+// Manager owns the checkers and triggers of the continuous profiling and the
+// context they run with.
+type Manager struct {
+ checkers *Checkers
+ triggers *Triggers
+
+ // ctx is derived from the context given to NewManager; cancel ends it on Shutdown.
+ ctx context.Context
+ cancel context.CancelFunc
+}
+
+// NewManager creates the triggers and the checkers of the continuous
+// profiling, both bound to a cancellable context derived from ctx.
+//
+// When either of them cannot be created, the derived context is cancelled
+// before the error is returned, so that nothing started with it keeps running.
+func NewManager(ctx context.Context, taskManager *task.Manager, moduleMgr *module.Manager, config *base.ContinuousConfig) (*Manager, error) {
+	m := &Manager{}
+	m.ctx, m.cancel = context.WithCancel(ctx)
+
+	// init all checkers and triggerRegistration
+	triggers, err := NewTriggers(m.ctx, moduleMgr, taskManager, config)
+	if err != nil {
+		m.cancel()
+		return nil, err
+	}
+	m.triggers = triggers
+
+	checkers, err := NewCheckers(m.ctx, moduleMgr, config, m.triggers)
+	if err != nil {
+		m.cancel()
+		return nil, err
+	}
+	m.checkers = checkers
+
+	return m, nil
+}
+
+// Start starts the checkers.
+func (m *Manager) Start() {
+ m.checkers.Start()
+}
+
+// CheckPolicies delegates to the checkers to refresh the profiling policies
+// and returns their error, if any.
+func (m *Manager) CheckPolicies() error {
+ return m.checkers.CheckProfilingPolicies()
+}
+
+// Shutdown stops the checkers first and then cancels the manager context;
+// it returns the error reported by stopping the checkers.
+func (m *Manager) Shutdown() error {
+ err := m.checkers.Stop()
+ m.cancel()
+ return err
+}
diff --git a/pkg/profiling/continuous/trigger/common.go b/pkg/profiling/continuous/trigger/common.go
new file mode 100644
index 0000000..f88cfed
--- /dev/null
+++ b/pkg/profiling/continuous/trigger/common.go
@@ -0,0 +1,161 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package trigger
+
+import (
+ "fmt"
+ "time"
+
+ "github.com/apache/skywalking-rover/pkg/logger"
+ "github.com/apache/skywalking-rover/pkg/process/api"
+ "github.com/apache/skywalking-rover/pkg/profiling/continuous/base"
+ "github.com/apache/skywalking-rover/pkg/profiling/task"
+ taskBase "github.com/apache/skywalking-rover/pkg/profiling/task/base"
+
+ v3 "skywalking.apache.org/repo/goapi/collect/ebpf/profiling/v3"
+)
+
+// log is the logger shared by the continuous profiling triggers.
+var log = logger.GetLogger("profiling", "continuous", "trigger")
+
+// BaseTrigger contains the logic shared by the triggers: grouping causes by
+// a dimension, reporting one profiling task per dimension, and suppressing
+// new tasks while one is running or the silence time has not elapsed.
+type BaseTrigger struct {
+ // profilingCache stores the context of the last reported task, keyed by dimension.
+ profilingCache map[string]*task.Context
+ // executeTime is used as the maximum running duration of a task and as the reported duration.
+ executeTime time.Duration
+ // silenceTime is the time that must pass since the running time of the last task before a new one may start.
+ silenceTime time.Duration
+
+ // profilingTaskDimension maps a process to the dimension key its tasks are grouped by.
+ profilingTaskDimension func(p api.ProcessInterface) string
+ // mainProcessSelector picks the main process when a dimension contains more than one process.
+ mainProcessSelector func(ps []api.ProcessInterface) api.ProcessInterface
+ // taskSetter customizes the profiling task before it is reported.
+ taskSetter func(task *taskBase.ProfilingTask, processes []api.ProcessInterface, thresholds []base.ThresholdCause)
+ // reportSetter customizes the profiling report before it is reported.
+ reportSetter func(report *v3.ContinuousProfilingReport, processes []api.ProcessInterface, thresholds []base.ThresholdCause)
+}
+
+// NewSingleProcessBaseTrigger creates a trigger whose dimension is the
+// process ID, so each process gets its own task; the main process is the
+// first process of the group.
+func NewSingleProcessBaseTrigger(conf *base.ContinuousConfig,
+	taskSetter func(task *taskBase.ProfilingTask, processes []api.ProcessInterface, thresholds []base.ThresholdCause),
+	reportSetter func(report *v3.ContinuousProfilingReport, processes []api.ProcessInterface, thresholds []base.ThresholdCause)) *BaseTrigger {
+	byProcessID := func(p api.ProcessInterface) string {
+		return p.ID()
+	}
+	firstProcess := func(ps []api.ProcessInterface) api.ProcessInterface {
+		return ps[0]
+	}
+	return NewMultipleProcessBasedTrigger(conf, byProcessID, firstProcess, taskSetter, reportSetter)
+}
+
+// NewMultipleProcessBasedTrigger creates a trigger with a caller-provided
+// dimension function and main process selector, starting with an empty task
+// cache. The durations are not set here; they are assigned by Init.
+func NewMultipleProcessBasedTrigger(conf *base.ContinuousConfig,
+	profilingTaskDimension func(p api.ProcessInterface) string,
+	mainProcessSelector func(ps []api.ProcessInterface) api.ProcessInterface,
+	taskSetter func(task *taskBase.ProfilingTask, processes []api.ProcessInterface, thresholds []base.ThresholdCause),
+	reportSetter func(report *v3.ContinuousProfilingReport, processes []api.ProcessInterface, thresholds []base.ThresholdCause)) *BaseTrigger {
+	return &BaseTrigger{
+		profilingCache:         make(map[string]*task.Context),
+		profilingTaskDimension: profilingTaskDimension,
+		mainProcessSelector:    mainProcessSelector,
+		taskSetter:             taskSetter,
+		reportSetter:           reportSetter,
+	}
+}
+
+// Init parses the execute and silence durations of the trigger
+// configuration. Neither value is stored unless both parse successfully.
+func (c *BaseTrigger) Init(conf *base.ContinuousConfig) error {
+	execute, err := time.ParseDuration(conf.Trigger.ExecuteDuration)
+	if err != nil {
+		return fmt.Errorf("check trigger task execute duration error: %v", err)
+	}
+	silence, err := time.ParseDuration(conf.Trigger.SilenceDuration)
+	if err != nil {
+		return fmt.Errorf("check trigger task silence duration error: %v", err)
+	}
+
+	c.executeTime, c.silenceTime = execute, silence
+	return nil
+}
+
+// ShouldTrigger reports whether a new task may be started for the dimension
+// the process belongs to.
+func (c *BaseTrigger) ShouldTrigger(p api.ProcessInterface) bool {
+	dimension := c.profilingTaskDimension(p)
+	return c.shouldTriggerFromDimension(dimension)
+}
+
+// TriggerTasks groups the causes by dimension and reports one profiling task
+// per dimension through the reporter. Dimensions that must not be triggered
+// right now are skipped. It returns the number of tasks that were reported
+// successfully; a failed report is logged and does not stop the others.
+func (c *BaseTrigger) TriggerTasks(reporter base.TriggerReporter, causes []base.ThresholdCause) int {
+ executeCount := 0
+
+ // build needs profiling processes data cache
+ // key: dimension, value: map[process][]thresholds
+ dimensionedProcessThresholds := make(map[string]map[api.ProcessInterface][]base.ThresholdCause)
+ for _, cause := range causes {
+ causeProcess := cause.Process()
+ dimension := c.profilingTaskDimension(causeProcess)
+ if !c.shouldTriggerFromDimension(dimension) {
+ continue
+ }
+
+ processThresholds := dimensionedProcessThresholds[dimension]
+ if processThresholds == nil {
+ processThresholds = make(map[api.ProcessInterface][]base.ThresholdCause)
+ dimensionedProcessThresholds[dimension] = processThresholds
+ }
+
+ processThresholds[causeProcess] = append(processThresholds[causeProcess], cause)
+ }
+
+ // reports task through cache
+ for dimension, processWithThresholds := range dimensionedProcessThresholds {
+ processes := make([]api.ProcessInterface, 0)
+ var mainProcess api.ProcessInterface
+ for process := range processWithThresholds {
+ processes = append(processes, process)
+ }
+ // the selector is only consulted when there is a real choice to make
+ if len(processes) == 1 {
+ mainProcess = processes[0]
+ } else {
+ mainProcess = c.mainProcessSelector(processes)
+ }
+
+ // only the causes of the main process are handed to the reporter and the setters
+ thresholdCauses := processWithThresholds[mainProcess]
+ taskContext, err := reporter.ReportProcesses(mainProcess, processes, thresholdCauses,
+ func(task *taskBase.ProfilingTask) {
+ task.MaxRunningDuration = c.executeTime
+ c.taskSetter(task, processes, thresholdCauses)
+ }, func(report *v3.ContinuousProfilingReport) {
+ report.Duration = int32(c.executeTime.Seconds())
+ c.reportSetter(report, processes, thresholdCauses)
+ })
+ if err != nil {
+ log.Warnf("failure to report the cause, process id: %s, error: %v", mainProcess.ID(), err)
+ continue
+ }
+
+ // remember the task so that the dimension is not triggered again too early
+ c.profilingCache[dimension] = taskContext
+ executeCount++
+ }
+ return executeCount
+}
+
+// shouldTriggerFromDimension reports whether a new task may be started for
+// the dimension: always when no task is cached for it, never while the
+// cached task is running or has no running time, and otherwise only after
+// more than the silence time has passed since its running time.
+func (c *BaseTrigger) shouldTriggerFromDimension(dimension string) bool {
+	cached := c.profilingCache[dimension]
+	if cached == nil {
+		return true
+	}
+	if cached.IsRunning() {
+		return false
+	}
+	if cached.RunningTime().IsZero() {
+		return false
+	}
+	return time.Since(cached.RunningTime()) > c.silenceTime
+}
diff --git a/pkg/profiling/continuous/trigger/network.go b/pkg/profiling/continuous/trigger/network.go
new file mode 100644
index 0000000..dde5add
--- /dev/null
+++ b/pkg/profiling/continuous/trigger/network.go
@@ -0,0 +1,120 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package trigger
+
+import (
+ "fmt"
+
+ "github.com/apache/skywalking-rover/pkg/module"
+ "github.com/apache/skywalking-rover/pkg/process/api"
+ "github.com/apache/skywalking-rover/pkg/profiling/continuous/base"
+ "github.com/apache/skywalking-rover/pkg/profiling/continuous/checker/common"
+ profiling "github.com/apache/skywalking-rover/pkg/profiling/task/base"
+
+ v3 "skywalking.apache.org/repo/goapi/collect/ebpf/profiling/v3"
+)
+
+ // NetworkTrigger builds the network profiling tasks, it embeds the BaseTrigger.
+ type NetworkTrigger struct {
+ 	*BaseTrigger
+ }
+
+ // NewNetworkTrigger creates an empty NetworkTrigger, the embedded BaseTrigger is assigned in Init.
+ func NewNetworkTrigger() base.Trigger {
+ 	return new(NetworkTrigger)
+ }
+
+ // Init creates the multiple process based trigger used for the network profiling and initializes it with conf.
+ // The processes are grouped by the service and instance name, the main process of each group is selected
+ // by the process labels, and the sampling rules are generated from the threshold causes.
+ func (n *NetworkTrigger) Init(moduleMgr *module.Manager, conf *base.ContinuousConfig) error {
+ 	// the processes of the same instance share the same dimension
+ 	dimension := func(p api.ProcessInterface) string {
+ 		e := p.Entity()
+ 		return fmt.Sprintf("%s_%s", e.ServiceName, e.InstanceName)
+ 	}
+
+ 	mainSelector := func(ps []api.ProcessInterface) api.ProcessInterface {
+ 		var selected api.ProcessInterface
+ 		for _, candidate := range ps {
+ 			switch {
+ 			case processHasLabel(candidate, "mesh-application"):
+ 				// for service mesh, the application always wins
+ 				selected = candidate
+ 			case selected == nil && processHasLabel(candidate, "k8s-service"):
+ 				// otherwise, use the process belonging to a kubernetes service when nothing is selected yet
+ 				selected = candidate
+ 			}
+ 		}
+ 		if selected == nil {
+ 			// no labeled process, fall back to the first one
+ 			return ps[0]
+ 		}
+ 		return selected
+ 	}
+
+ 	taskSetter := func(task *profiling.ProfilingTask, processes []api.ProcessInterface, thresholds []base.ThresholdCause) {
+ 		task.TargetType = profiling.TargetTypeNetworkTopology
+ 		task.ExtensionConfig = &profiling.ExtensionConfig{
+ 			NetworkSamplings: transformCausesToNetworkSamplingRules(thresholds),
+ 		}
+ 	}
+
+ 	reportSetter := func(report *v3.ContinuousProfilingReport, processes []api.ProcessInterface, thresholds []base.ThresholdCause) {
+ 		// report the URI regex of each sampling rule
+ 		uriRegexes := make([]string, 0)
+ 		for _, rule := range transformCausesToNetworkSamplingRules(thresholds) {
+ 			uriRegexes = append(uriRegexes, *rule.URIRegex)
+ 		}
+ 		report.TargetTask = &v3.ContinuousProfilingReport_Network{
+ 			Network: &v3.ContinuousNetworkProfilingTask{
+ 				SamplingURIRegexes: uriRegexes,
+ 			},
+ 		}
+ 	}
+
+ 	n.BaseTrigger = NewMultipleProcessBasedTrigger(conf, dimension, mainSelector, taskSetter, reportSetter)
+ 	return n.BaseTrigger.Init(conf)
+ }
+
+ // processHasLabel reports whether the entity of the process contains the given label.
+ func processHasLabel(p api.ProcessInterface, label string) bool {
+ 	found := false
+ 	for _, candidate := range p.Entity().Labels {
+ 		if candidate == label {
+ 			found = true
+ 			break
+ 		}
+ 	}
+ 	return found
+ }
+
+ // transformCausesToNetworkSamplingRules generates one network sampling rule for each URI cause,
+ // the causes of other types are ignored. Each rule has a zero minimal duration, samples the 4xx and 5xx
+ // responses, and requires the complete request and response with a max size of -1.
+ // When a URI cause has an empty URI, only the rule of that cause is returned.
+ func transformCausesToNetworkSamplingRules(thresholds []base.ThresholdCause) []*profiling.NetworkSamplingRule {
+ 	// all the rules share the same zero minimal duration
+ 	var noMinDuration int32
+ 	rules := make([]*profiling.NetworkSamplingRule, 0)
+ 	for _, cause := range thresholds {
+ 		uriCause, isURICause := cause.(*common.URICause)
+ 		if !isURICause {
+ 			continue
+ 		}
+
+ 		// collecting all request and response
+ 		settings := &profiling.NetworkDataCollectingSettings{
+ 			RequireCompleteRequest:  true,
+ 			MaxRequestSize:          -1,
+ 			RequireCompleteResponse: true,
+ 			MaxResponseSize:         -1,
+ 		}
+ 		rule := &profiling.NetworkSamplingRule{
+ 			URIRegex:    &uriCause.URI,
+ 			MinDuration: &noMinDuration,
+ 			When4XX:     true,
+ 			When5XX:     true,
+ 			Settings:    settings,
+ 		}
+
+ 		if uriCause.URI != "" {
+ 			rules = append(rules, rule)
+ 			continue
+ 		}
+ 		// the empty URI rule replaces all the other rules
+ 		return []*profiling.NetworkSamplingRule{rule}
+ 	}
+ 	return rules
+ }
diff --git a/pkg/profiling/continuous/trigger/offcpu.go b/pkg/profiling/continuous/trigger/offcpu.go
new file mode 100644
index 0000000..9d16a08
--- /dev/null
+++ b/pkg/profiling/continuous/trigger/offcpu.go
@@ -0,0 +1,47 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package trigger
+
+import (
+ "github.com/apache/skywalking-rover/pkg/module"
+ "github.com/apache/skywalking-rover/pkg/process/api"
+ "github.com/apache/skywalking-rover/pkg/profiling/continuous/base"
+ profiling "github.com/apache/skywalking-rover/pkg/profiling/task/base"
+
+ v3 "skywalking.apache.org/repo/goapi/collect/ebpf/profiling/v3"
+)
+
+ // OffCPUTrigger builds the off CPU profiling tasks, it embeds the BaseTrigger.
+ type OffCPUTrigger struct {
+ 	*BaseTrigger
+ }
+
+ // NewOffCPUTrigger creates an empty OffCPUTrigger, the embedded BaseTrigger is assigned in Init.
+ func NewOffCPUTrigger() base.Trigger {
+ 	return new(OffCPUTrigger)
+ }
+
+ // Init creates the single process based trigger used for the off CPU profiling and initializes it with conf.
+ func (o *OffCPUTrigger) Init(moduleMgr *module.Manager, conf *base.ContinuousConfig) error {
+ 	// mark the generated task as an off CPU profiling task
+ 	taskSetter := func(task *profiling.ProfilingTask, processes []api.ProcessInterface, thresholds []base.ThresholdCause) {
+ 		task.TargetType = profiling.TargetTypeOffCPU
+ 	}
+ 	// mark the report as an off CPU profiling report
+ 	reportSetter := func(report *v3.ContinuousProfilingReport, processes []api.ProcessInterface, thresholds []base.ThresholdCause) {
+ 		report.TargetTask = &v3.ContinuousProfilingReport_OffCPU{
+ 			OffCPU: &v3.ContinuousOffCPUProfilingTask{},
+ 		}
+ 	}
+
+ 	o.BaseTrigger = NewSingleProcessBaseTrigger(conf, taskSetter, reportSetter)
+ 	return o.BaseTrigger.Init(conf)
+ }
diff --git a/pkg/profiling/continuous/trigger/oncpu.go b/pkg/profiling/continuous/trigger/oncpu.go
new file mode 100644
index 0000000..2929149
--- /dev/null
+++ b/pkg/profiling/continuous/trigger/oncpu.go
@@ -0,0 +1,47 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package trigger
+
+import (
+ "github.com/apache/skywalking-rover/pkg/module"
+ "github.com/apache/skywalking-rover/pkg/process/api"
+ "github.com/apache/skywalking-rover/pkg/profiling/continuous/base"
+ taskBase "github.com/apache/skywalking-rover/pkg/profiling/task/base"
+
+ v3 "skywalking.apache.org/repo/goapi/collect/ebpf/profiling/v3"
+)
+
+ // OnCPUTrigger builds the on CPU profiling tasks, it embeds the BaseTrigger.
+ type OnCPUTrigger struct {
+ 	*BaseTrigger
+ }
+
+ // NewOnCPUTrigger creates an empty OnCPUTrigger, the embedded BaseTrigger is assigned in Init.
+ func NewOnCPUTrigger() base.Trigger {
+ 	return new(OnCPUTrigger)
+ }
+
+ // Init creates the single process based trigger used for the on CPU profiling and initializes it with conf.
+ func (c *OnCPUTrigger) Init(moduleMgr *module.Manager, conf *base.ContinuousConfig) error {
+ 	// mark the generated task as an on CPU profiling task
+ 	taskSetter := func(task *taskBase.ProfilingTask, processes []api.ProcessInterface, thresholds []base.ThresholdCause) {
+ 		task.TargetType = taskBase.TargetTypeOnCPU
+ 	}
+ 	// mark the report as an on CPU profiling report
+ 	reportSetter := func(report *v3.ContinuousProfilingReport, processes []api.ProcessInterface, thresholds []base.ThresholdCause) {
+ 		report.TargetTask = &v3.ContinuousProfilingReport_OnCPU{
+ 			OnCPU: &v3.ContinuousOnCPUProfilingTask{},
+ 		}
+ 	}
+
+ 	c.BaseTrigger = NewSingleProcessBaseTrigger(conf, taskSetter, reportSetter)
+ 	return c.BaseTrigger.Init(conf)
+ }
diff --git a/pkg/profiling/continuous/triggers.go b/pkg/profiling/continuous/triggers.go
new file mode 100644
index 0000000..e2df2e2
--- /dev/null
+++ b/pkg/profiling/continuous/triggers.go
@@ -0,0 +1,128 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package continuous
+
+import (
+ "context"
+ "fmt"
+
+ "github.com/apache/skywalking-rover/pkg/core"
+ "github.com/apache/skywalking-rover/pkg/module"
+ "github.com/apache/skywalking-rover/pkg/process/api"
+ "github.com/apache/skywalking-rover/pkg/profiling/continuous/base"
+ "github.com/apache/skywalking-rover/pkg/profiling/continuous/trigger"
+ "github.com/apache/skywalking-rover/pkg/profiling/task"
+ taskBase "github.com/apache/skywalking-rover/pkg/profiling/task/base"
+
+ v3 "skywalking.apache.org/repo/goapi/collect/ebpf/profiling/v3"
+
+ "github.com/hashicorp/go-multierror"
+)
+
+ // triggerRegistration holds the trigger of each target profiling type.
+ var triggerRegistration map[base.TargetProfilingType]base.Trigger
+
+ // init registers the on CPU, off CPU and network triggers.
+ func init() {
+ 	triggerRegistration = map[base.TargetProfilingType]base.Trigger{
+ 		base.TargetProfilingTypeOnCPU:   trigger.NewOnCPUTrigger(),
+ 		base.TargetProfilingTypeOffCPU:  trigger.NewOffCPUTrigger(),
+ 		base.TargetProfilingTypeNetwork: trigger.NewNetworkTrigger(),
+ 	}
+ }
+
+ // Triggers dispatches the threshold causes to the registered triggers,
+ // and reports the generated profiling tasks through the continuous profiling client.
+ type Triggers struct {
+ 	taskManager      *task.Manager
+ 	continuousClient v3.ContinuousProfilingServiceClient
+
+ 	ctx context.Context
+ }
+
+ // NewTriggers creates the continuous profiling client from the backend connection of the core module
+ // and initializes every registered trigger. The initialization errors of all the triggers are
+ // aggregated and returned together, in that case no Triggers is created.
+ func NewTriggers(ctx context.Context, moduleMgr *module.Manager, taskManager *task.Manager, conf *base.ContinuousConfig) (*Triggers, error) {
+ 	operator := moduleMgr.FindModule(core.ModuleName).(core.Operator)
+ 	client := v3.NewContinuousProfilingServiceClient(operator.BackendOperator().GetConnection())
+
+ 	var initErrors error
+ 	for _, registered := range triggerRegistration {
+ 		initErr := registered.Init(moduleMgr, conf)
+ 		if initErr == nil {
+ 			continue
+ 		}
+ 		initErrors = multierror.Append(initErrors, initErr)
+ 	}
+ 	if initErrors != nil {
+ 		return nil, initErrors
+ 	}
+
+ 	triggers := &Triggers{
+ 		taskManager:      taskManager,
+ 		continuousClient: client,
+ 		ctx:              ctx,
+ 	}
+ 	return triggers, nil
+ }
+
+ // handleCauses groups the threshold causes by the target profiling type of their policy,
+ // and lets the trigger registered for each type generate the profiling tasks.
+ // The causes of a profiling type without a registered trigger are skipped with a warning.
+ func (m *Triggers) handleCauses(causes []base.ThresholdCause) {
+ 	// generate the profiling tasks from the triggerRegistration
+ 	profilingTypeWithCauses := make(map[base.TargetProfilingType][]base.ThresholdCause)
+ 	for _, cause := range causes {
+ 		profilingType := cause.FromPolicy().Policy.TargetProfilingType
+ 		profilingTypeWithCauses[profilingType] = append(profilingTypeWithCauses[profilingType], cause)
+ 	}
+ 	for profilingType, ps := range profilingTypeWithCauses {
+ 		// the lookup returns a nil interface for an unregistered type, calling a method on it would panic
+ 		registered := triggerRegistration[profilingType]
+ 		if registered == nil {
+ 			log.Warnf("could not find the trigger of the %s profiling, ignore %d causes", profilingType, len(ps))
+ 			continue
+ 		}
+ 		if taskCount := registered.TriggerTasks(m, ps); taskCount > 0 {
+ 			log.Infof("total generate %d %s tasks", taskCount, profilingType)
+ 		}
+ 	}
+ }
+
+ // ReportProcesses builds a profiling task for the profilingProcesses, reports it to the backend to
+ // obtain the task ID, and starts the task.
+ // process provides the entity (layer, service, instance and process name) written into the report,
+ // cases are transferred as the causes of the report, taskSetter customizes the profiling task and
+ // reportSetter customizes the report before it is sent.
+ // It returns the context of the started task, or the error from building the context (including
+ // the report failure and an unexpected report result).
+ func (m *Triggers) ReportProcesses(process api.ProcessInterface, profilingProcesses []api.ProcessInterface, cases []base.ThresholdCause,
+ 	taskSetter func(task *taskBase.ProfilingTask), reportSetter func(report *v3.ContinuousProfilingReport)) (*task.Context, error) {
+ 	transferCauses := make([]*v3.ContinuousProfilingCause, 0)
+ 	for _, c := range cases {
+ 		transferCauses = append(transferCauses, c.GenerateTransferCause())
+ 	}
+
+ 	// generate context
+ 	taskContext, err := m.taskManager.BuildContextFromContinuous(profilingProcesses, taskSetter, func() (string, error) {
+ 		report := &v3.ContinuousProfilingReport{
+ 			Layer:        process.Entity().Layer,
+ 			ServiceName:  process.Entity().ServiceName,
+ 			InstanceName: process.Entity().InstanceName,
+ 			ProcessName:  process.Entity().ProcessName,
+ 			Causes:       transferCauses,
+ 		}
+ 		reportSetter(report)
+ 		profilingTask, err := m.continuousClient.ReportProfilingTask(m.ctx, report)
+ 		if err != nil {
+ 			return "", err
+ 		}
+
+ 		// validate the command count before reading the first command,
+ 		// otherwise an empty result panics with an index out of range
+ 		commands := profilingTask.GetCommands()
+ 		if len(commands) != 1 || commands[0].GetCommand() != "ContinuousProfilingReportTask" {
+ 			return "", fmt.Errorf("the profiling task result is not right, command count: %d", len(commands))
+ 		}
+ 		for _, kv := range commands[0].GetArgs() {
+ 			if kv.GetKey() == "TaskId" {
+ 				return kv.GetValue(), nil
+ 			}
+ 		}
+ 		return "", fmt.Errorf("could not found the task ID from repoter")
+ 	})
+ 	if err != nil {
+ 		return nil, err
+ 	}
+
+ 	// execute task from context
+ 	m.taskManager.StartTask(taskContext)
+
+ 	return taskContext, nil
+ }
diff --git a/pkg/profiling/manager.go b/pkg/profiling/manager.go
index 1530a4a..d641811 100644
--- a/pkg/profiling/manager.go
+++ b/pkg/profiling/manager.go
@@ -22,32 +22,25 @@ import (
"fmt"
"time"
- "github.com/apache/skywalking-rover/pkg/core"
+ "github.com/apache/skywalking-rover/pkg/profiling/continuous"
+
"github.com/apache/skywalking-rover/pkg/module"
"github.com/apache/skywalking-rover/pkg/profiling/task"
-
- v3 "skywalking.apache.org/repo/goapi/collect/ebpf/profiling/v3"
)
// Manager the profiling task, receive them from the backend side
type Manager struct {
- profilingClient v3.EBPFProfilingServiceClient
- interval time.Duration
- taskManager *task.Manager
+ checkInterval time.Duration
+ flushInterval time.Duration
+ taskManager *task.Manager
+ continuousManager *continuous.Manager
ctx context.Context
cancel context.CancelFunc
-
- instanceID string
- lastUpdateTime int64
}
func NewManager(ctx context.Context, manager *module.Manager, conf *Config) (*Manager, error) {
- coreOperator := manager.FindModule(core.ModuleName).(core.Operator)
- connection := coreOperator.BackendOperator().GetConnection()
- profilingClient := v3.NewEBPFProfilingServiceClient(connection)
- instanceID := coreOperator.InstanceID()
- duration, err := time.ParseDuration(conf.CheckInterval)
+ checkDuration, err := time.ParseDuration(conf.CheckInterval)
if err != nil {
return nil, fmt.Errorf("parse profling check interval failure: %v", err)
}
@@ -57,98 +50,60 @@ func NewManager(ctx context.Context, manager *module.Manager, conf *Config) (*Ma
return nil, fmt.Errorf("parse profiling data flush interval failure: %v", err)
}
- taskManager, err := task.NewManager(ctx, manager, profilingClient, flushDuration, conf.TaskConfig)
+ taskManager, err := task.NewManager(ctx, manager, conf.TaskConfig)
+ if err != nil {
+ return nil, err
+ }
+
+ continuousManager, err := continuous.NewManager(ctx, taskManager, manager, conf.ContinuousConfig)
if err != nil {
return nil, err
}
ctx, cancel := context.WithCancel(ctx)
return &Manager{
- profilingClient: profilingClient,
- taskManager: taskManager,
- interval: duration,
- ctx: ctx,
- cancel: cancel,
- instanceID: instanceID,
- lastUpdateTime: -1,
+ checkInterval: checkDuration,
+ flushInterval: flushDuration,
+ taskManager: taskManager,
+ continuousManager: continuousManager,
+ ctx: ctx,
+ cancel: cancel,
}, nil
}
func (m *Manager) Start() {
m.taskManager.Start()
+ m.continuousManager.Start()
go func() {
- timeTicker := time.NewTicker(m.interval)
+ checkTicker := time.NewTicker(m.checkInterval)
+ flushTicker := time.NewTicker(m.flushInterval)
for {
select {
- case <-timeTicker.C:
- if err := m.startingWatchTask(); err != nil {
- log.Errorf("fetch profiling task failure: %v", err)
- }
+ case <-checkTicker.C:
+ m.logErrorIfContains(m.taskManager.StartingWatchTask(), "check profiling task")
+ m.logErrorIfContains(m.continuousManager.CheckPolicies(), "check profiling policies")
+ case <-flushTicker.C:
+ m.logErrorIfContains(m.taskManager.FlushProfilingData(), "flush profiling task")
case <-m.ctx.Done():
- timeTicker.Stop()
+ checkTicker.Stop()
return
}
}
}()
}
-func (m *Manager) startingWatchTask() error {
- // query task
- tasks, err := m.profilingClient.QueryTasks(m.ctx, &v3.EBPFProfilingTaskQuery{
- RoverInstanceId: m.instanceID,
- LatestUpdateTime: m.lastUpdateTime,
- })
+func (m *Manager) logErrorIfContains(err error, t string) {
if err != nil {
- return err
+ log.Warnf("%s failure: %v", t, err)
}
- if len(tasks.Commands) == 0 {
- return nil
- }
-
- // analyze profiling tasks
- taskContexts := make([]*task.Context, 0)
- lastUpdateTime := m.lastUpdateTime
- for _, cmd := range tasks.Commands {
- taskContext, err := m.taskManager.BuildContext(cmd)
- if err != nil {
- log.Warnf("could not execute task, ignored. %v", err)
- continue
- }
-
- if taskContext.UpdateTime() > lastUpdateTime {
- lastUpdateTime = taskContext.UpdateTime()
- }
-
- if !taskContext.CheckTaskRunnable() {
- continue
- }
-
- taskContexts = append(taskContexts, taskContext)
- }
-
- // update last task time
- m.lastUpdateTime = lastUpdateTime
-
- if len(taskContexts) == 0 {
- return nil
- }
-
- taskIDList := make([]string, len(taskContexts))
- for inx, c := range taskContexts {
- taskIDList[inx] = c.TaskID()
- }
- log.Infof("received %d profiling task: %v", len(taskContexts), taskIDList)
-
- // start tasks
- for _, t := range taskContexts {
- m.taskManager.StartTask(t)
- }
- return nil
}
func (m *Manager) Shutdown() error {
if err := m.taskManager.Shutdown(); err != nil {
log.Warnf("task manager shutdown failure: %v", err)
}
+ if err := m.continuousManager.Shutdown(); err != nil {
+ log.Warnf("continuous profiling manager shutdown failure: %v", err)
+ }
m.cancel()
return nil
}
diff --git a/pkg/profiling/task/base/task.go b/pkg/profiling/task/base/task.go
index dc12577..2748924 100644
--- a/pkg/profiling/task/base/task.go
+++ b/pkg/profiling/task/base/task.go
@@ -24,6 +24,8 @@ import (
"strings"
"time"
+ "github.com/apache/skywalking-rover/pkg/process/api"
+
v3 "skywalking.apache.org/repo/goapi/collect/common/v3"
)
@@ -86,6 +88,22 @@ func ProfilingTaskFromCommand(command *v3.Command) (*ProfilingTask, error) {
return task, nil
}
+ // ProfilingTaskFromContinuous creates a fixed time triggered profiling task for the processes.
+ // The task contains the ID of each process, its update and start time stay zero,
+ // and the taskSetter is invoked to fill the remaining fields before the task is returned.
+ func ProfilingTaskFromContinuous(processes []api.ProcessInterface, taskSetter func(task *ProfilingTask)) *ProfilingTask {
+ 	idList := make([]string, len(processes))
+ 	for inx, p := range processes {
+ 		idList[inx] = p.ID()
+ 	}
+
+ 	// UpdateTime and StartTime keep their zero values
+ 	result := &ProfilingTask{
+ 		ProcessIDList: idList,
+ 		TriggerType:   TriggerTypeFixedTime,
+ 	}
+ 	taskSetter(result)
+ 	return result
+ }
+
type ExtensionConfig struct {
NetworkSamplings []*NetworkSamplingRule `json:"NetworkSamplings"`
}
diff --git a/pkg/profiling/task/context.go b/pkg/profiling/task/context.go
index 9857041..1dd0908 100644
--- a/pkg/profiling/task/context.go
+++ b/pkg/profiling/task/context.go
@@ -93,3 +93,11 @@ func (c *Context) CheckTaskRunnable() bool {
}
return true
}
+
+ // IsRunning reports whether the status of the task is Running.
+ func (c *Context) IsRunning() bool {
+ return c.status == Running
+ }
+
+ // RunningTime returns the start running time recorded in the context.
+ func (c *Context) RunningTime() time.Time {
+ return c.startRunningTime
+ }
diff --git a/pkg/profiling/task/manager.go b/pkg/profiling/task/manager.go
index 23752de..751aa38 100644
--- a/pkg/profiling/task/manager.go
+++ b/pkg/profiling/task/manager.go
@@ -23,6 +23,8 @@ import (
"sync"
"time"
+ "github.com/apache/skywalking-rover/pkg/core"
+
"github.com/apache/skywalking-rover/pkg/logger"
"github.com/apache/skywalking-rover/pkg/module"
"github.com/apache/skywalking-rover/pkg/process"
@@ -39,16 +41,19 @@ type Manager struct {
moduleMgr *module.Manager
processOperator process.Operator
profilingClient profiling_v3.EBPFProfilingServiceClient
- flushInterval time.Duration
ctx context.Context
cancel context.CancelFunc
taskConfig *base.TaskConfig
- tasks map[string]*Context
+ tasks map[string]*Context
+ instanceID string
+ lastUpdateTime int64
}
-func NewManager(ctx context.Context, moduleMgr *module.Manager,
- profilingClient profiling_v3.EBPFProfilingServiceClient, flushInterval time.Duration, taskConfig *base.TaskConfig) (*Manager, error) {
+func NewManager(ctx context.Context, moduleMgr *module.Manager, taskConfig *base.TaskConfig) (*Manager, error) {
+ coreOperator := moduleMgr.FindModule(core.ModuleName).(core.Operator)
+ connection := coreOperator.BackendOperator().GetConnection()
+ profilingClient := profiling_v3.NewEBPFProfilingServiceClient(connection)
processOperator := moduleMgr.FindModule(process.ModuleName).(process.Operator)
if err := CheckProfilingTaskConfig(taskConfig, moduleMgr); err != nil {
return nil, err
@@ -61,7 +66,7 @@ func NewManager(ctx context.Context, moduleMgr *module.Manager,
profilingClient: profilingClient,
taskConfig: taskConfig,
tasks: make(map[string]*Context),
- flushInterval: flushInterval,
+ instanceID: coreOperator.InstanceID(),
ctx: ctx,
cancel: cancel,
}
@@ -69,10 +74,9 @@ func NewManager(ctx context.Context, moduleMgr *module.Manager,
}
func (m *Manager) Start() {
- go m.startFlushProfilingData()
}
-func (m *Manager) BuildContext(command *common_v3.Command) (*Context, error) {
+func (m *Manager) BuildContextFromCommand(command *common_v3.Command) (*Context, error) {
// analyze command
t, err := base.ProfilingTaskFromCommand(command)
if err != nil || t == nil {
@@ -99,18 +103,52 @@ func (m *Manager) BuildContext(command *common_v3.Command) (*Context, error) {
}
// init runner
- var r base.ProfileTaskRunner
- if runner, err := NewProfilingRunner(t.TargetType, m.taskConfig, m.moduleMgr); err != nil {
+ if err := m.initTaskContext(taskContext); err != nil {
+ return nil, err
+ }
+ return taskContext, nil
+}
+
+ // BuildContextFromContinuous builds the context of a continuous profiling task for the processes.
+ // The taskSetter customizes the generated task. The taskIDGenerator is only invoked after the
+ // duplicate check has passed, and its result is used as the task ID.
+ // It returns an error when the same task already exists, when the task ID could not be generated,
+ // or when the task context could not be initialized.
+ func (m *Manager) BuildContextFromContinuous(processes []api.ProcessInterface,
+ taskSetter func(task *base.ProfilingTask), taskIDGenerator func() (string, error)) (*Context, error) {
+ task := base.ProfilingTaskFromContinuous(processes, taskSetter)
+
+ taskContext := &Context{task: task, processes: processes, status: NotRunning, recalcDuration: make(chan bool, 1)}
+ // find the existing task with the same identity
+ existTask := m.tasks[taskContext.BuildTaskIdentity()]
+ // if the same task already exists, then ignore the new one and return an error
+ if existTask != nil && existTask.IsSameTask(taskContext) {
+ return nil, fmt.Errorf("already exist profiling task, so ignore")
+ }
+
+ taskID, err := taskIDGenerator()
+ if err != nil {
+ return nil, err
+ }
+ task.TaskID = taskID
+
+ // init runner
+ if err := m.initTaskContext(taskContext); err != nil {
+ return nil, err
+ }
+ return taskContext, nil
+ }
+
+func (m *Manager) initTaskContext(taskContext *Context) error {
+ // init runner
+ var r base.ProfileTaskRunner
+ task := taskContext.task
+ if runner, err := NewProfilingRunner(task.TargetType, m.taskConfig, m.moduleMgr); err != nil {
+ return err
+ } else if err := runner.Init(task, taskContext.processes); err != nil {
+ return fmt.Errorf("could not init %s runner for task: %s: %v", task.TriggerType, task.TaskID, err)
} else {
r = runner
}
taskContext.runner = r
taskContext.ctx, taskContext.cancel = context.WithCancel(m.ctx)
- return taskContext, nil
+ return nil
}
func (m *Manager) StartTask(c *Context) {
@@ -246,24 +284,63 @@ func (m *Manager) checkStoppedTaskAndRemoved() {
}
}
-func (m *Manager) startFlushProfilingData() {
- timeTicker := time.NewTicker(m.flushInterval)
- for {
- select {
- case <-timeTicker.C:
- if err := m.flushProfilingData(); err != nil {
- log.Warnf("flush profiling data failure: %v", err)
- }
- // cleanup the stopped after flush profiling data to make sure all the profiling data been sent
- m.checkStoppedTaskAndRemoved()
- case <-m.ctx.Done():
- timeTicker.Stop()
- return
+func (m *Manager) StartingWatchTask() error {
+ // query task
+ tasks, err := m.profilingClient.QueryTasks(m.ctx, &profiling_v3.EBPFProfilingTaskQuery{
+ RoverInstanceId: m.instanceID,
+ LatestUpdateTime: m.lastUpdateTime,
+ })
+ if err != nil {
+ return err
+ }
+ if len(tasks.Commands) == 0 {
+ return nil
+ }
+
+ // analyze profiling tasks
+ taskContexts := make([]*Context, 0)
+ lastUpdateTime := m.lastUpdateTime
+ for _, cmd := range tasks.Commands {
+ taskContext, err := m.BuildContextFromCommand(cmd)
+ if err != nil {
+ log.Warnf("could not execute task, ignored. %v", err)
+ continue
+ }
+
+ if taskContext.UpdateTime() > lastUpdateTime {
+ lastUpdateTime = taskContext.UpdateTime()
}
+
+ if !taskContext.CheckTaskRunnable() {
+ continue
+ }
+
+ taskContexts = append(taskContexts, taskContext)
}
+
+ // update last task time
+ m.lastUpdateTime = lastUpdateTime
+
+ if len(taskContexts) == 0 {
+ return nil
+ }
+
+ taskIDList := make([]string, len(taskContexts))
+ for inx, c := range taskContexts {
+ taskIDList[inx] = c.TaskID()
+ }
+ log.Infof("received %d profiling task: %v", len(taskContexts), taskIDList)
+
+ // start tasks
+ for _, t := range taskContexts {
+ m.StartTask(t)
+ }
+ return nil
}
-func (m *Manager) flushProfilingData() error {
+func (m *Manager) FlushProfilingData() error {
+ // cleanup the stopped after flush profiling data to make sure all the profiling data been sent
+ defer m.checkStoppedTaskAndRemoved()
if len(m.tasks) == 0 {
return nil
}
diff --git a/pkg/profiling/task/network/bpf/bpf.go b/pkg/profiling/task/network/bpf/bpf.go
index 4544176..d4dc7fa 100644
--- a/pkg/profiling/task/network/bpf/bpf.go
+++ b/pkg/profiling/task/network/bpf/bpf.go
@@ -28,7 +28,7 @@ import (
//go:generate go run github.com/cilium/ebpf/cmd/bpf2go -no-global-types -target bpfel -cc $BPF_CLANG -cflags $BPF_CFLAGS bpf $REPO_ROOT/bpf/profiling/network/netmonitor.c -- -I$REPO_ROOT/bpf/include -D__TARGET_ARCH_x86
type Loader struct {
- *Linker
+ *btf.Linker
*bpfObjects
}
@@ -40,7 +40,7 @@ func NewLoader() (*Loader, error) {
return &Loader{
bpfObjects: &objs,
- Linker: NewLinker(),
+ Linker: btf.NewLinker(),
}, nil
}
diff --git a/pkg/profiling/task/network/ssl.go b/pkg/profiling/task/network/ssl.go
index 82c9878..5591d53 100644
--- a/pkg/profiling/task/network/ssl.go
+++ b/pkg/profiling/task/network/ssl.go
@@ -29,6 +29,7 @@ import (
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
"github.com/apache/skywalking-rover/pkg/profiling/task/network/bpf"
"github.com/apache/skywalking-rover/pkg/tools"
+ "github.com/apache/skywalking-rover/pkg/tools/btf"
"github.com/apache/skywalking-rover/pkg/tools/elf"
"github.com/apache/skywalking-rover/pkg/tools/host"
"github.com/apache/skywalking-rover/pkg/tools/path"
@@ -303,9 +304,9 @@ var nodeTLSAddrWithVersions = []struct {
var nodeTLSProbeWithVersions = []struct {
v *version.Version
- f func(uprobe *bpf.UProbeExeFile, bpf *bpf.Loader, nodeModule *profiling.Module)
+ f func(uprobe *btf.UProbeExeFile, bpf *bpf.Loader, nodeModule *profiling.Module)
}{
- {version.Build(10, 19, 0), func(uprobe *bpf.UProbeExeFile, bpf *bpf.Loader, nodeModule *profiling.Module) {
+ {version.Build(10, 19, 0), func(uprobe *btf.UProbeExeFile, bpf *bpf.Loader, nodeModule *profiling.Module) {
uprobe.AddLinkWithSymbols(searchSymbolNames([]*profiling.Module{nodeModule}, strings.HasPrefix, "_ZN4node7TLSWrapC2E"),
bpf.NodeTlsWrap, bpf.NodeTlsWrapRet)
uprobe.AddLinkWithSymbols(searchSymbolNames([]*profiling.Module{nodeModule}, strings.HasPrefix, "_ZN4node7TLSWrap7ClearInE"),
@@ -313,7 +314,7 @@ var nodeTLSProbeWithVersions = []struct {
uprobe.AddLinkWithSymbols(searchSymbolNames([]*profiling.Module{nodeModule}, strings.HasPrefix, "_ZN4node7TLSWrap8ClearOutE"),
bpf.NodeTlsWrap, bpf.NodeTlsWrapRet)
}},
- {version.Build(15, 0, 0), func(uprobe *bpf.UProbeExeFile, bpf *bpf.Loader, nodeModule *profiling.Module) {
+ {version.Build(15, 0, 0), func(uprobe *btf.UProbeExeFile, bpf *bpf.Loader, nodeModule *profiling.Module) {
uprobe.AddLinkWithSymbols(searchSymbolNames([]*profiling.Module{nodeModule}, strings.HasPrefix, "_ZN4node6crypto7TLSWrapC2E"),
bpf.NodeTlsWrap, bpf.NodeTlsWrapRet)
uprobe.AddLinkWithSymbols(searchSymbolNames([]*profiling.Module{nodeModule}, strings.HasPrefix, "_ZN4node6crypto7TLSWrap7ClearInE"),
@@ -347,7 +348,7 @@ func findNodeTLSAddrConfig(v *version.Version) (*NodeTLSAddrInBPF, error) {
}
func registerNodeTLSProbes(v *version.Version, loader *bpf.Loader, nodeModule, libSSLModule *profiling.Module) error {
- var probeFunc func(uprobe *bpf.UProbeExeFile, bpf *bpf.Loader, nodeModule *profiling.Module)
+ var probeFunc func(uprobe *btf.UProbeExeFile, bpf *bpf.Loader, nodeModule *profiling.Module)
for _, c := range nodeTLSProbeWithVersions {
if v.GreaterOrEquals(c.v) {
probeFunc = c.f
diff --git a/pkg/profiling/task/oncpu/runner.go b/pkg/profiling/task/oncpu/runner.go
index b099c94..b346372 100644
--- a/pkg/profiling/task/oncpu/runner.go
+++ b/pkg/profiling/task/oncpu/runner.go
@@ -209,6 +209,9 @@ func (r *Runner) Stop() error {
}
func (r *Runner) FlushData() ([]*v3.EBPFProfilingData, error) {
+ if r.bpf == nil {
+ return nil, nil
+ }
var stack Event
var counter uint32
iterate := r.bpf.Counts.Iterate()
diff --git a/pkg/profiling/task/network/bpf/linker.go b/pkg/tools/btf/linker.go
similarity index 98%
rename from pkg/profiling/task/network/bpf/linker.go
rename to pkg/tools/btf/linker.go
index 9ff943f..8c80726 100644
--- a/pkg/profiling/task/network/bpf/linker.go
+++ b/pkg/tools/btf/linker.go
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-package bpf
+package btf
import (
"bytes"
@@ -28,7 +28,6 @@ import (
"golang.org/x/arch/x86/x86asm"
- "github.com/apache/skywalking-rover/pkg/logger"
"github.com/apache/skywalking-rover/pkg/tools"
"github.com/apache/skywalking-rover/pkg/tools/elf"
@@ -39,8 +38,6 @@ import (
"github.com/hashicorp/go-multierror"
)
-var log = logger.GetLogger("profiling", "task", "network", "bpf")
-
const defaultSymbolPrefix = "sys_"
type LinkFunc func(symbol string, prog *ebpf.Program, opts *link.KprobeOptions) (link.Link, error)
diff --git a/test/e2e/cases/profiling/network/base/rover_configs.yaml b/test/e2e/cases/profiling/network/base/rover_configs.yaml
index 4d94725..4bbf8cb 100644
--- a/test/e2e/cases/profiling/network/base/rover_configs.yaml
+++ b/test/e2e/cases/profiling/network/base/rover_configs.yaml
@@ -152,4 +152,17 @@ profiling:
# The default body encoding when sampling the request
default_request_encoding: ${ROVER_PROFILING_TASK_NETWORK_PROTOCOL_ANALYZE_SAMPLING_HTTP_DEFAULT_REQUEST_ENCODING:UTF-8}
# The default body encoding when sampling the response
- default_response_encoding: ${ROVER_PROFILING_TASK_NETWORK_PROTOCOL_ANALYZE_SAMPLING_HTTP_DEFAULT_RESPONSE_ENCODING:UTF-8}
\ No newline at end of file
+ default_response_encoding: ${ROVER_PROFILING_TASK_NETWORK_PROTOCOL_ANALYZE_SAMPLING_HTTP_DEFAULT_RESPONSE_ENCODING:UTF-8}
+ # continuous profiling config
+ continuous:
+ # continuous related meters prefix name
+ meter_prefix: ${ROVER_PROFILING_CONTINUOUS_METER_PREFIX:rover_con_p}
+ # The interval of fetch metrics from the system, such as Process CPU, System Load, etc.
+ fetch_interval: ${ROVER_PROFILING_CONTINUOUS_FETCH_INTERVAL:1s}
+ # The interval of checking whether the metrics reach the thresholds
+ check_interval: ${ROVER_PROFILING_CONTINUOUS_CHECK_INTERVAL:5s}
+ trigger:
+ # the duration of the profiling task
+ execute_duration: ${ROVER_PROFILING_CONTINUOUS_TRIGGER_EXECUTE_DURATION:10m}
+ # the minimal duration between the execution of the same profiling task
+ silence_duration: ${ROVER_PROFILING_CONTINUOUS_TRIGGER_SILENCE_DURATION:20m}
\ No newline at end of file
diff --git a/test/e2e/cases/profiling/network/envoy/kind.yaml b/test/e2e/cases/profiling/network/envoy/kind.yaml
index f9ba27e..a8de4d2 100644
--- a/test/e2e/cases/profiling/network/envoy/kind.yaml
+++ b/test/e2e/cases/profiling/network/envoy/kind.yaml
@@ -20,19 +20,23 @@ apiVersion: kind.x-k8s.io/v1alpha4
nodes:
# the control plane node config
- role: control-plane
+ image: kindest/node:v1.20.15@sha256:6f2d011dffe182bad80b85f6c00e8ca9d86b5b8922cdf433d53575c4c5212248
extraMounts:
- hostPath: /
containerPath: /host
# the three workers
- role: worker
+ image: kindest/node:v1.20.15@sha256:6f2d011dffe182bad80b85f6c00e8ca9d86b5b8922cdf433d53575c4c5212248
extraMounts:
- hostPath: /
containerPath: /host
- role: worker
+ image: kindest/node:v1.20.15@sha256:6f2d011dffe182bad80b85f6c00e8ca9d86b5b8922cdf433d53575c4c5212248
extraMounts:
- hostPath: /
containerPath: /host
- role: worker
+ image: kindest/node:v1.20.15@sha256:6f2d011dffe182bad80b85f6c00e8ca9d86b5b8922cdf433d53575c4c5212248
extraMounts:
- hostPath: /
containerPath: /host