You are viewing a plain text version of this content. The canonical link for it is available on the original archive page (the hyperlink is not preserved in this text version).
Posted to commits@skywalking.apache.org by wu...@apache.org on 2022/10/08 11:32:21 UTC
[skywalking-rover] branch main updated: Simplify protocol analyze (#53)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git
The following commit(s) were added to refs/heads/main by this push:
new 0180fbb Simplify protocol analyze (#53)
0180fbb is described below
commit 0180fbbd3383a142feb55f220b5c1f1086896aa8
Author: mrproliu <74...@qq.com>
AuthorDate: Sat Oct 8 19:32:15 2022 +0800
Simplify protocol analyze (#53)
---
.licenserc.yaml | 1 -
CHANGES.md | 1 +
LICENSE | 2 -
bpf/profiling/network/go_tls.c | 4 +-
bpf/profiling/network/netmonitor.c | 18 +-
bpf/profiling/network/protocol_analyze.h | 797 ------------------------------
bpf/profiling/network/protocol_analyzer.h | 168 +++++++
bpf/profiling/network/sock_stats.h | 4 -
pkg/profiling/task/network/context.go | 8 +-
pkg/profiling/task/network/enums.go | 27 -
10 files changed, 183 insertions(+), 847 deletions(-)
diff --git a/.licenserc.yaml b/.licenserc.yaml
index 61e52ca..74481e9 100644
--- a/.licenserc.yaml
+++ b/.licenserc.yaml
@@ -31,7 +31,6 @@ header:
- 'NOTICE'
- '.gitignore'
- 'dist/LICENSE.tpl'
- - 'bpf/profiling/network/protocol_analyze.h'
comment: on-failure
diff --git a/CHANGES.md b/CHANGES.md
index 33553b5..a19d9e1 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -6,6 +6,7 @@ Release Notes.
------------------
#### Features
* Enhancing the render context for the Kubernetes process.
+* Simplify the logic of network protocol analysis.
#### Bug Fixes
diff --git a/LICENSE b/LICENSE
index ac84ef0..ef2b2c6 100644
--- a/LICENSE
+++ b/LICENSE
@@ -214,5 +214,3 @@ Apache 2.0 licenses
The following components are provided under the Apache License. See project link for details.
The text of each license is the standard Apache 2.0 license.
-
- bpf/profiling/network/protocol_analyze.h from pixie-io/pixie: https://github.com/pixie-io Apache-2.0
\ No newline at end of file
diff --git a/bpf/profiling/network/go_tls.c b/bpf/profiling/network/go_tls.c
index 6422ad2..8a83aa5 100644
--- a/bpf/profiling/network/go_tls.c
+++ b/bpf/profiling/network/go_tls.c
@@ -47,8 +47,8 @@ int go_casgstatus(struct pt_regs* ctx) {
__u32 status;
assign_go_tls_arg(&status, sizeof(status), symaddrs->casg_status_new_val_loc, sp, regs);
- const int runningStatus = 2;
- if (status == runningStatus) {
+ // check the status is running
+ if (status == 2) {
set_goid(id, goid);
}
return 0;
diff --git a/bpf/profiling/network/netmonitor.c b/bpf/profiling/network/netmonitor.c
index e262566..62a9ba9 100644
--- a/bpf/profiling/network/netmonitor.c
+++ b/bpf/profiling/network/netmonitor.c
@@ -35,7 +35,7 @@
#include "socket.h"
#include "sock_stats.h"
#include "args.h"
-#include "protocol_analyze.h"
+#include "protocol_analyzer.h"
char __license[] SEC("license") = "Dual MIT/GPL";
@@ -65,6 +65,8 @@ static __inline bool family_should_trace(const __u32 family) {
return family != AF_UNKNOWN && family != AF_INET && family != AF_INET6 ? false : true;
}
+#define BPF_PROBE_READ_VAR(value, ptr) bpf_probe_read(&value, sizeof(value), ptr)
+
static __always_inline void submit_new_connection(struct pt_regs* ctx, __u32 func_name, __u32 tgid, __u32 fd, __u64 start_nacs,
struct sockaddr* addr, const struct socket* socket) {
__u64 curr_nacs = bpf_ktime_get_ns();
@@ -343,17 +345,17 @@ static __always_inline void process_write_data(struct pt_regs *ctx, __u64 id, st
}
if (buf_reader != NULL) {
- enum message_type_t msg_type = analyze_protocol(buf_reader->buffer, buf_reader->data_len, conn);
+ __u32 msg_type = analyze_protocol(buf_reader->buffer, buf_reader->data_len, conn);
// if send request data to remote address or receive response data from remote address
// then, recognized current connection is client
- if ((msg_type == kRequest && data_direction == SOCK_DATA_DIRECTION_EGRESS) ||
- (msg_type == kResponse && data_direction == SOCK_DATA_DIRECTION_INGRESS)) {
+ if ((msg_type == CONNECTION_MESSAGE_TYPE_REQUEST && data_direction == SOCK_DATA_DIRECTION_EGRESS) ||
+ (msg_type == CONNECTION_MESSAGE_TYPE_RESPONSE && data_direction == SOCK_DATA_DIRECTION_INGRESS)) {
conn->role = CONNECTION_ROLE_TYPE_CLIENT;
// if send response data to remote address or receive request data from remote address
// then, recognized current connection is server
- } else if ((msg_type == kResponse && data_direction == SOCK_DATA_DIRECTION_EGRESS) ||
- (msg_type == kRequest && data_direction == SOCK_DATA_DIRECTION_INGRESS)) {
+ } else if ((msg_type == CONNECTION_MESSAGE_TYPE_RESPONSE && data_direction == SOCK_DATA_DIRECTION_EGRESS) ||
+ (msg_type == CONNECTION_MESSAGE_TYPE_REQUEST && data_direction == SOCK_DATA_DIRECTION_INGRESS)) {
conn->role = CONNECTION_ROLE_TYPE_SERVER;
}
}
@@ -728,7 +730,7 @@ int sys_sendmmsg_ret(struct pt_regs* ctx) {
struct sock_data_args_t *data_args = bpf_map_lookup_elem(&socket_data_args, &id);
if (data_args) {
__u32 bytes_count;
- BPF_PROBE_READ_VAR1(bytes_count, data_args->msg_len);
+ BPF_PROBE_READ_VAR(bytes_count, data_args->msg_len);
process_write_data(ctx, id, data_args, bytes_count, SOCK_DATA_DIRECTION_EGRESS, true, SOCKET_OPTS_TYPE_SENDMMSG, false);
}
bpf_map_delete_elem(&socket_data_args, &id);
@@ -1017,7 +1019,7 @@ int sys_recvmmsg_ret(struct pt_regs* ctx) {
struct sock_data_args_t *data_args = bpf_map_lookup_elem(&socket_data_args, &id);
if (data_args) {
__u32 bytes_count;
- BPF_PROBE_READ_VAR1(bytes_count, data_args->msg_len);
+ BPF_PROBE_READ_VAR(bytes_count, data_args->msg_len);
process_write_data(ctx, id, data_args, bytes_count, SOCK_DATA_DIRECTION_INGRESS, true, SOCKET_OPTS_TYPE_RECVMMSG, false);
}
bpf_map_delete_elem(&socket_data_args, &id);
diff --git a/bpf/profiling/network/protocol_analyze.h b/bpf/profiling/network/protocol_analyze.h
deleted file mode 100644
index 6f7e2c2..0000000
--- a/bpf/profiling/network/protocol_analyze.h
+++ /dev/null
@@ -1,797 +0,0 @@
-/*
- * This code runs using bpf in the Linux kernel.
- * Copyright 2018- The Pixie Authors.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public License
- * as published by the Free Software Foundation; either version 2
- * of the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
- *
- * SPDX-License-Identifier: GPL-2.0
- */
-
-#include "common.h"
-
-#pragma once
-
-enum traffic_protocol_t {
- kProtocolUnknown = 0,
- kProtocolHTTP = 1,
- kProtocolHTTP2 = 2,
- kProtocolMySQL = 3,
- kProtocolCQL = 4,
- kProtocolPGSQL = 5,
- kProtocolDNS = 6,
- kProtocolRedis = 7,
- kProtocolNATS = 8,
- kProtocolMongo = 9,
- kProtocolKafka = 10,
- kProtocolMux = 11,
-};
-
-enum message_type_t { kUnknown, kRequest, kResponse };
-
-struct protocol_message_t {
- __u32 protocol;
- __u32 type;
-};
-
-#define BPF_PROBE_READ_VAR1(value, ptr) bpf_probe_read(&value, sizeof(value), ptr)
-
-static __inline int32_t read_big_endian_int32(const char* buf) {
- int32_t length;
- BPF_PROBE_READ_VAR1(length, buf);
- return bpf_ntohl(length);
-}
-
-static __inline int32_t read_big_endian_int16(const char* buf) {
- int16_t val;
- BPF_PROBE_READ_VAR1(val, buf);
- return bpf_ntohl(val);
-}
-
-static __inline int px_bpf_strncmp(const char* lhs, size_t n, const char* rhs) {
- for (size_t i = 0; i < n; ++i) {
- if (lhs[i] != rhs[i]) {
- return 1;
- }
- }
- return 0;
-}
-
-static __inline __u32 infer_http_message(const char* buf, size_t count) {
- // Smallest HTTP response is 17 characters:
- // HTTP/1.1 200 OK\r\n
- // Smallest HTTP response is 16 characters:
- // GET x HTTP/1.1\r\n
- if (count < 16) {
- return kUnknown;
- }
-
- if (buf[0] == 'H' && buf[1] == 'T' && buf[2] == 'T' && buf[3] == 'P') {
- return kResponse;
- }
- if (buf[0] == 'G' && buf[1] == 'E' && buf[2] == 'T') {
- return kRequest;
- }
- if (buf[0] == 'H' && buf[1] == 'E' && buf[2] == 'A' && buf[3] == 'D') {
- return kRequest;
- }
- if (buf[0] == 'P' && buf[1] == 'O' && buf[2] == 'S' && buf[3] == 'T') {
- return kRequest;
- }
- if (buf[0] == 'P' && buf[1] == 'U' && buf[2] == 'T') {
- return kRequest;
- }
- if (buf[0] == 'D' && buf[1] == 'E' && buf[2] == 'L' && buf[3] == 'E' && buf[4] == 'T' && buf[5] == 'E') {
- return kRequest;
- }
- if (buf[0] == 'C' && buf[1] == 'O' && buf[2] == 'N' && buf[3] == 'N' && buf[4] == 'E' && buf[5] == 'T') {
- return kRequest;
- }
- if (buf[0] == 'O' && buf[1] == 'P' && buf[2] == 'T' && buf[3] == 'I' && buf[4] == 'O' && buf[5] == 'N') {
- return kRequest;
- }
- if (buf[0] == 'T' && buf[1] == 'R' && buf[2] == 'A' && buf[3] == 'C' && buf[4] == 'E') {
- return kRequest;
- }
- if (buf[0] == 'P' && buf[1] == 'A' && buf[2] == 'T' && buf[3] == 'C' && buf[4] == 'H') {
- return kRequest;
- }
-
- return kUnknown;
-}
-
-// frame format: https://www.rfc-editor.org/rfc/rfc7540.html#section-4.1
-static __inline __u32 infer_http2_message(const char* buf_src, size_t count) {
-static const uint8_t kFrameBasicSize = 0x9; // including Length, Type, Flags, Reserved, Stream Identity
-static const uint8_t kFrameTypeHeader = 0x1; // the type of the frame: https://www.rfc-editor.org/rfc/rfc7540.html#section-6.2
-static const uint8_t kFrameLoopCount = 5;
-
-static const uint8_t kStaticTableMaxSize = 61;// https://www.rfc-editor.org/rfc/rfc7541#appendix-A
-static const uint8_t kStaticTableAuth = 1;
-static const uint8_t kStaticTableGet = 2;
-static const uint8_t kStaticTablePost = 3;
-static const uint8_t kStaticTablePath1 = 4;
-static const uint8_t kStaticTablePath2 = 5;
-
- // the buffer size must bigger than basic frame size
- if (count < kFrameBasicSize) {
- return kUnknown;
- }
-
- // frame info
- __u8 frame[21] = { 0 };
- __u32 frameOffset = 0;
- // header info
- __u8 staticInx, headerBlockFragmentOffset;
-
- // each all frame
-#pragma unroll
- for (__u8 i = 0; i < kFrameLoopCount; i++) {
- if (frameOffset >= count) {
- break;
- }
-
- // read frame
- bpf_probe_read(frame, sizeof(frame), buf_src + frameOffset);
- frameOffset += (bpf_ntohl(*(__u32 *) frame) >> 8) + kFrameBasicSize;
-
- // is header frame
- if (frame[3] != kFrameTypeHeader) {
- continue;
- }
-
- // validate the header(unset): not HTTP2 protocol
- // this frame must is a send request
- if ((frame[4] & 0xd2) || frame[5] & 0x01) {
- return kUnknown;
- }
-
- // locate the header block fragment offset
- headerBlockFragmentOffset = kFrameBasicSize;
- if (frame[4] & 0x20) { // PADDED flag is set
- headerBlockFragmentOffset += 1;
- }
- if (frame[4] & 0x20) { // PRIORITY flag is set
- headerBlockFragmentOffset += 5;
- }
-
-#pragma unroll
- for (__u8 j = 0; j <= kStaticTablePath2; j++) {
- if (headerBlockFragmentOffset > count) {
- return kUnknown;
- }
- staticInx = frame[headerBlockFragmentOffset] & 0x7f;
- if (staticInx <= kStaticTableMaxSize && staticInx > 0) {
- if (staticInx == kStaticTableAuth ||
- staticInx == kStaticTableGet ||
- staticInx == kStaticTablePost ||
- staticInx == kStaticTablePath1 ||
- staticInx == kStaticTablePath2) {
- return kRequest;
- } else {
- return kResponse;
- }
- }
- headerBlockFragmentOffset++;
- }
- }
-
- return kUnknown;
-}
-
-// Cassandra frame:
-// 0 8 16 24 32 40
-// +---------+---------+---------+---------+---------+
-// | version | flags | stream | opcode |
-// +---------+---------+---------+---------+---------+
-// | length |
-// +---------+---------+---------+---------+
-// | |
-// . ... body ... .
-// . .
-// . .
-// +----------------------------------------
-static __inline enum message_type_t infer_cql_message(const char* buf, size_t count) {
-static const uint8_t kError = 0x00;
-static const uint8_t kStartup = 0x01;
-static const uint8_t kReady = 0x02;
-static const uint8_t kAuthenticate = 0x03;
-static const uint8_t kOptions = 0x05;
-static const uint8_t kSupported = 0x06;
-static const uint8_t kQuery = 0x07;
-static const uint8_t kResult = 0x08;
-static const uint8_t kPrepare = 0x09;
-static const uint8_t kExecute = 0x0a;
-static const uint8_t kRegister = 0x0b;
-static const uint8_t kEvent = 0x0c;
-static const uint8_t kBatch = 0x0d;
-static const uint8_t kAuthChallenge = 0x0e;
-static const uint8_t kAuthResponse = 0x0f;
-static const uint8_t kAuthSuccess = 0x10;
-
-// Cassandra frames have a 9-byte header.
-if (count < 9) {
-return kUnknown;
-}
-
-// Version contains both version and direction.
-bool request = (buf[0] & 0x80) == 0x00;
-uint8_t version = (buf[0] & 0x7f);
-uint8_t flags = buf[1];
-uint8_t opcode = buf[4];
-int32_t length = read_big_endian_int32(&buf[5]);
-
-// Cassandra version should 5 or less. Also v2 and lower seem much less popular.
-// For example ScyllaDB only supports v3+.
-if (version < 3 || version > 5) {
-return kUnknown;
-}
-
-// Only flags 0x1, 0x2, 0x4 and 0x8 are used.
-if ((flags & 0xf0) != 0) {
-return kUnknown;
-}
-
-// A frame is limited to 256MB in length,
-// but we look for more common frames which should be much smaller in size.
-if (length > 10000) {
-return kUnknown;
-}
-
-switch (opcode) {
-case kStartup:
-case kOptions:
-case kQuery:
-case kPrepare:
-case kExecute:
-case kRegister:
-case kBatch:
-case kAuthResponse:
- return request ? kRequest : kUnknown;
-case kError:
-case kReady:
-case kAuthenticate:
-case kSupported:
-case kResult:
-case kEvent:
-case kAuthChallenge:
-case kAuthSuccess:
- return !request ? kResponse : kUnknown;
-default:
- return kUnknown;
-}
-}
-
-static __inline enum message_type_t infer_mongo_message(const char* buf, size_t count) {
-// Reference:
-// https://docs.mongodb.com/manual/reference/mongodb-wire-protocol/#std-label-wp-request-opcodes.
-// Note: Response side inference for Mongo is not robust, and is not attempted to avoid
-// confusion with other protocols, especially MySQL.
-static const int32_t kOPUpdate = 2001;
-static const int32_t kOPInsert = 2002;
-static const int32_t kReserved = 2003;
-static const int32_t kOPQuery = 2004;
-static const int32_t kOPGetMore = 2005;
-static const int32_t kOPDelete = 2006;
-static const int32_t kOPKillCursors = 2007;
-static const int32_t kOPCompressed = 2012;
-static const int32_t kOPMsg = 2013;
-
-static const int32_t kMongoHeaderLength = 16;
-
-if (count < kMongoHeaderLength) {
-return kUnknown;
-}
-
-int32_t* buf4 = (int32_t*)buf;
-int32_t message_length = buf4[0];
-
-if (message_length < kMongoHeaderLength) {
-return kUnknown;
-}
-
-int32_t request_id = buf4[1];
-
-if (request_id < 0) {
-return kUnknown;
-}
-
-int32_t response_to = buf4[2];
-int32_t opcode = buf4[3];
-
-if (opcode == kOPUpdate || opcode == kOPInsert || opcode == kReserved || opcode == kOPQuery ||
- opcode == kOPGetMore || opcode == kOPDelete || opcode == kOPKillCursors ||
- opcode == kOPCompressed || opcode == kOPMsg) {
-if (response_to == 0) {
- return kRequest;
-}
-}
-
-return kUnknown;
-}
-
-// TODO(yzhao): This is for initial development use. Later we need to combine with more inference
-// code, as the startup message only appears at the beginning of the exchanges between PostgreSQL
-// client and server.
-static __inline enum message_type_t infer_pgsql_startup_message(const char* buf, size_t count) {
-// Length field: int32, protocol version field: int32, "user" string, 4 bytes.
-const int kMinMsgLen = 4 + 4 + 4;
-if (count < kMinMsgLen) {
-return kUnknown;
-}
-
-// Assume startup message wont be larger than 10240 (10KiB).
-const int kMaxMsgLen = 10240;
-const int32_t length = read_big_endian_int32(buf);
-if (length < kMinMsgLen) {
-return kUnknown;
-}
-if (length > kMaxMsgLen) {
-return kUnknown;
-}
-
-const char kPgsqlVer30[] = "\x00\x03\x00\x00";
-if (px_bpf_strncmp((const char*)buf + 4, 4, kPgsqlVer30) != 0) {
-return kUnknown;
-}
-
-// Next we expect a key like "user", "datestyle" or "extra_float_digits".
-// For inference purposes, we simply look for a short sequence of alphabetic characters.
-for (int i = 0; i < 3; ++i) {
-// Loosely check for an alphabetic character.
-// This is a loose check and still covers some non alphabetic characters (e.g. `\`),
-// but we want to keep the BPF instruction count low.
-if (*((const char*)buf + 8 + i) < 'A') {
- return kUnknown;
-}
-}
-
-return kRequest;
-}
-
-// Regular message format: | byte tag | int32_t len | string payload |
-static __inline enum message_type_t infer_pgsql_query_message(const char* buf, size_t count) {
-const uint8_t kTagQ = 'Q';
-if (*buf != kTagQ) {
-return kUnknown;
-}
-const int32_t len = read_big_endian_int32(buf + 1);
-// The length field include the field itself of 4 bytes. Also the minimal size command is
-// COPY/MOVE. The minimal length is therefore 8.
-const int32_t kMinPayloadLen = 8;
-// Assume typical query message size is below an artificial limit.
-// 30000 is copied from postgres code base:
-// https://github.com/postgres/postgres/tree/master/src/interfaces/libpq/fe-protocol3.c#L94
-const int32_t kMaxPayloadLen = 30000;
-if (len < kMinPayloadLen || len > kMaxPayloadLen) {
-return kUnknown;
-}
-// If the input includes a whole message (1 byte tag + length), check the last character.
-if (count > MAX_SOCKET_BUFFER_READ_LENGTH) {
- count = MAX_SOCKET_BUFFER_READ_LENGTH;
-}
-if ((len + 1 <= (int)count) && (buf[count-1] != '\0')) {
-return kUnknown;
-}
-return kRequest;
-}
-
-// TODO(yzhao): ReadyForQuery message could be nice pattern to check, as it has 6 bytes of fixed bit
-// pattern, plus one byte of enum with possible values 'I', 'E', 'T'. But it's usually sent as a
-// suffix of a query response, so it's difficult to capture. Research more to see if we can detect
-// this message.
-
-static __inline enum message_type_t infer_pgsql_regular_message(const char* buf, size_t count) {
-const int kMinMsgLen = 1 + sizeof(int32_t);
-if (count < kMinMsgLen) {
-return kUnknown;
-}
-return infer_pgsql_query_message(buf, count);
-}
-
-static __inline enum message_type_t infer_pgsql_message(const char* buf, size_t count) {
-enum message_type_t type = infer_pgsql_startup_message(buf, count);
-if (type != kUnknown) {
-return type;
-}
-return infer_pgsql_regular_message(buf, count);
-}
-
-// MySQL packet:
-// 0 8 16 24 32
-// +---------+---------+---------+---------+
-// | payload_length | seq_id |
-// +---------+---------+---------+---------+
-// | |
-// . ... body ... .
-// . .
-// . .
-// +----------------------------------------
-// TODO(oazizi/yzhao): This produces too many false positives. Add stronger protocol detection.
-static __inline enum message_type_t infer_mysql_message(const char* buf, size_t count,
- struct active_connection_t* conn_info) {
-static const uint8_t kComQuery = 0x03;
-static const uint8_t kComConnect = 0x0b;
-static const uint8_t kComStmtPrepare = 0x16;
-static const uint8_t kComStmtExecute = 0x17;
-static const uint8_t kComStmtClose = 0x19;
-
-// Second statement checks whether suspected header matches the length of current packet.
-bool use_prev_buf = (conn_info->prev_count == 4) && ((size_t)read_big_endian_int32(conn_info->prev_buf) == count);
-
-if (use_prev_buf) {
-// Check the header_state to find out if the header has been read. MySQL server tends to
-// read in the 4 byte header and the rest of the packet in a separate read.
-count += 4;
-}
-
-// MySQL packets start with a 3-byte packet length and a 1-byte packet number.
-// The 5th byte on a request contains a command that tells the type.
-if (count < 5) {
-return kUnknown;
-}
-
-// Convert 3-byte length to uint32_t. But since the 4th byte is supposed to be \x00, directly
-// casting 4-bytes is correct.
-// NOLINTNEXTLINE: readability/casting
-uint32_t len = use_prev_buf ? *((uint32_t*)conn_info->prev_buf) : *((uint32_t*)buf);
-len = len & 0x00ffffff;
-
-uint8_t seq = use_prev_buf ? conn_info->prev_buf[3] : buf[3];
-uint8_t com = use_prev_buf ? buf[0] : buf[4];
-
-// The packet number of a request should always be 0.
-if (seq != 0) {
-return kUnknown;
-}
-
-// No such thing as a zero-length request in MySQL protocol.
-if (len == 0) {
-return kUnknown;
-}
-
-// Assuming that the length of a request is less than 10k characters to avoid false
-// positive flagging as MySQL, which statistically happens frequently for a single-byte
-// check.
-if (len > 10000) {
-return kUnknown;
-}
-
-// TODO(oazizi): Consider adding more commands (0x00 to 0x1f).
-// Be careful, though: trade-off is higher rates of false positives.
-if (com == kComConnect || com == kComQuery || com == kComStmtPrepare || com == kComStmtExecute ||
- com == kComStmtClose) {
-return kRequest;
-}
-return kUnknown;
-}
-
-// Reference: https://kafka.apache.org/protocol.html#protocol_messages
-// Request Header v0 => request_api_key request_api_version correlation_id
-// request_api_key => INT16
-// request_api_version => INT16
-// correlation_id => INT32
-static __inline enum message_type_t infer_kafka_request(const char* buf) {
-// API is Kafka's terminology for opcode.
-static const int kNumAPIs = 62;
-static const int kMaxAPIVersion = 12;
-
-const int16_t request_API_key = read_big_endian_int16(buf);
-if (request_API_key < 0 || request_API_key > kNumAPIs) {
-return kUnknown;
-}
-
-const int16_t request_API_version = read_big_endian_int16(buf + 2);
-if (request_API_version < 0 || request_API_version > kMaxAPIVersion) {
-return kUnknown;
-}
-
-const int32_t correlation_id = read_big_endian_int32(buf + 4);
-if (correlation_id < 0) {
-return kUnknown;
-}
-return kRequest;
-}
-
-static __inline enum message_type_t infer_kafka_message(const char* buf, size_t count,
- struct active_connection_t* conn_info) {
-// Second statement checks whether suspected header matches the length of current packet.
-// This shouldn't confuse with MySQL because MySQL uses little endian, and Kafka uses big endian.
-bool use_prev_buf =
- (conn_info->prev_count == 4) && ((size_t)read_big_endian_int32(conn_info->prev_buf) == count);
-
-if (use_prev_buf) {
-count += 4;
-}
-
-// length(4 bytes) + api_key(2 bytes) + api_version(2 bytes) + correlation_id(4 bytes)
-static const int kMinRequestLength = 12;
-if (count < kMinRequestLength) {
-return kUnknown;
-}
-
-const int32_t message_size = use_prev_buf ? count : read_big_endian_int32(buf) + 4;
-
-// Enforcing count to be exactly message_size + 4 to mitigate misclassification.
-// However, this will miss long messages broken into multiple reads.
-if (message_size < 0 || count != (size_t)message_size) {
-return kUnknown;
-}
-const char* request_buf = use_prev_buf ? buf : buf + 4;
-enum message_type_t result = infer_kafka_request(request_buf);
-
-// Kafka servers read in a 4-byte packet length header first. The first packet in the
-// stream is used to infer protocol, but the header has already been read. One solution is to
-// add another perf_submit of the 4-byte header, but this would impact the instruction limit.
-// Not handling this case causes potential confusion in the parsers. Instead, we set a
-// prepend_length_header field if and only if Kafka has just been inferred for the first time
-// under the scenario described above. Length header is appended to user the buffer in user space.
-if (use_prev_buf && result == kRequest && conn_info->protocol == kProtocolUnknown) {
-conn_info->prepend_length_header = true;
-}
-return result;
-}
-
-static __inline enum message_type_t infer_dns_message(const char* buf, size_t count) {
-const int kDNSHeaderSize = 12;
-
-// Use the maximum *guaranteed* UDP packet size as the max DNS message size.
-// UDP packets can be larger, but this is the typical maximum size for DNS.
-const int kMaxDNSMessageSize = 512;
-
-// Maximum number of resource records.
-// https://stackoverflow.com/questions/6794926/how-many-a-records-can-fit-in-a-single-dns-response
-const int kMaxNumRR = 25;
-
-if (count < kDNSHeaderSize || count > kMaxDNSMessageSize) {
-return kUnknown;
-}
-
-const uint8_t* ubuf = (const uint8_t*)buf;
-
-uint16_t flags = (ubuf[2] << 8) + ubuf[3];
-uint16_t num_questions = (ubuf[4] << 8) + ubuf[5];
-uint16_t num_answers = (ubuf[6] << 8) + ubuf[7];
-uint16_t num_auth = (ubuf[8] << 8) + ubuf[9];
-uint16_t num_addl = (ubuf[10] << 8) + ubuf[11];
-
-bool qr = (flags >> 15) & 0x1;
-uint8_t opcode = (flags >> 11) & 0xf;
-uint8_t zero = (flags >> 6) & 0x1;
-
-if (zero != 0) {
-return kUnknown;
-}
-
-if (opcode != 0) {
-return kUnknown;
-}
-
-if (num_questions == 0 || num_questions > 10) {
-return kUnknown;
-}
-
-uint32_t num_rr = num_questions + num_answers + num_auth + num_addl;
-if (num_rr > kMaxNumRR) {
-return kUnknown;
-}
-
-return (qr == 0) ? kRequest : kResponse;
-}
-
-// Redis request and response messages share the same format.
-// See https://redis.io/topics/protocol for the REDIS protocol spec.
-//
-// TODO(yzhao): Apply simplified parsing to read the content to distinguished request & response.
-static __inline bool is_redis_message(const char* buf, size_t count) {
-// Redis messages start with an one-byte type marker, and end with \r\n terminal sequence.
-if (count < 3) {
-return false;
-}
-
-const char first_byte = buf[0];
-
-if ( // Simple strings start with +
- first_byte != '+' &&
- // Errors start with -
- first_byte != '-' &&
- // Integers start with :
- first_byte != ':' &&
- // Bulk strings start with $
- first_byte != '$' &&
- // Arrays start with *
- first_byte != '*') {
-return false;
-}
-
-// The last two chars are \r\n, the terminal sequence of all Redis messages.
-if (buf[count - 2] != '\r') {
-return false;
-}
-if (buf[count - 1] != '\n') {
-return false;
-}
-
-return true;
-}
-
-// TODO(ddelnano): Mux protocol traffic is currently misidentified as ssh. Since
-// stirling doesn't have ssh support yet, but will need to be addressed. In addition,
-// mux seems to send the header and body on its protocol in two separate syscalls on
-// the server side.
-static __inline enum message_type_t infer_mux_message(const char* buf, size_t count) {
-// mux's on the wire format causes false positives for protocol inference
-// In order to address this, we only infer mux messages by the
-// most useful message types and if they are easy to identify
-static const int8_t kTdispatch = 2;
-static const int8_t kRdispatch = -2;
-static const int8_t kTinit = 68;
-static const int8_t kRinit = -68;
-static const int8_t kRerr = -128;
-static const int8_t kRerrOld = 127;
-uint32_t mux_header_size = 8;
-// TODO(ddelnano): Determine why mux-framer text in T/Rinit is
-// 6 bytes after the mux header
-int32_t mux_framer_pos = mux_header_size + 6;
-
-if (count < mux_header_size) {
-return kUnknown;
-}
-
-uint32_t length = read_big_endian_int32(buf) + 4;
-enum message_type_t msg_type;
-
-int32_t type_and_tag = read_big_endian_int32(buf + 4);
-int8_t mux_type = (type_and_tag & 0xff000000) >> 24;
-uint32_t tag = (type_and_tag & 0xffffff);
-switch (mux_type) {
-case kTdispatch:
-case kTinit:
-case kRerrOld:
- msg_type = kRequest;
- break;
-case kRdispatch:
-case kRinit:
-case kRerr:
- msg_type = kResponse;
- break;
-default:
- return kUnknown;
-}
-if (mux_type == kRerr || mux_type == kRerrOld) {
-if (length > MAX_SOCKET_BUFFER_READ_LENGTH) {
- length = MAX_SOCKET_BUFFER_READ_LENGTH;
-}
-if (buf[length - 5] != 'c' || buf[length - 4] != 'h' || buf[length - 3] != 'e' ||
- buf[length - 2] != 'c' || buf[length - 1] != 'k')
- return kUnknown;
-}
-
-if (mux_type == kRinit || mux_type == kTinit) {
-if (buf[mux_framer_pos] != 'm' || buf[mux_framer_pos + 1] != 'u' ||
- buf[mux_framer_pos + 2] != 'x' || buf[mux_framer_pos + 3] != '-' ||
- buf[mux_framer_pos + 4] != 'f' || buf[mux_framer_pos + 5] != 'r' ||
- buf[mux_framer_pos + 6] != 'a' || buf[mux_framer_pos + 7] != 'm' ||
- buf[mux_framer_pos + 8] != 'e' || buf[mux_framer_pos + 9] != 'r')
- return kUnknown;
-}
-
-if (tag < 1 || tag > ((1 << 23) - 1)) {
-return kUnknown;
-}
-
-return msg_type;
-}
-
-// NATS messages are in texts. The role is inferred from the message type.
-// See https://github.com/nats-io/docs/blob/master/nats_protocol/nats-protocol.md
-//
-// In case of bpf instruction count limit becomes a problem, we can drop CONNECT and INFO message
-// detection, they are only sent once after establishing the connection.
-static __inline enum message_type_t infer_nats_message(const char* buf, size_t count) {
-// NATS messages start with an one-byte type marker, and end with \r\n terminal sequence.
-if (count < 3) {
-return kUnknown;
-}
-if (count > MAX_SOCKET_BUFFER_READ_LENGTH) {
- count = MAX_SOCKET_BUFFER_READ_LENGTH;
-}
-// The last two chars are \r\n, the terminal sequence of all NATS messages.
-if (buf[count - 2] != '\r') {
-return kUnknown;
-}
-if (buf[count - 1] != '\n') {
-return kUnknown;
-}
-if (buf[0] == 'C' && buf[1] == 'O' && buf[2] == 'N' && buf[3] == 'N' && buf[4] == 'E' &&
- buf[5] == 'C' && buf[6] == 'T') {
-// kRequest is not precise. Here only means the message is sent by client.
-return kRequest;
-}
-if (buf[0] == 'S' && buf[1] == 'U' && buf[2] == 'B') {
-return kRequest;
-}
-if (buf[0] == 'U' && buf[1] == 'N' && buf[2] == 'S' && buf[3] == 'U' && buf[4] == 'B') {
-return kRequest;
-}
-if (buf[0] == 'P' && buf[1] == 'U' && buf[2] == 'B') {
-return kRequest;
-}
-if (buf[0] == 'I' && buf[1] == 'N' && buf[2] == 'F' && buf[3] == 'O') {
-// kResponse is not precise. Here only means the message is sent by server.
-return kResponse;
-}
-if (buf[0] == 'M' && buf[1] == 'S' && buf[2] == 'G') {
-return kResponse;
-}
-if (buf[0] == '+' && buf[1] == 'O' && buf[2] == 'K') {
-return kResponse;
-}
-if (buf[0] == '-' && buf[1] == 'E' && buf[2] == 'R' && buf[3] == 'R') {
-return kResponse;
-}
-// PING & PONG can be sent by both client and server. Don't use them.
-return kUnknown;
-}
-
-static __inline enum message_type_t analyze_protocol(char *buf, __u32 count, struct active_connection_t *conn_info) {
- struct protocol_message_t inferred_message;
- inferred_message.protocol = kProtocolUnknown;
- inferred_message.type = kUnknown;
-
- // The prepend_length_header controls whether a length header is prepended to the buffer
- // in user space.
- conn_info->prepend_length_header = false;
-
- // PROTOCOL_LIST: Requires update on new protocols.
- if ((inferred_message.type = infer_http_message(buf, count)) != kUnknown) {
- inferred_message.protocol = kProtocolHTTP;
- } else if ((inferred_message.type = infer_http2_message(buf, count)) != kUnknown) {
- inferred_message.protocol = kProtocolHTTP2;
- } else if ((inferred_message.type = infer_cql_message(buf, count)) != kUnknown) {
- inferred_message.protocol = kProtocolCQL;
- } else if ((inferred_message.type = infer_mongo_message(buf, count)) != kUnknown) {
- inferred_message.protocol = kProtocolMongo;
-// } else if ((inferred_message.type = infer_pgsql_message(buf, count)) != kUnknown) {
-// inferred_message.protocol = kProtocolPGSQL;
- } else if ((inferred_message.type = infer_mysql_message(buf, count, conn_info)) != kUnknown) {
- inferred_message.protocol = kProtocolMySQL;
-// } else if ((inferred_message.type = infer_mux_message(buf, count)) != kUnknown) {
-// inferred_message.protocol = kProtocolMux;
- } else if ((inferred_message.type = infer_kafka_message(buf, count, conn_info)) != kUnknown) {
- inferred_message.protocol = kProtocolKafka;
- } else if ((inferred_message.type = infer_dns_message(buf, count)) != kUnknown) {
- inferred_message.protocol = kProtocolDNS;
-// } else if (is_redis_message(buf, count)) {
-// // For Redis, the message type is left to be kUnknown.
-// // The message types are then inferred via traffic direction and client/server role.
-// inferred_message.protocol = kProtocolRedis;
-// } else if ((inferred_message.type = infer_nats_message(buf, count)) != kUnknown) {
-// inferred_message.protocol = kProtocolNATS;
- }
-
- conn_info->prev_count = count;
- if (count == 4) {
- conn_info->prev_buf[0] = buf[0];
- conn_info->prev_buf[1] = buf[1];
- conn_info->prev_buf[2] = buf[2];
- conn_info->prev_buf[3] = buf[3];
- }
-
- if (inferred_message.protocol != kProtocolUnknown) {
- conn_info->protocol = inferred_message.protocol;
- }
-
- return inferred_message.type;
-}
\ No newline at end of file
diff --git a/bpf/profiling/network/protocol_analyzer.h b/bpf/profiling/network/protocol_analyzer.h
new file mode 100644
index 0000000..e6a7838
--- /dev/null
+++ b/bpf/profiling/network/protocol_analyzer.h
@@ -0,0 +1,168 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "common.h"
+
+#pragma once
+
+#define CONNECTION_PROTOCOL_UNKNOWN 0 // protocol ids that analyze_protocol writes to active_connection_t.protocol;
+#define CONNECTION_PROTOCOL_HTTP1 1 // values equal ConnectionProtocol in pkg/profiling/task/network/enums.go,
+#define CONNECTION_PROTOCOL_HTTP2 2 // presumably read back via ActiveConnectionInBPF.Protocol -- keep in sync
+
+#define CONNECTION_MESSAGE_TYPE_UNKNOWN 0 // message types returned by infer_http1_message, infer_http2_message
+#define CONNECTION_MESSAGE_TYPE_REQUEST 1 // and analyze_protocol
+#define CONNECTION_MESSAGE_TYPE_RESPONSE 2
+
+// HTTP 1.x: REQUEST when buf starts with a known method name, RESPONSE when it starts with "HTTP", else UNKNOWN.
+// request frame format: https://www.rfc-editor.org/rfc/rfc2068.html#section-5
+// response frame format: https://www.rfc-editor.org/rfc/rfc2068.html#section-6
+static __inline __u32 infer_http1_message(const char* buf, size_t count) {
+    if (count < 16) { // rejects very short payloads; also keeps every buf[0..6] read below within count
+        return CONNECTION_MESSAGE_TYPE_UNKNOWN;
+    }
+    // response
+    if (buf[0] == 'H' && buf[1] == 'T' && buf[2] == 'T' && buf[3] == 'P') {
+        return CONNECTION_MESSAGE_TYPE_RESPONSE;
+    }
+    // request: compare the complete method name
+    if (buf[0] == 'G' && buf[1] == 'E' && buf[2] == 'T') {
+        return CONNECTION_MESSAGE_TYPE_REQUEST;
+    }
+    if (buf[0] == 'P' && buf[1] == 'O' && buf[2] == 'S' && buf[3] == 'T') {
+        return CONNECTION_MESSAGE_TYPE_REQUEST;
+    }
+    if (buf[0] == 'O' && buf[1] == 'P' && buf[2] == 'T' && buf[3] == 'I' && buf[4] == 'O' && buf[5] == 'N' && buf[6] == 'S') {
+        return CONNECTION_MESSAGE_TYPE_REQUEST;
+    }
+    if (buf[0] == 'H' && buf[1] == 'E' && buf[2] == 'A' && buf[3] == 'D') {
+        return CONNECTION_MESSAGE_TYPE_REQUEST;
+    }
+    if (buf[0] == 'P' && buf[1] == 'U' && buf[2] == 'T') {
+        return CONNECTION_MESSAGE_TYPE_REQUEST;
+    }
+    if (buf[0] == 'D' && buf[1] == 'E' && buf[2] == 'L' && buf[3] == 'E' && buf[4] == 'T' && buf[5] == 'E') {
+        return CONNECTION_MESSAGE_TYPE_REQUEST;
+    }
+    if (buf[0] == 'C' && buf[1] == 'O' && buf[2] == 'N' && buf[3] == 'N' && buf[4] == 'E' && buf[5] == 'C' && buf[6] == 'T') { // "CONNECT"
+        return CONNECTION_MESSAGE_TYPE_REQUEST;
+    }
+    if (buf[0] == 'T' && buf[1] == 'R' && buf[2] == 'A' && buf[3] == 'C' && buf[4] == 'E') {
+        return CONNECTION_MESSAGE_TYPE_REQUEST;
+    }
+    if (buf[0] == 'P' && buf[1] == 'A' && buf[2] == 'T' && buf[3] == 'C' && buf[4] == 'H') {
+        return CONNECTION_MESSAGE_TYPE_REQUEST;
+    }
+    return CONNECTION_MESSAGE_TYPE_UNKNOWN;
+}
+
+
+// HTTP 2.x: walk up to kFrameLoopCount frames and classify the first HEADERS frame whose leading header-block bytes
+// contain an HPACK static-table index (low 7 bits in 1..5 => REQUEST, 6..61 => RESPONSE). https://www.rfc-editor.org/rfc/rfc7540.html#section-4.1
+static __inline __u32 infer_http2_message(const char* buf, size_t count) {
+    static const uint8_t kFrameBasicSize = 0x9; // including Length, Type, Flags, Reserved, Stream Identity
+    static const uint8_t kFrameTypeHeader = 0x1; // the type of the frame: https://www.rfc-editor.org/rfc/rfc7540.html#section-6.2
+    static const uint8_t kFrameLoopCount = 5;
+
+    static const uint8_t kStaticTableMaxSize = 61;// https://www.rfc-editor.org/rfc/rfc7541#appendix-A
+    static const uint8_t kStaticTableAuth = 1;
+    static const uint8_t kStaticTableGet = 2;
+    static const uint8_t kStaticTablePost = 3;
+    static const uint8_t kStaticTablePath1 = 4;
+    static const uint8_t kStaticTablePath2 = 5;
+
+    if (count < kFrameBasicSize) { // the buffer must hold at least one complete frame header
+        return CONNECTION_MESSAGE_TYPE_UNKNOWN;
+    }
+
+    // frame info: 9-byte frame header followed by the first payload bytes inspected below
+    __u8 frame[21] = { 0 };
+    __u32 frameOffset = 0, frameStart = 0; // offsets within buf of the next frame / of the frame held in `frame`
+    __u8 staticInx, headerBlockFragmentOffset; // header info
+
+    // walk the frames back to back (bounded loop, unrolled)
+#pragma unroll
+    for (__u8 i = 0; i < kFrameLoopCount; i++) {
+        if (frameOffset >= count) {
+            break;
+        }
+
+        // read frame; stop on failure rather than parse stale bytes left over from the previous frame
+        frameStart = frameOffset;
+        if (bpf_probe_read(frame, sizeof(frame), buf + frameStart) != 0) {
+            break;
+        }
+        frameOffset += (bpf_ntohl(*(__u32 *) frame) >> 8) + kFrameBasicSize;
+
+        if (frame[3] != kFrameTypeHeader) { // only HEADERS frames are classified
+            continue;
+        }
+
+        // reject HEADERS frames with undefined flag bits (0xd2) set, or with frame[5] & 0x01 set: not HTTP2 protocol
+        // NOTE(review): frame[5] is the top byte of the stream id; its reserved R bit is 0x80, not 0x01 -- confirm intent
+        if ((frame[4] & 0xd2) || frame[5] & 0x01) {
+            return CONNECTION_MESSAGE_TYPE_UNKNOWN;
+        }
+
+        // locate the header block fragment offset (RFC 7540 section 6.2)
+        headerBlockFragmentOffset = kFrameBasicSize;
+        if (frame[4] & 0x08) { // PADDED flag (0x8): a 1-byte Pad Length precedes the fragment
+            headerBlockFragmentOffset += 1;
+        }
+        if (frame[4] & 0x20) { // PRIORITY flag (0x20): 5 bytes of stream dependency + weight precede the fragment
+            headerBlockFragmentOffset += 5;
+        }
+
+#pragma unroll
+        for (__u8 j = 0; j <= kStaticTablePath2; j++) {
+            if (frameStart + headerBlockFragmentOffset >= count) { // byte lies past the captured data
+                return CONNECTION_MESSAGE_TYPE_UNKNOWN;
+            }
+            staticInx = frame[headerBlockFragmentOffset] & 0x7f;
+            if (staticInx <= kStaticTableMaxSize && staticInx > 0) {
+                if (staticInx == kStaticTableAuth ||
+                    staticInx == kStaticTableGet ||
+                    staticInx == kStaticTablePost ||
+                    staticInx == kStaticTablePath1 ||
+                    staticInx == kStaticTablePath2) {
+                    return CONNECTION_MESSAGE_TYPE_REQUEST;
+                } else {
+                    return CONNECTION_MESSAGE_TYPE_RESPONSE;
+                }
+            }
+            headerBlockFragmentOffset++;
+        }
+    }
+
+    return CONNECTION_MESSAGE_TYPE_UNKNOWN;
+}
+
+static __inline __u32 analyze_protocol(char *buf, __u32 count, struct active_connection_t *conn_info) {
+    __u32 protocol = CONNECTION_PROTOCOL_UNKNOWN, type = CONNECTION_MESSAGE_TYPE_UNKNOWN;
+    // HTTP/1.x is tried before HTTP/2; returns that matcher's CONNECTION_MESSAGE_TYPE_* (UNKNOWN if neither matched)
+    if ((type = infer_http1_message(buf, count)) != CONNECTION_MESSAGE_TYPE_UNKNOWN) {
+        protocol = CONNECTION_PROTOCOL_HTTP1;
+    } else if ((type = infer_http2_message(buf, count)) != CONNECTION_MESSAGE_TYPE_UNKNOWN) {
+        protocol = CONNECTION_PROTOCOL_HTTP2;
+    }
+
+    // record the protocol only on a match; an unrecognized buffer leaves conn_info->protocol unchanged
+    if (protocol != CONNECTION_PROTOCOL_UNKNOWN) {
+        conn_info->protocol = protocol;
+    }
+
+    return type;
+}
\ No newline at end of file
diff --git a/bpf/profiling/network/sock_stats.h b/bpf/profiling/network/sock_stats.h
index f3fde47..36cdf66 100644
--- a/bpf/profiling/network/sock_stats.h
+++ b/bpf/profiling/network/sock_stats.h
@@ -56,10 +56,6 @@ struct active_connection_t {
// for protocol analyze
__u32 protocol;
- __u32 fix;
- __u64 prev_count;
- char prev_buf[4];
- __u32 prepend_length_header;
// current connection is ssl
__u32 ssl;
diff --git a/pkg/profiling/task/network/context.go b/pkg/profiling/task/network/context.go
index 42e7364..3ebc2eb 100644
--- a/pkg/profiling/task/network/context.go
+++ b/pkg/profiling/task/network/context.go
@@ -281,12 +281,8 @@ type ActiveConnectionInBPF struct {
WriteRTTExeTime uint64
// Protocol analyze context
- Protocol ConnectionProtocol
- Fix uint32
- ProtocolPrevCount uint64
- ProtocolPrevBuf [4]byte
- ProtocolPrependHeader uint32
- IsSSL uint32
+ Protocol ConnectionProtocol
+ IsSSL uint32
// the connect event is already sent
ConnectEventIsSent uint32
diff --git a/pkg/profiling/task/network/enums.go b/pkg/profiling/task/network/enums.go
index 5cbbe5f..db43f06 100644
--- a/pkg/profiling/task/network/enums.go
+++ b/pkg/profiling/task/network/enums.go
@@ -83,15 +83,6 @@ const (
ConnectionProtocolUnknown ConnectionProtocol = 0
ConnectionProtocolHTTP ConnectionProtocol = 1
ConnectionProtocolHTTP2 ConnectionProtocol = 2
- ConnectionProtocolMySQL ConnectionProtocol = 3
- ConnectionProtocolCQL ConnectionProtocol = 4
- ConnectionProtocolPGSQL ConnectionProtocol = 5
- ConnectionProtocolDNS ConnectionProtocol = 6
- ConnectionProtocolRedis ConnectionProtocol = 7
- ConnectionProtocolNATS ConnectionProtocol = 8
- ConnectionProtocolMongo ConnectionProtocol = 9
- ConnectionProtocolKafka ConnectionProtocol = 10
- ConnectionProtocolMux ConnectionProtocol = 11
)
func (c ConnectionProtocol) String() string {
@@ -102,24 +93,6 @@ func (c ConnectionProtocol) String() string {
return http
case ConnectionProtocolHTTP2:
return http
- case ConnectionProtocolMySQL:
- return "mysql"
- case ConnectionProtocolCQL:
- return "cql"
- case ConnectionProtocolPGSQL:
- return "pgsql"
- case ConnectionProtocolDNS:
- return "dns"
- case ConnectionProtocolRedis:
- return "redis"
- case ConnectionProtocolNATS:
- return "nats"
- case ConnectionProtocolMongo:
- return "mongo"
- case ConnectionProtocolKafka:
- return "kafka"
- case ConnectionProtocolMux:
- return "mutex"
default:
return unknown
}