You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@skywalking.apache.org by wu...@apache.org on 2023/01/05 08:50:52 UTC
[skywalking-rover] branch main updated: Enhance the protocol reader for support long socket data (#69)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git
The following commit(s) were added to refs/heads/main by this push:
new 8550199 Enhance the protocol reader for support long socket data (#69)
8550199 is described below
commit 8550199e98c9f5a4b2058878a0a899ffb73fe461
Author: mrproliu <74...@qq.com>
AuthorDate: Thu Jan 5 16:50:47 2023 +0800
Enhance the protocol reader for support long socket data (#69)
---
CHANGES.md | 13 +
bpf/profiling/network/netmonitor.c | 28 +-
bpf/profiling/network/sock_stats.h | 2 +-
.../task/network/analyze/layer7/events.go | 4 +
.../task/network/analyze/layer7/listener.go | 4 +-
.../analyze/layer7/protocols/base/analyzer.go | 229 ++++++++++++
.../analyze/layer7/protocols/base/buffer.go | 414 +++++++++++++++++++++
.../analyze/layer7/protocols/base/buffer_test.go | 94 +++++
.../analyze/layer7/protocols/base/events.go | 118 +++---
.../analyze/layer7/protocols/base/protocol.go | 16 +-
.../analyze/layer7/protocols/http1/analyzer.go | 318 +++++-----------
.../layer7/protocols/http1/analyzer_test.go | 310 ---------------
.../analyze/layer7/protocols/http1/builder.go | 290 ---------------
.../analyze/layer7/protocols/http1/metrics.go | 174 ++-------
.../layer7/protocols/http1/reader/reader.go | 298 +++++++++++++++
.../layer7/protocols/http1/reader/request.go | 143 +++++++
.../layer7/protocols/http1/reader/response.go | 125 +++++++
.../analyze/layer7/protocols/http1/sampling.go | 29 +-
.../network/analyze/layer7/protocols/protocols.go | 37 +-
pkg/profiling/task/network/analyze/layer7/queue.go | 6 +-
pkg/tools/host/time.go | 8 +-
21 files changed, 1562 insertions(+), 1098 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index b80c9f5..b6155ec 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -2,6 +2,19 @@ Changes by Version
==================
Release Notes.
+0.5.0
+------------------
+#### Features
+* Enhance the protocol reader for support long socket data.
+
+#### Bug Fixes
+
+#### Documentation
+
+#### Issues and PR
+- All issues are [here](https://github.com/apache/skywalking/milestone/167?closed=1)
+- All pull requests are [here](https://github.com/apache/skywalking-rover/milestone/5?closed=1)
+
0.4.0
------------------
#### Features
diff --git a/bpf/profiling/network/netmonitor.c b/bpf/profiling/network/netmonitor.c
index 05e8803..43577ba 100644
--- a/bpf/profiling/network/netmonitor.c
+++ b/bpf/profiling/network/netmonitor.c
@@ -46,7 +46,7 @@ char __license[] SEC("license") = "Dual MIT/GPL";
val; \
})
-#define SOCKET_UPLOAD_CHUNK_LIMIT 8
+#define SOCKET_UPLOAD_CHUNK_LIMIT 12
static __inline bool family_should_trace(const __u32 family) {
return family != AF_UNKNOWN && family != AF_INET && family != AF_INET6 ? false : true;
@@ -272,10 +272,11 @@ static __always_inline void resent_connect_event(struct pt_regs *ctx, __u32 tgid
}
}
-static __always_inline void __upload_socket_data_with_buffer(void *ctx, __u8 index, char* buf, size_t size, __u32 is_finished, struct socket_data_upload_event *event) {
+static __always_inline void __upload_socket_data_with_buffer(void *ctx, __u8 index, char* buf, size_t size, __u32 is_finished, __u8 have_reduce_after_chunk, struct socket_data_upload_event *event) {
event->sequence = index;
event->data_len = size;
event->finished = is_finished;
+ event->have_reduce_after_chunk = have_reduce_after_chunk;
if (size <= 0) {
return;
}
@@ -292,8 +293,10 @@ static __always_inline void upload_socket_data_buf(void *ctx, char* buf, ssize_t
// calculate bytes need to send
ssize_t remaining = size - already_send;
size_t need_send_in_chunk = 0;
+ __u8 have_reduce_after_chunk = 0;
if (remaining > MAX_TRANSMIT_SOCKET_READ_LENGTH) {
need_send_in_chunk = MAX_TRANSMIT_SOCKET_READ_LENGTH;
+ have_reduce_after_chunk = 1;
} else {
need_send_in_chunk = remaining;
}
@@ -304,7 +307,7 @@ static __always_inline void upload_socket_data_buf(void *ctx, char* buf, ssize_t
is_finished = 0;
sequence = generate_socket_sequence(event->conid, event->data_id);
}
- __upload_socket_data_with_buffer(ctx, sequence, buf + already_send, need_send_in_chunk, is_finished, event);
+ __upload_socket_data_with_buffer(ctx, sequence, buf + already_send, need_send_in_chunk, is_finished, have_reduce_after_chunk, event);
already_send += need_send_in_chunk;
}
@@ -316,19 +319,22 @@ if (iov_index < iovlen) { \
BPF_PROBE_READ_VAR(cur_iov, &iov[iov_index]); \
ssize_t remaining = size - already_send; \
size_t need_send_in_chunk = remaining - cur_iov_sended; \
+ __u8 have_reduce_after_chunk = 0; \
if (cur_iov_sended + need_send_in_chunk > cur_iov.iov_len) { \
need_send_in_chunk = cur_iov.iov_len - cur_iov_sended; \
if (need_send_in_chunk > MAX_TRANSMIT_SOCKET_READ_LENGTH) { \
need_send_in_chunk = MAX_TRANSMIT_SOCKET_READ_LENGTH; \
+ have_reduce_after_chunk = 1; \
} else { \
iov_index++; \
cur_iov_sended = 0; \
} \
} else if (need_send_in_chunk > MAX_TRANSMIT_SOCKET_READ_LENGTH) { \
need_send_in_chunk = MAX_TRANSMIT_SOCKET_READ_LENGTH; \
+ have_reduce_after_chunk = 1; \
} \
- __u32 is_finished = (need_send_in_chunk + already_send) >= size || loop_count == (SOCKET_UPLOAD_CHUNK_LIMIT - 1) ? true : false; \
- __upload_socket_data_with_buffer(ctx, loop_count, cur_iov.iov_base + cur_iov_sended, need_send_in_chunk, is_finished, event); \
+ __u32 is_finished = (need_send_in_chunk + already_send) >= size || loop_count == (SOCKET_UPLOAD_CHUNK_LIMIT - 1) ? true : false; \
+ __upload_socket_data_with_buffer(ctx, loop_count, cur_iov.iov_base + cur_iov_sended, need_send_in_chunk, is_finished, have_reduce_after_chunk, event); \
already_send += need_send_in_chunk; \
loop_count++; \
}
@@ -365,23 +371,11 @@ static __inline void upload_socket_data(void *ctx, __u64 start_time, __u64 end_t
if (connection->ssl != ssl) {
return;
}
- // if the msg type is unknown, then try to re-analysis
- if (existing_msg_type == CONNECTION_MESSAGE_TYPE_UNKNOWN) {
- struct socket_buffer_reader_t *buf_reader = read_socket_data(args, bytes_count);
- if (buf_reader == NULL) {
- return;
- }
- existing_msg_type = analyze_protocol(buf_reader->buffer, buf_reader->data_len, connection);
- if (existing_msg_type == CONNECTION_MESSAGE_TYPE_UNKNOWN && args->ssl_buffer_force_unfinished == 0) {
- return;
- }
- }
// basic data
event->start_time = start_time;
event->end_time = end_time;
event->protocol = connection->protocol;
- event->msg_type = existing_msg_type;
event->direction = data_direction;
event->conid = conid;
event->randomid = connection->random_id;
diff --git a/bpf/profiling/network/sock_stats.h b/bpf/profiling/network/sock_stats.h
index 84a46eb..8312af3 100644
--- a/bpf/profiling/network/sock_stats.h
+++ b/bpf/profiling/network/sock_stats.h
@@ -258,7 +258,7 @@ struct {
struct socket_data_upload_event {
__u8 protocol;
- __u8 msg_type;
+ __u8 have_reduce_after_chunk;
__u8 direction;
__u8 finished;
__u16 sequence;
diff --git a/pkg/profiling/task/network/analyze/layer7/events.go b/pkg/profiling/task/network/analyze/layer7/events.go
index 9096913..5590dce 100644
--- a/pkg/profiling/task/network/analyze/layer7/events.go
+++ b/pkg/profiling/task/network/analyze/layer7/events.go
@@ -61,6 +61,10 @@ func NewSocketDataPartitionContext(l base.Context, config *profiling.TaskConfig)
}
}
+func (p *SocketDataPartitionContext) Start(ctx context.Context) {
+ p.analyzer.Start(ctx)
+}
+
func (p *SocketDataPartitionContext) Consume(data interface{}) {
event := data.(*base.SocketDataUploadEvent)
p.analyzer.ReceiveSocketDataEvent(event)
diff --git a/pkg/profiling/task/network/analyze/layer7/listener.go b/pkg/profiling/task/network/analyze/layer7/listener.go
index 8edb532..61f32bd 100644
--- a/pkg/profiling/task/network/analyze/layer7/listener.go
+++ b/pkg/profiling/task/network/analyze/layer7/listener.go
@@ -132,9 +132,9 @@ func (l *Listener) QueryConnection(conID, randomID uint64) *base.ConnectionConte
return nil
}
-func (l *Listener) QueryProtocolMetrics(conMetrics *base.ConnectionMetricsContext, protocolName string) protocol.Metrics {
+func (l *Listener) QueryProtocolMetrics(conMetrics *base.ConnectionMetricsContext, p base.ConnectionProtocol) protocol.Metrics {
metrics := conMetrics.GetMetrics(ListenerName).(*protocols.ProtocolMetrics)
- return metrics.GetProtocolMetrics(protocolName)
+ return metrics.GetProtocolMetrics(p)
}
func (l *Listener) generateCachedConnectionKey(conID, randomID uint64) string {
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/base/analyzer.go b/pkg/profiling/task/network/analyze/layer7/protocols/base/analyzer.go
new file mode 100644
index 0000000..0abc173
--- /dev/null
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/base/analyzer.go
@@ -0,0 +1,229 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package base
+
+import (
+ "context"
+ "sync"
+ "time"
+
+ "github.com/apache/skywalking-rover/pkg/logger"
+ profiling "github.com/apache/skywalking-rover/pkg/profiling/task/base"
+ "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
+)
+
+const (
+ batchReadMinCount = 1000
+)
+
+var log = logger.GetLogger("profiling", "task", "network", "layer7", "protocols", "base")
+
+// ProtocolAnalyzer handles all socket data for each protocol
+type ProtocolAnalyzer struct {
+ protocolContext Context
+ protocol Protocol
+ config *profiling.TaskConfig
+
+ connections map[connectionKey]*connectionInfo
+ analyzeLocker sync.Mutex
+ receiveEventCount int
+}
+
+func NewProtocolAnalyzer(protocolContext Context, p Protocol, config *profiling.TaskConfig) *ProtocolAnalyzer {
+ return &ProtocolAnalyzer{
+ protocolContext: protocolContext,
+ protocol: p,
+ config: config,
+ connections: make(map[connectionKey]*connectionInfo),
+ }
+}
+
+func (a *ProtocolAnalyzer) Start(ctx context.Context) {
+ duration, _ := time.ParseDuration(a.config.Network.ReportInterval)
+ timeTicker := time.NewTicker(duration)
+ go func() {
+ for {
+ select {
+ case <-timeTicker.C:
+ // process event with interval
+ a.processEvents()
+ case <-ctx.Done():
+ timeTicker.Stop()
+ return
+ }
+ }
+ }()
+
+ // if the protocol defined the events expire time, then check events interval
+ expireDuration := a.protocol.PackageMaxExpireDuration()
+ if expireDuration.Milliseconds() > 0 {
+ expireTicker := time.NewTicker(expireDuration)
+ go func() {
+ for {
+ select {
+ case <-expireTicker.C:
+ a.processExpireEvents(expireDuration)
+ case <-ctx.Done():
+ expireTicker.Stop()
+ return
+ }
+ }
+ }()
+ }
+}
+
+// ReceiveSocketData appends a socket data event to the buffer of its owning
+// connection and triggers a batch analysis once enough events were received.
+//
+// The connection entry is created the first time its (connection ID, random ID)
+// pair is seen, and its metrics are re-checked against ctx on every event.
+func (a *ProtocolAnalyzer) ReceiveSocketData(ctx Context, event *SocketDataUploadEvent) {
+	connectionID := event.GenerateConnectionID()
+	key := connectionKey{connectionID: event.ConnectionID, randomID: event.RandomID}
+	connection := a.connections[key]
+	if connection == nil {
+		connection = newConnectionInfo(a.protocol, ctx, key.connectionID, key.randomID)
+		a.connections[key] = connection
+	}
+	connection.checkConnectionMetrics(ctx)
+
+	log.Debugf("receive connection: %s, dataid: %d, sequence: %d, finished: %d, have reduce after chunk: %t, direction: %s, size: %d, total size: %d",
+		connectionID, event.DataID(), event.DataSequence(), event.Finished, event.HaveReduceDataAfterChunk(),
+		event.Direction().String(), event.DataLen, event.TotalSize0)
+
+	// insert to the event list
+	connection.buffer.appendEvent(event)
+
+	// process the events once the receive counter reaches the batch threshold;
+	// the counter is cleared only after a batch run, otherwise it could never
+	// accumulate up to batchReadMinCount
+	a.receiveEventCount++
+	if a.receiveEventCount >= batchReadMinCount {
+		a.processEvents()
+		a.receiveEventCount = 0
+	}
+}
+
+// processEvents analyzes the buffered protocol data of every known connection
+func (a *ProtocolAnalyzer) processEvents() {
+ // it could be triggered by the interval ticker or by reaching the receive counter
+ // if one trigger already holds the lock, the other one just skips this round
+ // NOTE(review): a.connections is iterated here while ReceiveSocketData inserts
+ // into the same map; confirm the callers serialize these two paths
+ if !a.analyzeLocker.TryLock() {
+ return
+ }
+ defer a.analyzeLocker.Unlock()
+
+ for _, connection := range a.connections {
+ a.processConnectionEvents(connection)
+ }
+}
+
+// processExpireEvents delete the expired events
+func (a *ProtocolAnalyzer) processExpireEvents(expireDuration time.Duration) {
+ // the expiry must be mutual exclusion with events processor
+ a.analyzeLocker.Lock()
+ defer a.analyzeLocker.Unlock()
+
+ for _, connection := range a.connections {
+ a.processConnectionExpireEvents(connection, expireDuration)
+ }
+}
+
+func (a *ProtocolAnalyzer) processConnectionEvents(connection *connectionInfo) {
+ // reset the status for prepare reading
+ buffer := connection.buffer
+ metrics := connection.metrics
+ connectionID := connection.connectionID
+ buffer.resetForLoopReading()
+ // loop to read the protocol data
+ for {
+ // reset the status of reading
+ if !buffer.prepareForReading() {
+ log.Debugf("prepare finsihed: event size: %d", buffer.events.Len())
+ return
+ }
+
+ result := a.protocol.ParseProtocol(connectionID, metrics, buffer)
+ finishReading := false
+ switch result {
+ case ParseResultSuccess:
+ finishReading = buffer.removeReadElements()
+ case ParseResultSkipPackage:
+ finishReading = buffer.skipCurrentElement()
+ }
+
+ if finishReading {
+ log.Debugf("reading finsihed: event size: %d", buffer.events.Len())
+ break
+ }
+ }
+}
+
+func (a *ProtocolAnalyzer) processConnectionExpireEvents(connection *connectionInfo, expireDuration time.Duration) {
+ if c := connection.buffer.deleteExpireEvents(expireDuration); c > 0 {
+ log.Debugf("total removed %d expired events for %s protocol", c, a.protocol.Protocol().String())
+ }
+}
+
+func (a *ProtocolAnalyzer) UpdateExtensionConfig(config *profiling.ExtensionConfig) {
+ a.protocol.UpdateExtensionConfig(config)
+}
+
+type connectionKey struct {
+ connectionID uint64
+ randomID uint64
+}
+
+type connectionInfo struct {
+ connectionID, randomID uint64
+ connectionProtocol base.ConnectionProtocol
+ buffer *Buffer
+ metrics Metrics
+ metricsFromConnection bool
+}
+
+func newConnectionInfo(p Protocol, connectionContext Context, connectionID, randomID uint64) *connectionInfo {
+ fromConnection := false
+ var connectionMetrics Metrics
+ con := connectionContext.QueryConnection(connectionID, randomID)
+ // if connection not exists, then cached it into the analyzer context
+ if con == nil {
+ connectionMetrics = p.GenerateMetrics()
+ } else {
+ connectionMetrics = connectionContext.QueryProtocolMetrics(con.Metrics, p.Protocol())
+ fromConnection = true
+ }
+
+ return &connectionInfo{
+ connectionID: connectionID,
+ randomID: randomID,
+ connectionProtocol: p.Protocol(),
+ buffer: newBuffer(),
+ metrics: connectionMetrics,
+ metricsFromConnection: fromConnection,
+ }
+}
+
+func (c *connectionInfo) checkConnectionMetrics(protocolContext Context) {
+ if c.metricsFromConnection {
+ return
+ }
+ connection := protocolContext.QueryConnection(c.connectionID, c.randomID)
+ if connection == nil {
+ return
+ }
+
+ // merge the temporary metrics into the connection metrics
+ connectionMetrics := protocolContext.QueryProtocolMetrics(connection.Metrics, c.connectionProtocol)
+ connectionMetrics.MergeMetricsFromConnection(connection, c.metrics)
+ c.metrics = connectionMetrics
+ c.metricsFromConnection = true
+}
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/base/buffer.go b/pkg/profiling/task/network/analyze/layer7/protocols/base/buffer.go
new file mode 100644
index 0000000..c7b2647
--- /dev/null
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/base/buffer.go
@@ -0,0 +1,414 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package base
+
+import (
+ "container/list"
+ "errors"
+ "fmt"
+ "io"
+ "sync"
+ "time"
+
+ "github.com/apache/skywalking-rover/pkg/tools/host"
+)
+
+var (
+ ErrNotComplete = errors.New("socket: not complete event")
+)
+
+type Buffer struct {
+ events *list.List
+ validated bool // the events list is validated or not
+
+ eventLocker sync.RWMutex
+
+ head *BufferPosition
+ current *BufferPosition
+}
+
+type BufferPosition struct {
+ // element of the event list
+ element *list.Element
+ // bufIndex the buffer index of the element
+ bufIndex int
+}
+
+func (p *BufferPosition) String() string {
+ buffer := p.element.Value.(SocketDataBuffer)
+ return fmt.Sprintf("data id: %d, sequence: %d, buffer index: %d",
+ buffer.DataID(), buffer.DataSequence(), p.bufIndex)
+}
+
+func newBuffer() *Buffer {
+ return &Buffer{
+ events: list.New(),
+ validated: false,
+ }
+}
+
+func (r *Buffer) Position() *BufferPosition {
+ return r.current.Clone()
+}
+
+func (r *Buffer) Slice(validated bool, start, end *BufferPosition) *Buffer {
+ events := list.New()
+ for nextElement := start.element; nextElement != end.element; nextElement = nextElement.Next() {
+ events.PushBack(nextElement.Value)
+ }
+ events.PushBack(&SocketDataEventLimited{end.element.Value.(SocketDataBuffer), 0, end.bufIndex})
+
+ return &Buffer{
+ events: events,
+ validated: validated,
+ head: &BufferPosition{element: events.Front(), bufIndex: start.bufIndex},
+ current: &BufferPosition{element: events.Front(), bufIndex: start.bufIndex},
+ }
+}
+
+// Len returns the number of bytes from the head position to the end of the
+// event list. It returns 0 when the head position has not been set.
+func (r *Buffer) Len() int {
+	if r.head == nil {
+		return 0
+	}
+	var result int
+	// only the head element is offset by the head index, later elements count in full
+	var startIndex = r.head.bufIndex
+	for e := r.head.element; e != nil; e = e.Next() {
+		// measure the element being visited, not the head element on every round
+		result += e.Value.(SocketDataBuffer).BufferLen() - startIndex
+		startIndex = 0
+	}
+	return result
+}
+
+func (r *Buffer) FirstSocketBuffer() SocketDataBuffer {
+ if r.events.Len() == 0 {
+ return nil
+ }
+ return r.events.Front().Value.(SocketDataBuffer)
+}
+
+func (r *Buffer) LastSocketBuffer() SocketDataBuffer {
+ if r.events.Len() == 0 {
+ return nil
+ }
+ return r.events.Back().Value.(SocketDataBuffer)
+}
+
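+// DetectNotSendingLastPosition returns the end position of the first buffer that is finished but still has data left unsent after its chunk (BPF limits the uploaded socket data), or nil when no such buffer exists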
+func (r *Buffer) DetectNotSendingLastPosition() *BufferPosition {
+ if r.events.Len() == 0 {
+ return nil
+ }
+
+ for e := r.events.Front(); e != nil; e = e.Next() {
+ buf := e.Value.(SocketDataBuffer)
+ // the buffer is sent finished but still have reduced data not send
+ if buf.IsFinished() && buf.HaveReduceDataAfterChunk() {
+ return &BufferPosition{element: e, bufIndex: buf.BufferLen()}
+ }
+ }
+ return nil
+}
+
+func CombineSlices(validated bool, buffers ...*Buffer) *Buffer {
+ if len(buffers) == 0 {
+ return nil
+ }
+ if len(buffers) == 1 {
+ return buffers[0]
+ }
+ events := list.New()
+ for _, b := range buffers {
+ if b.head.bufIndex > 0 {
+ headBuffer := b.events.Front().Value.(SocketDataBuffer)
+ events.PushBack(&SocketDataEventLimited{headBuffer, b.head.bufIndex, headBuffer.BufferLen()})
+ for next := b.events.Front().Next(); next != nil; next = next.Next() {
+ events.PushBack(next.Value)
+ }
+ } else {
+ events.PushBackList(b.events)
+ }
+ }
+ return &Buffer{
+ events: events,
+ validated: validated,
+ head: &BufferPosition{element: events.Front(), bufIndex: 0},
+ current: &BufferPosition{element: events.Front(), bufIndex: 0},
+ }
+}
+
+func (r *Buffer) Peek(p []byte) (n int, err error) {
+ // save the index temporary
+ tmpPosition := r.current.Clone()
+ // restore the index
+ defer func() {
+ r.current = tmpPosition
+ }()
+ readIndex := 0
+ for readIndex < len(p) {
+ count, err := r.Read(p[readIndex:])
+ if err != nil {
+ return 0, err
+ }
+ readIndex += count
+ }
+ return readIndex, nil
+}
+
+// OffsetPosition returns the position that is offset bytes away from the
+// current reading position: forward for a positive offset, backward for a
+// negative one, crossing event boundaries when needed. An offset of zero
+// returns a copy of the current position.
+// It returns nil when the event list is exhausted before the offset is consumed.
+func (r *Buffer) OffsetPosition(offset int) *BufferPosition {
+	var nextElement func(e *list.Element) *list.Element
+	if offset == 0 {
+		return r.current.Clone()
+	} else if offset > 0 {
+		nextElement = func(e *list.Element) *list.Element {
+			return e.Next()
+		}
+	} else {
+		nextElement = func(e *list.Element) *list.Element {
+			return e.Prev()
+		}
+	}
+
+	var curEle = r.current.element
+	var curIndex = r.current.bufIndex
+	for ; curEle != nil; curEle = nextElement(curEle) {
+		nextOffset := curIndex + offset
+		bufferLen := curEle.Value.(SocketDataBuffer).BufferLen()
+		// the target lies inside the element being visited
+		if nextOffset >= 0 && nextOffset < bufferLen {
+			curIndex += offset
+			break
+		}
+
+		if offset > 0 {
+			// consume the rest of this element and continue from the start of the next one
+			offset -= bufferLen - curIndex
+			curIndex = 0
+		} else {
+			// consume the bytes before the index and continue from the end of the previous element
+			offset += curIndex
+			next := nextElement(curEle)
+			if next == nil {
+				curEle = next
+				break
+			}
+			// the index must be based on the length of the element being entered,
+			// not the one being left, otherwise buffers of different sizes are miscounted
+			curIndex = next.Value.(SocketDataBuffer).BufferLen()
+		}
+	}
+
+	if curEle == nil {
+		return nil
+	}
+	return &BufferPosition{element: curEle, bufIndex: curIndex}
+}
+
+func (r *Buffer) Read(p []byte) (n int, err error) {
+ if len(p) == 0 {
+ return 0, nil
+ }
+ if r.current == nil || r.current.element == nil {
+ return 0, io.EOF
+ }
+ element, n := r.readFromCurrent(p)
+ if n > 0 {
+ return n, nil
+ }
+
+ curEvent := element.Value.(SocketDataBuffer)
+ next := r.nextElement(element)
+ if next == nil {
+ return 0, io.EOF
+ }
+ nextEvent := next.Value.(SocketDataBuffer)
+
+ var shouldRead = false
+ if r.validated {
+ shouldRead = true
+ // same data id and sequence orders
+ } else if (curEvent.DataID() == nextEvent.DataID() && curEvent.DataSequence()+1 == nextEvent.DataSequence()) ||
+ // cur event is finished and next event is start
+ (nextEvent.IsStart() && curEvent.IsFinished()) ||
+ // same data id and sequence but have difference buffer index
+ (curEvent.DataID() == nextEvent.DataID() && curEvent.DataSequence() == nextEvent.DataSequence() &&
+ r.current.bufIndex <= nextEvent.BufferStartPosition()) {
+ shouldRead = true
+ }
+
+ if !shouldRead {
+ return 0, ErrNotComplete
+ }
+
+ return r.read0(next, nextEvent, p)
+}
+
+func (r *Buffer) readFromCurrent(p []byte) (element *list.Element, n int) {
+ element = r.current.element
+ curEvent := element.Value.(SocketDataBuffer)
+ residueSize := curEvent.BufferLen() - r.current.bufIndex
+ if residueSize > 0 {
+ readLen := len(p)
+ if residueSize < readLen {
+ readLen = residueSize
+ }
+
+ n = copy(p, curEvent.BufferData()[r.current.bufIndex:r.current.bufIndex+readLen])
+ r.current.bufIndex += n
+ return element, n
+ }
+ return element, 0
+}
+
+func (r *Buffer) read0(currentElement *list.Element, currentBuffer SocketDataBuffer, p []byte) (n int, err error) {
+ readLen := len(p)
+ if currentBuffer.BufferLen() < readLen {
+ readLen = currentBuffer.BufferLen()
+ }
+
+ copy(p, currentBuffer.BufferData()[:readLen])
+ r.current.element = currentElement
+ r.current.bufIndex = readLen
+ return readLen, nil
+}
+
+// IsCurrentPacketReadFinished reports whether the current element has been read to its end
+func (r *Buffer) IsCurrentPacketReadFinished() bool {
+ return r.current.bufIndex == r.current.element.Value.(SocketDataBuffer).BufferLen()
+}
+
+func (r *Buffer) resetForLoopReading() {
+ r.head = nil
+ r.current = nil
+}
+
+func (r *Buffer) prepareForReading() bool {
+ if r.events.Len() == 0 {
+ return false
+ }
+ if r.head == nil || r.head.element == nil {
+ // read in the first element
+ r.eventLocker.RLock()
+ defer r.eventLocker.RUnlock()
+ r.head = &BufferPosition{element: r.events.Front(), bufIndex: 0}
+ r.current = r.head.Clone()
+ } else {
+ // make sure we can read from head
+ r.current = r.head.Clone()
+ }
+
+ return true
+}
+
+func (r *Buffer) removeReadElements() bool {
+ r.eventLocker.Lock()
+ defer r.eventLocker.Unlock()
+
+ // delete until to current position
+ next := r.head.element
+ for ; next != nil && next != r.current.element; next = r.removeElement0(next) {
+ }
+ if next != nil && next.Value.(SocketDataBuffer).BufferLen() == r.current.bufIndex {
+ // the last event already read finished, then delete it
+ r.head.element = r.removeElement0(next)
+ r.head.bufIndex = 0
+ } else if next != nil {
+ // keep using the latest element
+ r.head.element = next
+ } else {
+ return true
+ }
+ return false
+}
+
+// skipCurrentElement moves the head to the element after the one currently
+// being read; it returns true when there is no following element, which means
+// the reading has finished
+func (r *Buffer) skipCurrentElement() bool {
+	r.head.element = r.nextElement(r.current.element)
+	// the new head element has not been read yet, so reading must restart from
+	// its first byte; resetting only the current position is not enough because
+	// prepareForReading replaces it with a clone of the head
+	r.head.bufIndex = 0
+	r.current.bufIndex = 0
+
+	return r.head.element == nil
+}
+
+func (r *Buffer) removeElement0(element *list.Element) *list.Element {
+ if element == nil {
+ return nil
+ }
+ result := element.Next()
+ r.events.Remove(element)
+ return result
+}
+
+// appendEvent insert the event to the event list following the order
+func (r *Buffer) appendEvent(event *SocketDataUploadEvent) {
+ r.eventLocker.Lock()
+ defer r.eventLocker.Unlock()
+
+ if r.events.Len() == 0 {
+ r.events.PushFront(event)
+ return
+ }
+ if r.events.Back().Value.(SocketDataBuffer).DataID() < event.DataID() {
+ r.events.PushBack(event)
+ return
+ }
+ beenAdded := false
+ for element := r.events.Front(); element != nil; element = element.Next() {
+ existEvent := element.Value.(SocketDataBuffer)
+ if existEvent.DataID() > event.DataID() {
+ // data id needs order
+ beenAdded = true
+ } else if existEvent.DataID() == event.DataID() && existEvent.DataSequence() > event.DataSequence() {
+ // following the sequence order
+ beenAdded = true
+ }
+ if beenAdded {
+ r.events.InsertBefore(event, element)
+ break
+ }
+ }
+ if !beenAdded {
+ r.events.PushBack(event)
+ }
+}
+
+func (r *Buffer) deleteExpireEvents(expireDuration time.Duration) int {
+ r.eventLocker.Lock()
+ defer r.eventLocker.Unlock()
+
+ expireTime := time.Now().Add(-expireDuration)
+ count := 0
+ for e := r.events.Front(); e != nil; {
+ startTime := host.Time(e.Value.(SocketDataBuffer).StartTime())
+ if expireTime.After(startTime) {
+ count++
+ cur := e
+ e = e.Next()
+ r.events.Remove(cur)
+ } else {
+ break
+ }
+ }
+ return count
+}
+
+func (r *Buffer) nextElement(e *list.Element) *list.Element {
+ if e == nil {
+ return nil
+ }
+ r.eventLocker.RLock()
+ defer r.eventLocker.RUnlock()
+ return e.Next()
+}
+
+func (p *BufferPosition) Clone() *BufferPosition {
+ return &BufferPosition{element: p.element, bufIndex: p.bufIndex}
+}
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/base/buffer_test.go b/pkg/profiling/task/network/analyze/layer7/protocols/base/buffer_test.go
new file mode 100644
index 0000000..87984b2
--- /dev/null
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/base/buffer_test.go
@@ -0,0 +1,94 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package base
+
+import (
+ "container/list"
+ "testing"
+)
+
+// TestOffsetPosition verifies Buffer.OffsetPosition for forward and backward
+// offsets over a list of events, including an offset that runs past the end.
+func TestOffsetPosition(t *testing.T) {
+	type position struct {
+		eventIndex  int
+		bufferIndex int
+	}
+	var tests = []struct {
+		events  []int
+		current position
+		offset  int
+		result  *position
+	}{
+		{
+			events:  []int{10, 10, 10},
+			current: position{0, 0},
+			offset:  10,
+			result:  &position{1, 0},
+		},
+		{
+			events:  []int{10, 10, 10},
+			current: position{1, 0},
+			offset:  -10,
+			result:  &position{0, 0},
+		},
+		{
+			events:  []int{10, 10, 10},
+			current: position{2, 5},
+			offset:  -10,
+			result:  &position{1, 5},
+		},
+		{
+			events:  []int{10, 10, 10},
+			current: position{2, 5},
+			offset:  -20,
+			result:  &position{0, 5},
+		},
+		{
+			events:  []int{10, 10, 10},
+			current: position{2, 5},
+			offset:  10,
+			result:  nil,
+		},
+	}
+
+	for _, test := range tests {
+		// build one event per entry, using the entry index as the data id
+		events := list.New()
+		buffer := Buffer{events: events}
+		var curElement *list.Element
+		for i, e := range test.events {
+			element := events.PushBack(&SocketDataUploadEvent{
+				DataID0: uint64(i),
+				DataLen: uint16(e),
+			})
+			if i == test.current.eventIndex {
+				curElement = element
+			}
+		}
+
+		buffer.prepareForReading()
+		buffer.current = &BufferPosition{element: curElement, bufIndex: test.current.bufferIndex}
+		offsetPosition := buffer.OffsetPosition(test.offset)
+		if offsetPosition == nil && test.result == nil {
+			continue
+		}
+		// exactly one side is nil: fail with a message instead of panicking on
+		// a nil dereference in the comparison below
+		if offsetPosition == nil {
+			t.Fatalf("expected: %d,%d, actual: nil", test.result.eventIndex, test.result.bufferIndex)
+		}
+		if test.result == nil {
+			t.Fatalf("expected: nil, actual: %d,%d",
+				offsetPosition.element.Value.(*SocketDataUploadEvent).DataID(), offsetPosition.bufIndex)
+		}
+		if int(offsetPosition.element.Value.(*SocketDataUploadEvent).DataID()) != test.result.eventIndex ||
+			offsetPosition.bufIndex != test.result.bufferIndex {
+			t.Fatalf("expected: %d,%d, actual: %d,%d", test.result.eventIndex, test.result.bufferIndex,
+				offsetPosition.element.Value.(*SocketDataUploadEvent).DataID(), offsetPosition.bufIndex)
+		}
+	}
+}
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/base/events.go b/pkg/profiling/task/network/analyze/layer7/protocols/base/events.go
index e3f09f0..52f886d 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/base/events.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/base/events.go
@@ -24,35 +24,45 @@ import (
)
type SocketDataBuffer interface {
- // Combine from other buffer
- Combine(buffered SocketDataBuffer) SocketDataBuffer
// BufferData of the buffer
BufferData() []byte
// TotalSize of socket data, the data may exceed the size of the BufferData()
TotalSize() uint64
// Direction of the data, send or receive
Direction() base.SocketDataDirection
- FirstEvent() *SocketDataUploadEvent
-
+ // BufferStartPosition the buffer start index
+ BufferStartPosition() int
+ // BufferLen the buffer data length
+ BufferLen() int
+ // DataID data id of the buffer
+ DataID() uint64
+ // DataSequence the data sequence under same data id
+ DataSequence() int
+ // IsStart this buffer is start of the same data id
+ IsStart() bool
+ // IsFinished this buffer is finish of the same data id
+ IsFinished() bool
+ // HaveReduceDataAfterChunk check have reduced data after current buffer
+ HaveReduceDataAfterChunk() bool
+
+ // StartTime the data start timestamp
StartTime() uint64
+ // EndTime the data end timestamp
EndTime() uint64
-
- MinDataID() int
- MaxDataID() int
}
type SocketDataUploadEvent struct {
Protocol base.ConnectionProtocol
- MsgType base.SocketMessageType
+ HaveReduce uint8
Direction0 base.SocketDataDirection
Finished uint8
- Sequence uint16
+ Sequence0 uint16
DataLen uint16
StartTime0 uint64
EndTime0 uint64
ConnectionID uint64
RandomID uint64
- DataID uint64
+ DataID0 uint64
TotalSize0 uint64
Buffer [2048]byte
}
@@ -65,6 +75,10 @@ func (s *SocketDataUploadEvent) BufferData() []byte {
return s.Buffer[:s.DataLen]
}
+func (s *SocketDataUploadEvent) BufferLen() int {
+ return int(s.DataLen)
+}
+
func (s *SocketDataUploadEvent) StartTime() uint64 {
return s.StartTime0
}
@@ -77,90 +91,48 @@ func (s *SocketDataUploadEvent) Direction() base.SocketDataDirection {
return s.Direction0
}
-func (s *SocketDataUploadEvent) FirstEvent() *SocketDataUploadEvent {
- return s
-}
-
-func (s *SocketDataUploadEvent) MinDataID() int {
- return int(s.DataID)
-}
-
-func (s *SocketDataUploadEvent) MaxDataID() int {
- return int(s.DataID)
-}
-
func (s *SocketDataUploadEvent) IsStart() bool {
- return s.Sequence == 0
+ return s.Sequence0 == 0
}
func (s *SocketDataUploadEvent) IsFinished() bool {
return s.Finished == 1
}
-func (s *SocketDataUploadEvent) Combine(other SocketDataBuffer) SocketDataBuffer {
- combined := &SocketDataUploadCombinedEvent{first: s}
- combined.realBuffer = append(s.Buffer[:s.DataLen], other.BufferData()...)
- combined.minDataID = int(s.DataID)
- if other.MinDataID() < combined.minDataID {
- combined.minDataID = other.MinDataID()
- }
- combined.maxDataID = int(s.DataID)
- if other.MaxDataID() > combined.maxDataID {
- combined.maxDataID = other.MaxDataID()
- }
- return combined
+func (s *SocketDataUploadEvent) DataID() uint64 {
+ return s.DataID0
}
-func (s *SocketDataUploadEvent) TotalSize() uint64 {
- return s.TotalSize0
-}
-
-type SocketDataUploadCombinedEvent struct {
- first *SocketDataUploadEvent
- realBuffer []byte
- minDataID int
- maxDataID int
-}
-
-func (s *SocketDataUploadCombinedEvent) BufferData() []byte {
- return s.realBuffer
-}
-
-func (s *SocketDataUploadCombinedEvent) TotalSize() uint64 {
- return s.first.TotalSize0
+func (s *SocketDataUploadEvent) DataSequence() int {
+ return int(s.Sequence0)
}
-func (s *SocketDataUploadCombinedEvent) StartTime() uint64 {
- return s.first.StartTime0
+func (s *SocketDataUploadEvent) BufferStartPosition() int {
+ return 0
}
-func (s *SocketDataUploadCombinedEvent) EndTime() uint64 {
- return s.first.EndTime0
+func (s *SocketDataUploadEvent) TotalSize() uint64 {
+ return s.TotalSize0
}
-func (s *SocketDataUploadCombinedEvent) MinDataID() int {
- return s.minDataID
+func (s *SocketDataUploadEvent) HaveReduceDataAfterChunk() bool {
+ return s.HaveReduce == 1
}
-func (s *SocketDataUploadCombinedEvent) MaxDataID() int {
- return s.maxDataID
+type SocketDataEventLimited struct {
+ SocketDataBuffer
+ from int
+ size int
}
-func (s *SocketDataUploadCombinedEvent) Direction() base.SocketDataDirection {
- return s.first.Direction0
+func (s *SocketDataEventLimited) BufferData() []byte {
+ return s.SocketDataBuffer.BufferData()[s.from:s.size]
}
-func (s *SocketDataUploadCombinedEvent) FirstEvent() *SocketDataUploadEvent {
- return s.first
+func (s *SocketDataEventLimited) BufferLen() int {
+ return s.size - s.from
}
-func (s *SocketDataUploadCombinedEvent) Combine(other SocketDataBuffer) SocketDataBuffer {
- s.realBuffer = append(s.realBuffer, other.BufferData()...)
- if other.MinDataID() < s.minDataID {
- s.minDataID = other.MinDataID()
- }
- if other.MaxDataID() > s.maxDataID {
- s.maxDataID = other.MaxDataID()
- }
- return s
+func (s *SocketDataEventLimited) BufferStartPosition() int {
+ return s.from
}
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/base/protocol.go b/pkg/profiling/task/network/analyze/layer7/protocols/base/protocol.go
index 8afb92e..af401e1 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/base/protocol.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/base/protocol.go
@@ -18,22 +18,32 @@
package base
import (
+ "time"
+
profiling "github.com/apache/skywalking-rover/pkg/profiling/task/base"
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
)
+type ParseResult int
+
+const (
+ ParseResultSuccess ParseResult = iota
+ ParseResultSkipPackage
+)
+
type Protocol interface {
- Name() string
+ Protocol() base.ConnectionProtocol
GenerateMetrics() Metrics
Init(config *profiling.TaskConfig)
- ReceiveData(context Context, event *SocketDataUploadEvent) bool
+ ParseProtocol(connectionID uint64, metrics Metrics, reader *Buffer) ParseResult
+ PackageMaxExpireDuration() time.Duration
UpdateExtensionConfig(config *profiling.ExtensionConfig)
}
type Context interface {
QueryConnection(connectionID, randomID uint64) *base.ConnectionContext
- QueryProtocolMetrics(conMetrics *base.ConnectionMetricsContext, protocolName string) Metrics
+ QueryProtocolMetrics(conMetrics *base.ConnectionMetricsContext, protocol base.ConnectionProtocol) Metrics
}
type Metrics interface {
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/http1/analyzer.go b/pkg/profiling/task/network/analyze/layer7/protocols/http1/analyzer.go
index 499f803..c9b45fb 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/http1/analyzer.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/http1/analyzer.go
@@ -18,30 +18,22 @@
package http1
import (
- "bufio"
- "bytes"
"container/list"
"encoding/json"
- "fmt"
- "io"
- "net/http"
- "net/textproto"
- "strconv"
- "strings"
"sync"
+ "time"
"github.com/apache/skywalking-rover/pkg/logger"
profiling "github.com/apache/skywalking-rover/pkg/profiling/task/base"
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
protocol "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/base"
+ "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader"
"github.com/sirupsen/logrus"
)
var log = logger.GetLogger("profiling", "task", "network", "layer7", "protocols", "http1")
-var ProtocolName = "http1"
-
var PackageSizeHistogramBuckets = []float64{
// 0.25KB, 0.5KB, 1KB, 1.5KB, 2KB, 3KB, 5KB, 8KB, 10KB, 15KB, 20KB, 35KB, 50KB, 75KB, 100KB, 200KB, 500KB
256, 512, 1048, 1536, 2048, 3072, 5120, 8192, 10240, 15360, 20480, 35840, 51200, 76800, 102400, 204800, 512000,
@@ -77,8 +69,8 @@ func NewHTTP1Analyzer() protocol.Protocol {
}
}
-func (h *Analyzer) Name() string {
- return ProtocolName
+func (h *Analyzer) Protocol() base.ConnectionProtocol {
+ return base.ConnectionProtocolHTTP
}
func (h *Analyzer) GenerateMetrics() protocol.Metrics {
@@ -94,260 +86,146 @@ func (h *Analyzer) Init(config *profiling.TaskConfig) {
h.sampleConfig = NewSamplingConfig(config)
}
-func (h *Analyzer) ReceiveData(context protocol.Context, event *protocol.SocketDataUploadEvent) bool {
- // only handle the HTTP1 protocol
- if event.Protocol != base.ConnectionProtocolHTTP {
- return false
- }
-
- connectionID := event.GenerateConnectionID()
- fromAnalyzerCache := false
- var connectionMetrics *ConnectionMetrics
- connection := context.QueryConnection(event.ConnectionID, event.RandomID)
- // if connection not exists, then cached it into the analyzer context
- if connection == nil {
- connectionMetrics = h.cache[connectionID]
- fromAnalyzerCache = true
- if connectionMetrics == nil {
- connectionMetrics = h.GenerateMetrics().(*ConnectionMetrics)
- h.cache[connectionID] = connectionMetrics
- }
- } else {
- connectionMetrics = context.QueryProtocolMetrics(connection.Metrics, ProtocolName).(*ConnectionMetrics)
- }
-
- log.Debugf("receive connection: %s, dataid: %d, sequence: %d, finished: %d, message type: %s, direction: %s, size: %d, total size: %d",
- connectionID, event.DataID, event.Sequence, event.Finished, event.MsgType.String(), event.Direction().String(), event.DataLen, event.TotalSize0)
- // if the cache is existing in the analyzer context, then delete it
- if !fromAnalyzerCache {
- if tmp := h.cache[connectionID]; tmp != nil {
- connectionMetrics.MergeFrom(h, tmp)
- delete(h.cache, connectionID)
- }
+func (h *Analyzer) ParseProtocol(connectionID uint64, metrics protocol.Metrics, buf *protocol.Buffer) protocol.ParseResult {
+ connectionMetrics := metrics.(*ConnectionMetrics)
+ messageType, err := reader.IdentityMessageType(buf)
+ if err != nil {
+ return protocol.ParseResultSkipPackage
}
- req, resp := h.buildHTTP1(connectionMetrics.halfData, event)
- if req != nil && resp != nil {
- if err := h.analyze(context, connectionID, connectionMetrics, req, resp); err != nil {
- log.Errorf("HTTP1 analyze failure: %v", err)
- return false
- }
- } else {
- log.Debugf("connnection: %s, remaining half data list size: %d", connectionID, connectionMetrics.halfData.Len())
+ var result protocol.ParseResult
+ switch messageType {
+ case reader.MessageTypeRequest:
+ result, err = h.handleRequest(connectionMetrics, buf)
+ case reader.MessageTypeResponse:
+ result, err = h.handleResponse(connectionID, connectionMetrics, buf)
+ case reader.MessageTypeUnknown:
+ return protocol.ParseResultSkipPackage
}
- return true
-}
-func (h *Analyzer) UpdateExtensionConfig(config *profiling.ExtensionConfig) {
- if config == nil {
- return
+ if err != nil {
+ log.Warnf("reading %v error: %v", messageType, err)
+ return protocol.ParseResultSkipPackage
+ } else if result != protocol.ParseResultSuccess {
+ return result
}
- h.sampleConfig.UpdateRules(config.NetworkSamplings)
+ return protocol.ParseResultSuccess
}
-func (h *Analyzer) combineAndRemoveEvent(halfConnections *list.List, firstElement *list.Element,
- lastAppender protocol.SocketDataBuffer) protocol.SocketDataBuffer {
- firstEvent := firstElement.Value.(*protocol.SocketDataUploadEvent)
- if firstEvent.Sequence == 0 && firstEvent.Finished == 1 {
- halfConnections.Remove(firstElement)
- return h.combineEventIfNeed(firstEvent, lastAppender)
+func (h *Analyzer) handleRequest(metrics *ConnectionMetrics, buf *protocol.Buffer) (protocol.ParseResult, error) {
+ // parsing request
+ req, r, err := reader.ReadRequest(buf)
+ if err != nil {
+ return protocol.ParseResultSkipPackage, err
}
- next := firstElement.Next()
- halfConnections.Remove(firstElement)
- var buffer protocol.SocketDataBuffer = firstEvent
- // for-each the events until buffer finished
- for next != nil {
- event := next.Value.(*protocol.SocketDataUploadEvent)
-
- buffer = buffer.Combine(event)
-
- tmp := next.Next()
- halfConnections.Remove(next)
- next = tmp
- // combine event
- if event.Finished == 1 {
- return h.combineEventIfNeed(buffer, lastAppender)
- }
+ if r != protocol.ParseResultSuccess {
+ return r, nil
}
- return h.combineEventIfNeed(buffer, lastAppender)
-}
-func (h *Analyzer) combineEventIfNeed(data, appender protocol.SocketDataBuffer) protocol.SocketDataBuffer {
- if appender != nil {
- return data.Combine(appender)
- }
- return data
+ metrics.AppendRequestToList(req)
+ return protocol.ParseResultSuccess, nil
}
-func (h *Analyzer) buildHTTP1(halfConnections *list.List, event *protocol.SocketDataUploadEvent) (request, response protocol.SocketDataBuffer) {
- // no connections, then just add the response to the half connections to wait the request
- if halfConnections.Len() == 0 {
- halfConnections.PushBack(event)
- return nil, nil
+func (h *Analyzer) handleResponse(connectionID uint64, metrics *ConnectionMetrics, buf *protocol.Buffer) (protocol.ParseResult, error) {
+ // find the first request
+ firstElement := metrics.halfData.Front()
+ if firstElement == nil {
+ return protocol.ParseResultSkipPackage, nil
}
+ request := metrics.halfData.Remove(firstElement).(*reader.Request)
- // quick handler(only one element, and is request)
- if halfConnections.Len() == 1 {
- firstElement := halfConnections.Front()
- firstEvent := firstElement.Value.(*protocol.SocketDataUploadEvent)
- if firstEvent.IsStart() && firstEvent.Finished == 1 && event.IsStart() && event.Finished == 1 &&
- firstEvent.DataID+1 == event.DataID && firstEvent.MsgType == base.SocketMessageTypeRequest &&
- event.MsgType == base.SocketMessageTypeResponse {
- return h.combineAndRemoveEvent(halfConnections, firstElement, nil), event
- }
- }
-
- // push to the queue
- h.insertToList(halfConnections, event)
-
- // trying to find completed request and response
- return NewHTTP1BufferAnalyzer(h).Analyze(halfConnections)
-}
-
-func (h *Analyzer) insertToList(halfConnections *list.List, event *protocol.SocketDataUploadEvent) {
- if halfConnections.Len() == 0 {
- halfConnections.PushFront(event)
- return
- }
- if halfConnections.Back().Value.(*protocol.SocketDataUploadEvent).DataID < event.DataID {
- halfConnections.PushBack(event)
- return
- }
- beenAdded := false
- for element := halfConnections.Front(); element != nil; element = element.Next() {
- existEvent := element.Value.(*protocol.SocketDataUploadEvent)
- if existEvent.DataID > event.DataID {
- // data id needs order
- beenAdded = true
- } else if existEvent.DataID == event.DataID {
- if existEvent.MsgType == event.MsgType && existEvent.Sequence > event.Sequence {
- // same message type and following the sequence order
- beenAdded = true
- } else if existEvent.MsgType > event.MsgType {
- // request needs before response
- beenAdded = true
- }
- }
- if beenAdded {
- halfConnections.InsertBefore(event, element)
- break
- }
- }
- if !beenAdded {
- halfConnections.PushBack(event)
- }
-}
-
-func (h *Analyzer) analyze(_ protocol.Context, connectionID string, connectionMetrics *ConnectionMetrics,
- requestBuffer, responseBuffer protocol.SocketDataBuffer) error {
- request, err := http.ReadRequest(bufio.NewReader(bytes.NewBuffer(requestBuffer.BufferData())))
- if err != nil {
- return fmt.Errorf("parse request failure: data length: %d, total data length: %d, %v",
- len(requestBuffer.BufferData()), requestBuffer.TotalSize(), err)
- }
-
- response, err := http.ReadResponse(bufio.NewReader(bytes.NewBuffer(responseBuffer.BufferData())), request)
- if response != nil {
- defer response.Body.Close()
- }
+ // parsing response
+ response, r, err := reader.ReadResponse(request, buf)
if err != nil {
- if err == io.ErrUnexpectedEOF || err == io.EOF {
- response, err = h.tryingToReadResponseWithoutHeaders(bufio.NewReader(bytes.NewBuffer(responseBuffer.BufferData())), request)
- if err != nil {
- return fmt.Errorf("parsing simple data error: %v", err)
- }
- if response != nil && response.Body != nil {
- defer response.Body.Close()
- }
- }
- if err != nil {
- return fmt.Errorf("parse response failure, data length: %d, total data length: %d, %v",
- len(requestBuffer.BufferData()), requestBuffer.TotalSize(), err)
- }
+ return protocol.ParseResultSkipPackage, err
+ } else if r != protocol.ParseResultSuccess {
+ return r, nil
}
// lock append metrics with read locker
- connectionMetrics.metricsLocker.RLock()
- defer connectionMetrics.metricsLocker.RUnlock()
+ metrics.metricsLocker.RLock()
+ defer metrics.metricsLocker.RUnlock()
// append metrics
- data := connectionMetrics.clientMetrics
+ data := metrics.clientMetrics
side := base.ConnectionRoleClient
- if requestBuffer.Direction() == base.SocketDataDirectionIngress {
+ if request.Direction() == base.SocketDataDirectionIngress {
// if receive the request, that's mean is server side
- data = connectionMetrics.serverMetrics
+ data = metrics.serverMetrics
side = base.ConnectionRoleServer
}
- data.Append(h.sampleConfig, request, requestBuffer, response, responseBuffer)
+ data.Append(h.sampleConfig, request, response)
if log.Enable(logrus.DebugLevel) {
metricsJSON, _ := json.Marshal(data)
- log.Debugf("generated metrics, connection id: %s, side: %s, metrisc: %s", connectionID, side.String(), string(metricsJSON))
+ log.Debugf("generated metrics, connection id: %d, side: %s, metrisc: %s", connectionID, side.String(), string(metricsJSON))
}
- return nil
+ return protocol.ParseResultSuccess, nil
}
-func (h *Analyzer) tryingToReadResponseWithoutHeaders(reader *bufio.Reader, request *http.Request) (*http.Response, error) {
- if reader.Size() < 16 {
- return nil, fmt.Errorf("the header length not enough")
- }
- tp := textproto.NewReader(reader)
- resp := &http.Response{
- Request: request,
+func (h *Analyzer) UpdateExtensionConfig(config *profiling.ExtensionConfig) {
+ if config == nil {
+ return
}
+ h.sampleConfig.UpdateRules(config.NetworkSamplings)
+}
- line, err := tp.ReadLine()
- if err != nil {
- return nil, fmt.Errorf("read response first line failure: %v", err)
- }
- indexByte := strings.IndexByte(line, ' ')
- if indexByte == -1 {
- return nil, fmt.Errorf("parsing response error: %s", line)
- }
- resp.Proto = line[:indexByte]
- resp.Status = strings.TrimLeft(line[indexByte+1:], " ")
- statusCode := resp.Status
- if i := strings.IndexByte(resp.Status, ' '); i != -1 {
- statusCode = resp.Status[:i]
+func (h *Analyzer) PackageMaxExpireDuration() time.Duration {
+ return time.Minute
+}
+
+func (m *ConnectionMetrics) AppendRequestToList(req *reader.Request) {
+ if m.halfData.Len() == 0 {
+ m.halfData.PushFront(req)
+ return
}
- if len(statusCode) != 3 {
- return nil, fmt.Errorf("parsing response status code failure: %v", statusCode)
+ if m.halfData.Back().Value.(*reader.Request).MinDataID() < req.MinDataID() {
+ m.halfData.PushBack(req)
+ return
}
- resp.StatusCode, err = strconv.Atoi(statusCode)
- if err != nil || resp.StatusCode < 0 {
- return nil, fmt.Errorf("status code not correct: %s", statusCode)
+ beenAdded := false
+ for element := m.halfData.Front(); element != nil; element = element.Next() {
+ existEvent := element.Value.(*reader.Request)
+ if existEvent.MinDataID() > req.MinDataID() {
+ m.halfData.InsertBefore(req, element)
+ beenAdded = true
+ break
+ }
}
- var ok bool
- if resp.ProtoMajor, resp.ProtoMinor, ok = http.ParseHTTPVersion(resp.Proto); !ok {
- return nil, fmt.Errorf("parsing http version failure: %s", resp.Proto)
+ if !beenAdded {
+ m.halfData.PushBack(req)
}
-
- return resp, nil
}
-func (h *ConnectionMetrics) MergeMetricsFromConnection(connection *base.ConnectionContext, data base.ConnectionMetrics) {
+func (m *ConnectionMetrics) MergeMetricsFromConnection(connection *base.ConnectionContext, data base.ConnectionMetrics) {
other := data.(*ConnectionMetrics)
other.metricsLocker.Lock()
defer other.metricsLocker.Unlock()
- h.clientMetrics.MergeAndClean(other.clientMetrics)
- h.serverMetrics.MergeAndClean(other.serverMetrics)
+ if other.halfData != nil {
+ for element := other.halfData.Front(); element != nil; element = element.Next() {
+ m.AppendRequestToList(element.Value.(*reader.Request))
+ }
+ }
+
+ m.clientMetrics.MergeAndClean(other.clientMetrics)
+ m.serverMetrics.MergeAndClean(other.serverMetrics)
if log.Enable(logrus.DebugLevel) {
- clientMetrics, _ := json.Marshal(h.clientMetrics)
- serverMetrics, _ := json.Marshal(h.serverMetrics)
+ clientMetrics, _ := json.Marshal(m.clientMetrics)
+ serverMetrics, _ := json.Marshal(m.serverMetrics)
log.Debugf("combine metrics: conid: %d_%d, client side metrics: %s, server side metrics: %s",
connection.ConnectionID, connection.RandomID, clientMetrics, serverMetrics)
}
}
-func (h *ConnectionMetrics) FlushMetrics(traffic *base.ProcessTraffic, metricsBuilder *base.MetricsBuilder) {
+func (m *ConnectionMetrics) FlushMetrics(traffic *base.ProcessTraffic, metricsBuilder *base.MetricsBuilder) {
for _, p := range traffic.LocalProcesses {
// if the remote process is profiling, then used the client side
- localMetrics := h.clientMetrics
- remoteMetrics := h.serverMetrics
+ localMetrics := m.clientMetrics
+ remoteMetrics := m.serverMetrics
if traffic.Role == base.ConnectionRoleServer {
- localMetrics = h.serverMetrics
- remoteMetrics = h.clientMetrics
+ localMetrics = m.serverMetrics
+ remoteMetrics = m.clientMetrics
}
metricsCount := localMetrics.appendMetrics(traffic, p, "", metricsBuilder, false)
@@ -362,15 +240,7 @@ func (h *ConnectionMetrics) FlushMetrics(traffic *base.ProcessTraffic, metricsBu
// if remote process is profiling, then the metrics data need to be cut half
log.Debugf("flush HTTP1 metrics(%s): %s, remote process is profiling: %t, client(%s), server(%s)",
traffic.Role.String(), traffic.GenerateConnectionInfo(), traffic.RemoteProcessIsProfiling(),
- h.clientMetrics.String(), h.serverMetrics.String())
- }
- }
-}
-
-func (h *ConnectionMetrics) MergeFrom(analyzer *Analyzer, other *ConnectionMetrics) {
- if other.halfData != nil {
- for element := other.halfData.Front(); element != nil; element = element.Next() {
- analyzer.insertToList(h.halfData, element.Value.(*protocol.SocketDataUploadEvent))
+ m.clientMetrics.String(), m.serverMetrics.String())
}
}
}
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/http1/analyzer_test.go b/pkg/profiling/task/network/analyze/layer7/protocols/http1/analyzer_test.go
deleted file mode 100644
index a663a6e..0000000
--- a/pkg/profiling/task/network/analyze/layer7/protocols/http1/analyzer_test.go
+++ /dev/null
@@ -1,310 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package http1
-
-import (
- "bufio"
- "container/list"
- "net/http"
- "reflect"
- "strings"
- "testing"
-
- base2 "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/base"
-
- "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
-)
-
-var unknown, request, response = 0, 1, 2
-var finished, notFinished = 1, 0
-
-// nolint
-func TestBuildHTTP1(t *testing.T) {
- tests := []struct {
- name string
- events []struct {
- dataID int
- dataType int
- sequence int
- finished int
- data string
- }
- http []struct {
- start int
- end int
- }
- residueID []int
- }{
- {
- name: "simple",
- events: []struct {
- dataID int
- dataType int
- sequence int
- finished int
- data string
- }{
- {1, request, 0, notFinished, ""},
- {1, request, 1, notFinished, ""},
- {1, request, 2, finished, ""},
- {2, response, 0, notFinished, ""},
- {2, response, 1, finished, ""},
-
- {3, request, 0, finished, ""},
- {4, response, 0, finished, ""},
- },
- http: []struct {
- start int
- end int
- }{
- {1, 2},
- {3, 4},
- },
- residueID: []int{},
- },
- {
- name: "response before request",
- events: []struct {
- dataID int
- dataType int
- sequence int
- finished int
- data string
- }{
- {2, response, 0, finished, ""},
- {1, request, 1, finished, ""},
- {1, request, 0, notFinished, ""},
-
- {3, request, 0, notFinished, ""},
- {3, request, 1, notFinished, ""},
- {4, response, 1, finished, ""},
- {4, response, 0, notFinished, ""},
- {3, request, 2, finished, ""},
-
- {5, request, 0, notFinished, ""},
- {5, request, 1, notFinished, ""},
- {6, response, 1, finished, ""},
- {6, response, 0, notFinished, ""},
- {5, request, 2, finished, ""},
- },
- http: []struct {
- start int
- end int
- }{
- {1, 2},
- {3, 4},
- {5, 6},
- },
- residueID: []int{},
- },
- {
- name: "residue requests",
- events: []struct {
- dataID int
- dataType int
- sequence int
- finished int
- data string
- }{
- {1, request, 0, finished, ""},
- {2, response, 1, finished, ""},
- {2, response, 0, notFinished, ""},
-
- {3, request, 0, finished, ""},
- {4, response, 0, notFinished, ""},
-
- {5, request, 1, finished, ""},
- {6, response, 0, finished, ""},
- },
- http: []struct {
- start int
- end int
- }{
- {1, 2},
- },
- residueID: []int{3, 4, 5, 6},
- },
- {
- name: "multiple request",
- events: []struct {
- dataID int
- dataType int
- sequence int
- finished int
- data string
- }{
- {1, request, 0, finished, ""},
- {2, request, 0, finished, ""},
- },
- http: []struct {
- start int
- end int
- }{},
- residueID: []int{1, 2},
- },
- {
- name: "multiple response",
- events: []struct {
- dataID int
- dataType int
- sequence int
- finished int
- data string
- }{
- {1, request, 0, finished, ""},
- {3, response, 1, finished, ""},
- {4, response, 0, finished, ""},
- },
- http: []struct {
- start int
- end int
- }{},
- residueID: []int{1, 3, 4},
- },
- {
- name: "unfinished response",
- events: []struct {
- dataID int
- dataType int
- sequence int
- finished int
- data string
- }{
- {42, response, 0, notFinished, ""},
- {42, response, 1, notFinished, ""},
- {42, response, 2, finished, ""},
-
- {48, request, 0, finished, ""},
- {50, response, 0, notFinished, ""},
- {50, response, 1, notFinished, ""},
- {50, response, 2, finished, ""},
- },
- http: []struct {
- start int
- end int
- }{
- {48, 50},
- },
- residueID: []int{42, 42, 42},
- },
- {
- name: "unknown data",
- events: []struct {
- dataID int
- dataType int
- sequence int
- finished int
- data string
- }{
- {1, unknown, 0, notFinished, "GET / HTTP/1.1\r\n"},
- {1, unknown, 1, notFinished, "Host: test.com\n\r\n"},
- {2, response, 0, finished, ""},
-
- {3, unknown, 1, notFinished, "Host: test.com\n\r\n"},
- {4, response, 0, finished, ""},
- {3, unknown, 0, notFinished, "GET / HTTP/1.1\r\n"},
-
- {6, unknown, 1, notFinished, "Host: test.com\n\r\n"},
- {5, request, 0, finished, ""},
- {6, unknown, 0, notFinished, "HTTP/1.1 200 OK\r\n"},
-
- // request not finished
- {7, unknown, 1, notFinished, "Host: test.com\n"},
- {8, response, 0, finished, ""},
- {7, unknown, 0, notFinished, "GET / HTTP/1.1\r\n"},
- {9, request, 0, finished, ""},
- {10, response, 0, finished, ""},
- },
- http: []struct {
- start int
- end int
- }{
- {1, 2},
- {3, 4},
- {5, 6},
- {9, 10},
- },
- residueID: []int{7, 7, 8},
- },
- }
-
- for _, testCase := range tests {
- //t.Run(testCase.name, func(t *testing.T) {
- analyzer := NewHTTP1Analyzer().(*Analyzer)
- l := list.New()
- var events = make([]struct {
- start, end int
- }, 0)
- for _, event := range testCase.events {
- req, resp := analyzer.buildHTTP1(l, &base2.SocketDataUploadEvent{
- DataID: uint64(event.dataID),
- MsgType: base.SocketMessageType(event.dataType),
- Sequence: uint16(event.sequence),
- Finished: uint8(event.finished),
- Buffer: bufferConvert(event.data),
- DataLen: uint16(len(event.data)),
- })
- if req != nil && resp != nil {
- events = append(events, struct{ start, end int }{start: req.MinDataID(), end: resp.MaxDataID()})
- }
- }
-
- if !reflect.DeepEqual(testCase.http, events) {
- t.Fatalf("excepted http: %v, actual: %v", testCase.http, events)
- }
-
- exceptedList := testCase.residueID
- if exceptedList == nil {
- exceptedList = make([]int, 0)
- }
- actualList := make([]int, 0)
- for element := l.Front(); element != nil; element = element.Next() {
- actualList = append(actualList, int(element.Value.(*base2.SocketDataUploadEvent).DataID))
- }
- if !reflect.DeepEqual(exceptedList, actualList) {
- t.Fatalf("excepted residue data list: %v, actual: %v", exceptedList, actualList)
- }
- //})
- }
-}
-
-var defaultBuffer [2048]byte
-
-func bufferConvert(data string) [2048]byte {
- if data == "" {
- return defaultBuffer
- }
- var buffer [2048]byte
- for inx, d := range []byte(data) {
- buffer[inx] = d
- }
- return buffer
-}
-
-func TestParseSimpleHTTP1Response(t *testing.T) {
- s := `HTTP/1.0 200 OK\r\n`
- h := &http.Request{}
- analyzer := NewHTTP1Analyzer().(*Analyzer)
- resp, err := analyzer.tryingToReadResponseWithoutHeaders(bufio.NewReader(strings.NewReader(s)), h)
- if err != nil {
- t.Fatalf("reading simple response error: %v", err)
- }
- if resp.Body != nil {
- defer resp.Body.Close()
- }
-}
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/http1/builder.go b/pkg/profiling/task/network/analyze/layer7/protocols/http1/builder.go
deleted file mode 100644
index a55ae5f..0000000
--- a/pkg/profiling/task/network/analyze/layer7/protocols/http1/builder.go
+++ /dev/null
@@ -1,290 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package http1
-
-import (
- "bufio"
- "bytes"
- "container/list"
- "net/http"
-
- base2 "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/base"
-
- "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
-)
-
-type BufferAnalyzer struct {
- http1Analyzer *Analyzer
-
- unknownEventBuffer base2.SocketDataBuffer
- unknownElement *list.Element
- unknownSize int
- request *base2.SocketDataUploadEvent
- requestElement *list.Element
- response *base2.SocketDataUploadEvent
- responseElement *list.Element
-
- unknownDataID uint64
- unknownMaxSequence uint16
- reqDataID uint64
- reqMaxSequence uint16
- reqFinished bool
- respDataID uint64
- respMaxSequence uint16
- respFinished bool
-}
-
-func NewHTTP1BufferAnalyzer(http1 *Analyzer) *BufferAnalyzer {
- return &BufferAnalyzer{http1Analyzer: http1}
-}
-
-func (h *BufferAnalyzer) Analyze(events *list.List) (request, response base2.SocketDataBuffer) {
- for element := events.Front(); element != nil; element = element.Next() {
- curEvent := element.Value.(*base2.SocketDataUploadEvent)
- // transform the unknown to the request or response
- if continueReading, req, resp := h.handleUnknown(events, element, curEvent); req != nil && resp != nil {
- return req, resp
- } else if continueReading {
- continue
- }
-
- if continueReading, req, resp := h.handleRequest(events, element, curEvent); req != nil && resp != nil {
- return req, resp
- } else if continueReading {
- continue
- }
-
- if req, resp := h.handleResponse(events, element, curEvent); req != nil && resp != nil {
- return req, resp
- }
- }
- return nil, nil
-}
-
-func (h *BufferAnalyzer) handleUnknown(event *list.List, element *list.Element,
- curEvent *base2.SocketDataUploadEvent) (continueReading bool, req, resp base2.SocketDataBuffer) {
- if curEvent.MsgType != base.SocketMessageTypeUnknown {
- return false, nil, nil
- }
- if h.unknownEventBuffer == base2.SocketDataBuffer(nil) {
- // maybe the unknown type is response, so clean the context
- if !curEvent.IsStart() {
- h.cleanContext()
- return true, nil, nil
- }
- h.resetStartUnknown(element, curEvent)
- req, resp = h.tryingToAnalyzeTheUnknown(event, curEvent)
- if req != nil && resp != nil {
- return false, req, resp
- }
- return true, nil, nil
- }
- if curEvent.MsgType == base.SocketMessageTypeUnknown {
- if h.unknownDataID == curEvent.DataID && h.unknownMaxSequence+1 == curEvent.Sequence {
- h.unknownEventBuffer = h.unknownEventBuffer.Combine(curEvent)
- h.unknownMaxSequence++
- } else if curEvent.IsStart() {
- h.resetStartUnknown(element, curEvent)
- } else {
- h.cleanContext()
- }
-
- req, resp = h.tryingToAnalyzeTheUnknown(event, curEvent)
- if req != nil && resp != nil {
- return false, req, resp
- }
- return true, nil, nil
- }
- return false, nil, nil
-}
-
-func (h *BufferAnalyzer) handleRequest(events *list.List, element *list.Element,
- curEvent *base2.SocketDataUploadEvent) (continueReading bool, req, resp base2.SocketDataBuffer) {
- if h.request == nil {
- // find the first request package event
- if curEvent.MsgType == base.SocketMessageTypeRequest && curEvent.IsStart() {
- h.resetStartRequest(element, curEvent)
- }
- return true, nil, nil
- }
- if curEvent.MsgType == base.SocketMessageTypeRequest {
- // if the request not finished and latest request sequence match with current event
- // then keep the request tracing
- if !h.reqFinished && h.reqDataID == curEvent.DataID && h.reqMaxSequence+1 == curEvent.Sequence {
- h.reqMaxSequence++
- h.reqFinished = curEvent.IsFinished()
- } else if curEvent.IsStart() {
- // if current event is new one, then update to current request
- h.resetStartRequest(element, curEvent)
- } else {
- // Otherwise, clean the request and response context
- h.cleanContext()
- }
-
- // if request and response all finished, then return
- if h.reqFinished && h.respFinished {
- req, resp = h.buildHTTP(events)
- return false, req, resp
- }
-
- return true, nil, nil
- }
- return false, nil, nil
-}
-
-func (h *BufferAnalyzer) handleResponse(events *list.List, element *list.Element,
- curEvent *base2.SocketDataUploadEvent) (req, resp base2.SocketDataBuffer) {
- if h.response == nil {
- // if current response is not start, then clean to re-find new one
- if !curEvent.IsStart() {
- h.cleanContext()
- return nil, nil
- }
- h.resetStartResponse(element, curEvent)
- if h.reqFinished && h.respFinished {
- return h.buildHTTP(events)
- }
- return nil, nil
- }
-
- // if a new response, then clean the re-find new one, wait the previous data
- if curEvent.IsStart() {
- h.cleanContext()
- return nil, nil
- }
-
- // if response sequence is broken, then clean the context
- if h.respDataID != curEvent.DataID || h.respMaxSequence+1 != curEvent.Sequence {
- h.cleanContext()
- return nil, nil
- }
- h.respDataID = curEvent.DataID
- h.respMaxSequence = curEvent.Sequence
-
- if h.reqFinished && curEvent.IsFinished() {
- return h.buildHTTP(events)
- }
- return nil, nil
-}
-
-func (h *BufferAnalyzer) resetStartUnknown(element *list.Element, curEvent *base2.SocketDataUploadEvent) {
- h.unknownEventBuffer = curEvent
- h.unknownElement = element
- h.unknownDataID = curEvent.DataID
- h.unknownMaxSequence = curEvent.Sequence
-}
-
-func (h *BufferAnalyzer) resetStartRequest(element *list.Element, curEvent *base2.SocketDataUploadEvent) {
- h.request = curEvent
- h.reqDataID = curEvent.DataID
- h.reqMaxSequence = curEvent.Sequence
- h.reqFinished = curEvent.IsFinished()
- h.requestElement = element
-}
-
-func (h *BufferAnalyzer) resetStartResponse(element *list.Element, curEvent *base2.SocketDataUploadEvent) {
- h.response = curEvent
- h.respDataID = curEvent.DataID
- h.respMaxSequence = curEvent.Sequence
- h.responseElement = element
- h.respFinished = curEvent.IsFinished()
-}
-
-func (h *BufferAnalyzer) tryingToAnalyzeTheUnknown(events *list.List, curEvent *base2.SocketDataUploadEvent) (req, resp base2.SocketDataBuffer) {
- if h.unknownEventBuffer == base2.SocketDataBuffer(nil) {
- return nil, nil
- }
- // length not enough
- if len(h.unknownEventBuffer.BufferData()) < 16 {
- return nil, nil
- }
- _, err := http.ReadRequest(bufio.NewReader(bytes.NewBuffer(h.unknownEventBuffer.BufferData())))
- if err == nil {
- // update the event as request
- curEvent.Finished = 1
- h.transformUnknown(h.unknownElement, base.SocketMessageTypeRequest)
- // update the current data is request
- h.resetStartRequest(h.unknownElement, h.unknownEventBuffer.FirstEvent())
- h.reqFinished = true
- h.cleanResponseContext()
- h.cleanUnknownContext()
- return nil, nil
- }
- tmpResponse, err := http.ReadResponse(bufio.NewReader(bytes.NewBuffer(h.unknownEventBuffer.BufferData())), &http.Request{})
- if err == nil {
- defer tmpResponse.Body.Close()
- curEvent.Finished = 1
- h.transformUnknown(h.unknownElement, base.SocketMessageTypeResponse)
- // if request already finished, then remove the request
- if h.reqFinished {
- h.resetStartResponse(h.unknownElement, h.unknownEventBuffer.FirstEvent())
- return h.buildHTTP(events)
- }
- // otherwise, clean the context and wait request
- h.cleanContext()
- }
- return nil, nil
-}
-
-func (h *BufferAnalyzer) transformUnknown(element *list.Element, msgType base.SocketMessageType) {
- // update message type and total size
- firstEvent := element.Value.(*base2.SocketDataUploadEvent)
- firstEvent.MsgType = msgType
- dataLen := int(firstEvent.DataLen)
- for e := element.Next(); e != nil; e = e.Next() {
- curEvent := e.Value.(*base2.SocketDataUploadEvent)
- if curEvent.Finished == 1 {
- curEvent.MsgType = msgType
- dataLen += int(curEvent.DataLen)
- firstEvent.TotalSize0 = uint64(dataLen)
- return
- }
- curEvent.MsgType = msgType
- dataLen += int(curEvent.DataLen)
- }
-}
-
-func (h *BufferAnalyzer) cleanContext() {
- h.cleanUnknownContext()
- h.cleanRequestContext()
- h.cleanResponseContext()
-}
-
-func (h *BufferAnalyzer) cleanResponseContext() {
- h.response = nil
- h.respDataID = 0
- h.respMaxSequence = 0
- h.respFinished = false
-}
-
-func (h *BufferAnalyzer) cleanRequestContext() {
- h.request = nil
- h.reqDataID = 0
- h.reqMaxSequence = 0
- h.reqFinished = false
-}
-
-func (h *BufferAnalyzer) cleanUnknownContext() {
- h.unknownEventBuffer, h.unknownElement = nil, nil
- h.unknownSize, h.unknownDataID, h.unknownMaxSequence = 0, 0, 0
-}
-
-func (h *BufferAnalyzer) buildHTTP(events *list.List) (req, resp base2.SocketDataBuffer) {
- return h.http1Analyzer.combineAndRemoveEvent(events, h.requestElement, nil),
- h.http1Analyzer.combineAndRemoveEvent(events, h.responseElement, nil)
-}
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/http1/metrics.go b/pkg/profiling/task/network/analyze/layer7/protocols/http1/metrics.go
index 5756206..9290d34 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/http1/metrics.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/http1/metrics.go
@@ -18,23 +18,16 @@
package http1
import (
- "bufio"
- "bytes"
- "compress/gzip"
"encoding/json"
"fmt"
- "io"
- "mime"
- "net/http"
"strings"
"time"
- "golang.org/x/net/html/charset"
-
"github.com/apache/skywalking-rover/pkg/process/api"
profiling "github.com/apache/skywalking-rover/pkg/profiling/task/base"
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
protocol "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/base"
+ "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader"
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/metrics"
"github.com/apache/skywalking-rover/pkg/tools"
"github.com/apache/skywalking-rover/pkg/tools/host"
@@ -80,27 +73,28 @@ func NewHTTP1URIMetrics() *URIMetrics {
}
}
-func (u *URIMetrics) Append(sampleConfig *SamplingConfig,
- req *http.Request, reqBuffer protocol.SocketDataBuffer, resp *http.Response, respBuffer protocol.SocketDataBuffer) {
+func (u *URIMetrics) Append(sampleConfig *SamplingConfig, req *reader.Request, resp *reader.Response) {
u.RequestCounter.Increase()
- statusCounter := u.StatusCounter[resp.StatusCode]
+ statusCounter := u.StatusCounter[resp.StatusCode()]
if statusCounter == nil {
statusCounter = metrics.NewCounter()
- u.StatusCounter[resp.StatusCode] = statusCounter
+ u.StatusCounter[resp.StatusCode()] = statusCounter
}
statusCounter.Increase()
- u.AvgRequestPackageSize.Increase(float64(reqBuffer.TotalSize()))
- u.AvgResponsePackageSize.Increase(float64(respBuffer.TotalSize()))
- u.ReqPackageSizeHistogram.Increase(float64(reqBuffer.TotalSize()))
- u.RespPackageSizeHistogram.Increase(float64(respBuffer.TotalSize()))
+ requestTotalSize := req.ContentTotalSize()
+ responseTotalSize := resp.ContentTotalSize()
+ u.AvgRequestPackageSize.Increase(float64(requestTotalSize))
+ u.AvgResponsePackageSize.Increase(float64(responseTotalSize))
+ u.ReqPackageSizeHistogram.Increase(float64(requestTotalSize))
+ u.RespPackageSizeHistogram.Increase(float64(responseTotalSize))
- duration := time.Duration(respBuffer.EndTime() - reqBuffer.StartTime())
+ duration := time.Duration(resp.EndTime() - req.StartTime())
durationInMS := float64(duration.Milliseconds())
u.avgDuration.Increase(durationInMS)
u.durationHistogram.Increase(durationInMS)
- u.sampler.AppendMetrics(sampleConfig, duration, req, resp, reqBuffer, respBuffer)
+ u.sampler.AppendMetrics(sampleConfig, duration, req, resp)
}
func (u *URIMetrics) appendMetrics(traffic *base.ProcessTraffic,
@@ -177,15 +171,13 @@ func (u *URIMetrics) String() string {
}
type Trace struct {
- Trace protocol.TracingContext
- RequestURI string
- RequestBuffer protocol.SocketDataBuffer
- Request *http.Request
- ResponseBuffer protocol.SocketDataBuffer
- Response *http.Response
- Type string
- Settings *profiling.NetworkDataCollectingSettings
- TaskConfig *profiling.HTTPSamplingConfig
+ Trace protocol.TracingContext
+ RequestURI string
+ Request *reader.Request
+ Response *reader.Response
+ Type string
+ Settings *profiling.NetworkDataCollectingSettings
+ TaskConfig *profiling.HTTPSamplingConfig
}
func (h *Trace) Flush(duration int64, process api.ProcessInterface, traffic *base.ProcessTraffic, metricsBuilder *base.MetricsBuilder) {
@@ -212,7 +204,7 @@ func (h *Trace) Flush(duration int64, process api.ProcessInterface, traffic *bas
SSL: traffic.IsSSL,
URI: h.RequestURI,
Reason: h.Type,
- Status: h.Response.StatusCode,
+ Status: h.Response.StatusCode(),
}
if traffic.Role == base.ConnectionRoleClient {
body.ClientProcess = &SamplingTraceLogProcess{ProcessID: process.ID()}
@@ -238,35 +230,35 @@ func (h *Trace) Flush(duration int64, process api.ProcessInterface, traffic *bas
func (h *Trace) AppendHTTPEvents(process api.ProcessInterface, traffic *base.ProcessTraffic, metricsBuilder *base.MetricsBuilder) {
events := make([]*v3.SpanAttachedEvent, 0)
if h.Settings != nil && h.Settings.RequireCompleteRequest {
- events = h.appendHTTPEvent(events, process, traffic, transportRequest, h.Request.Header,
- h.Request.Body, h.RequestBuffer, h.Settings.MaxRequestSize)
+ events = h.appendHTTPEvent(events, process, traffic, transportRequest, h.Request.MessageOpt, h.TaskConfig.DefaultRequestEncoding,
+ h.Settings.MaxRequestSize)
}
if h.Settings != nil && h.Settings.RequireCompleteResponse {
- events = h.appendHTTPEvent(events, process, traffic, transportResponse, h.Response.Header,
- h.Response.Body, h.ResponseBuffer, h.Settings.MaxResponseSize)
+ events = h.appendHTTPEvent(events, process, traffic, transportResponse, h.Response.MessageOpt, h.TaskConfig.DefaultResponseEncoding,
+ h.Settings.MaxResponseSize)
}
metricsBuilder.AppendSpanAttachedEvents(events)
}
func (h *Trace) appendHTTPEvent(events []*v3.SpanAttachedEvent, process api.ProcessInterface, traffic *base.ProcessTraffic,
- tp string, header http.Header, body io.Reader, buffer protocol.SocketDataBuffer, maxSize int32) []*v3.SpanAttachedEvent {
- content, err := h.transformHTTPBody(tp, header, body, buffer, maxSize)
+ tp string, message *reader.MessageOpt, defaultBodyEncoding string, maxSize int32) []*v3.SpanAttachedEvent {
+ content, err := message.TransformReadableContent(defaultBodyEncoding, int(maxSize))
if err != nil {
log.Warnf("transform http %s erorr: %v", tp, err)
return events
}
event := &v3.SpanAttachedEvent{}
- event.StartTime = host.TimeToInstant(buffer.StartTime())
- event.EndTime = host.TimeToInstant(buffer.EndTime())
+ event.StartTime = host.TimeToInstant(message.StartTime())
+ event.EndTime = host.TimeToInstant(message.EndTime())
event.Event = fmt.Sprintf("HTTP %s Sampling", tp)
event.Tags = make([]*commonv3.KeyStringValuePair, 0)
event.Tags = append(event.Tags,
// content data
- &commonv3.KeyStringValuePair{Key: "data_size", Value: units.BytesSize(float64(buffer.TotalSize()))},
+ &commonv3.KeyStringValuePair{Key: "data_size", Value: units.BytesSize(float64(message.ContentTotalSize()))},
&commonv3.KeyStringValuePair{Key: "data_content", Value: content},
- &commonv3.KeyStringValuePair{Key: "data_direction", Value: buffer.Direction().String()},
+ &commonv3.KeyStringValuePair{Key: "data_direction", Value: message.Direction().String()},
&commonv3.KeyStringValuePair{Key: "data_type", Value: strings.ToLower(tp)},
// connection
&commonv3.KeyStringValuePair{Key: "connection_role", Value: traffic.Role.String()},
@@ -286,112 +278,6 @@ func (h *Trace) appendHTTPEvent(events []*v3.SpanAttachedEvent, process api.Proc
return append(events, event)
}
-// nolint
-func (h *Trace) transformHTTPBody(tp string, header http.Header, _ io.Reader, buffer protocol.SocketDataBuffer, maxSize int32) (string, error) {
- var needGzip, isPlain, isUtf8 = header.Get("Content-Encoding") == "gzip", true, true
- contentType := header.Get("Content-Type")
- if contentType == "" {
- if tp == transportRequest {
- contentType = h.TaskConfig.DefaultRequestEncoding
- } else {
- contentType = h.TaskConfig.DefaultResponseEncoding
- }
- contentType = fmt.Sprintf("text/html; charset=%s", contentType)
- }
-
- isPlain = strings.HasPrefix(contentType, "text/") || contentType == "application/json"
- if _, params, err := mime.ParseMediaType(contentType); err == nil {
- if cs, ok := params["charset"]; ok {
- isUtf8 = strings.ToLower(cs) == "utf-8"
- }
- }
-
- if !needGzip && isPlain && isUtf8 {
- resultSize := len(buffer.BufferData())
- if maxSize > 0 && resultSize > int(maxSize) {
- resultSize = int(maxSize)
- }
- return string(buffer.BufferData()[0:resultSize]), nil
- }
-
- // re-read the buffer and skip to the body position
- buf := bufio.NewReaderSize(bytes.NewBuffer(buffer.BufferData()), len(buffer.BufferData()))
- var httpBody io.ReadCloser
- if tp == transportRequest {
- req, err := http.ReadRequest(buf)
- if err != nil {
- return "", err
- }
- httpBody = req.Body
- } else {
- response, err := http.ReadResponse(buf, nil)
- if err != nil {
- return "", err
- }
- httpBody = response.Body
- }
- defer httpBody.Close()
-
- // no text plain, no need to print the data
- headerLen := len(buffer.BufferData()) - buf.Buffered()
- if maxSize > 0 && int(maxSize) < headerLen {
- return string(buffer.BufferData()[:maxSize]), nil
- }
- headerString := string(buffer.BufferData()[:headerLen])
- if !isPlain {
- return fmt.Sprintf("%s[not plain, current content type: %s]", headerString, contentType), nil
- }
- // nobody
- if buf.Buffered() == 0 {
- return headerString, nil
- }
-
- data := httpBody
- var err error
- if needGzip {
- data, err = gzip.NewReader(httpBody)
- if err != nil {
- return "", err
- }
- }
- if !isUtf8 {
- data, err = newCharsetReader(data, contentType)
- if err != nil {
- return "", err
- }
- }
-
- realData, err := io.ReadAll(data)
- if err != nil && err != io.ErrUnexpectedEOF {
- return "", err
- }
- resultSize := len(realData)
- if maxSize > 0 && (resultSize+headerLen) > int(maxSize) {
- resultSize = int(maxSize) - headerLen
- }
- return fmt.Sprintf("%s%s", headerString, string(realData[0:resultSize])), nil
-}
-
-type charsetReadWrapper struct {
- reader io.Reader
-}
-
-func newCharsetReader(r io.Reader, contentType string) (*charsetReadWrapper, error) {
- reader, err := charset.NewReader(r, contentType)
- if err != nil {
- return nil, err
- }
- return &charsetReadWrapper{reader: reader}, nil
-}
-
-func (c *charsetReadWrapper) Read(p []byte) (n int, err error) {
- return c.reader.Read(p)
-}
-
-func (c *charsetReadWrapper) Close() error {
- return nil
-}
-
type SamplingTraceLogBody struct {
URI string `json:"uri"`
Reason string `json:"reason"`
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/reader.go b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/reader.go
new file mode 100644
index 0000000..90863e8
--- /dev/null
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/reader.go
@@ -0,0 +1,298 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package reader
+
+import (
+ "bufio"
+ "compress/gzip"
+ "fmt"
+ "io"
+ "mime"
+ "net/http"
+ "strconv"
+ "strings"
+
+ "github.com/apache/skywalking-rover/pkg/logger"
+
+ "golang.org/x/net/html/charset"
+
+ "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
+ protocol "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/base"
+)
+
+var (
+	// headBuffer is the scratch slice IdentityMessageType peeks the first 16 bytes of a message into.
+	// NOTE(review): it is package-level and shared by every caller, so concurrent callers would
+	// overwrite each other's data - confirm the analyzers only use it from a single goroutine.
+	headBuffer = make([]byte, 16)
+	// bodyBuffer is a throw-away scratch slice used while skipping over body bytes; the data read
+	// into it is never inspected. NOTE(review): shared package-level state, same caveat as headBuffer.
+	bodyBuffer = make([]byte, 4096)
+
+	// requestMethods lists the HTTP methods accepted as the first token of a request line.
+	// The entries must be the exact method names (e.g. "OPTIONS", not "OPTION"), because
+	// ReadRequest compares the parsed method for equality while IdentityMessageType only
+	// checks them as a prefix.
+	requestMethods = []string{
+		"GET", "POST", "OPTIONS", "HEAD", "PUT", "DELETE", "CONNECT", "TRACE", "PATCH",
+	}
+)
+
+// log is the logger shared by the HTTP/1 reader package.
+var log = logger.GetLogger("profiling", "task", "network", "layer7", "protocols", "http1", "reader")
+
+// MessageType is the kind of HTTP/1 message detected at the head of a buffer.
+type MessageType int
+
+const (
+ // MessageTypeRequest marks data starting with one of the known request methods.
+ MessageTypeRequest MessageType = iota
+ // MessageTypeResponse marks data starting with "HTTP".
+ MessageTypeResponse
+ // MessageTypeUnknown marks data that could not be identified (or not identified yet).
+ MessageTypeUnknown
+)
+
+// IdentityMessageType peeks at the first len(headBuffer) bytes of reader, without consuming them,
+// and classifies the data as an HTTP request, an HTTP response, or unknown.
+//
+// A peek error is returned as-is together with MessageTypeUnknown; when fewer bytes than
+// len(headBuffer) are available an error asking for more content is returned.
+func IdentityMessageType(reader *protocol.Buffer) (MessageType, error) {
+	peeked, err := reader.Peek(headBuffer)
+	if err != nil {
+		return MessageTypeUnknown, err
+	}
+	if peeked != len(headBuffer) {
+		return MessageTypeUnknown, fmt.Errorf("need more content for header")
+	}
+
+	prefix := string(headBuffer)
+	// a request starts with one of the known methods
+	for _, method := range requestMethods {
+		if strings.HasPrefix(prefix, method) {
+			return MessageTypeRequest, nil
+		}
+	}
+	// a response starts with the protocol version, e.g. "HTTP/1.1 200 OK"
+	if strings.HasPrefix(prefix, "HTTP") {
+		return MessageTypeResponse, nil
+	}
+	return MessageTypeUnknown, nil
+}
+
+// Message is the part of an HTTP/1 message (request or response) that the shared
+// MessageOpt helpers need access to.
+type Message interface {
+ // Headers returns the parsed HTTP headers of the message.
+ Headers() http.Header
+ // HeaderBuffer returns the buffer holding the start line and headers.
+ HeaderBuffer() *protocol.Buffer
+ // BodyBuffer returns the buffer holding the message body.
+ BodyBuffer() *protocol.Buffer
+}
+
+// MessageOpt wraps a Message and provides the operations shared by requests and responses
+// (size/time accessors, body reading and readable-content rendering).
+type MessageOpt struct {
+ Message
+}
+
+// ContentTotalSize returns the length of the header buffer plus the length of the body buffer.
+func (m *MessageOpt) ContentTotalSize() int {
+ return m.HeaderBuffer().Len() + m.BodyBuffer().Len()
+}
+
+// StartTime returns the start time of the first socket buffer backing the header buffer.
+func (m *MessageOpt) StartTime() uint64 {
+ return m.HeaderBuffer().FirstSocketBuffer().StartTime()
+}
+
+// EndTime returns the end time of the last socket buffer backing the header buffer.
+// NOTE(review): only the header buffer is consulted, the body buffer is not - confirm this
+// is the intended end of the message.
+func (m *MessageOpt) EndTime() uint64 {
+ return m.HeaderBuffer().LastSocketBuffer().EndTime()
+}
+
+// Direction returns the data direction of the first socket buffer backing the header buffer.
+func (m *MessageOpt) Direction() base.SocketDataDirection {
+ return m.HeaderBuffer().FirstSocketBuffer().Direction()
+}
+
+// TransformReadableContent renders the message (header followed by body) as a string.
+//
+// defaultEncoding is the charset assumed when the message has no Content-Type header; in that
+// case the content type "text/html; charset=<defaultEncoding>" is used. maxSize limits the number
+// of bytes taken from the header plus the decoded body; a value <= 0 disables the limit.
+//
+// Only the header is returned (with a short note appended) when the content type is not plain
+// text, and only the header is returned when the body is empty. Otherwise the body is decoded
+// (gzip and non-UTF-8 charsets) according to the resolved content type.
+// nolint
+func (m *MessageOpt) TransformReadableContent(defaultEncoding string, maxSize int) (string, error) {
+	contentType := m.Headers().Get("Content-Type")
+	if contentType == "" {
+		contentType = fmt.Sprintf("text/html; charset=%s", defaultEncoding)
+	}
+	isPlain := strings.HasPrefix(contentType, "text/") || contentType == "application/json"
+
+	// header to string
+	headerBuf, err := io.ReadAll(m.HeaderBuffer())
+	if err != nil {
+		return "", err
+	}
+	if maxSize > 0 && len(headerBuf) >= maxSize {
+		return string(headerBuf[:maxSize]), nil
+	}
+	headerString := string(headerBuf)
+	if !isPlain {
+		return fmt.Sprintf("%s[not plain, current content type: %s]", headerString, contentType), nil
+	}
+
+	// body to string
+	bodyLength := m.BodyBuffer().Len()
+	if bodyLength == 0 {
+		return headerString, nil
+	}
+	// the body reader needs the full content type (it extracts the charset parameter from it),
+	// not the bare default encoding name, otherwise the declared charset is never applied
+	bodyReader, err := m.buildBodyReader(contentType)
+	if err != nil {
+		return "", err
+	}
+
+	bodyData, err := io.ReadAll(bodyReader)
+	// a truncated (e.g. partially captured gzip) body is still rendered as far as it was decoded
+	if err != nil && err != io.ErrUnexpectedEOF {
+		return "", err
+	}
+	resultSize := len(bodyData)
+	if maxSize > 0 && (resultSize+len(headerString)) > maxSize {
+		// the header is already known to be shorter than maxSize here, so this stays positive
+		resultSize = maxSize - len(headerString)
+	}
+	return fmt.Sprintf("%s%s", headerString, string(bodyData[0:resultSize])), nil
+}
+
+// buildBodyReader wraps the body buffer with the decoders needed to read it as UTF-8 text:
+// a gzip reader when the Content-Encoding header is "gzip", and a charset converter when
+// contentType declares a charset other than UTF-8.
+func (m *MessageOpt) buildBodyReader(contentType string) (io.Reader, error) {
+	gzipped := m.Headers().Get("Content-Encoding") == "gzip"
+
+	// the body is treated as UTF-8 unless the content type explicitly says otherwise
+	utf8Body := true
+	_, params, parseErr := mime.ParseMediaType(contentType)
+	if parseErr == nil {
+		if cs, found := params["charset"]; found {
+			utf8Body = strings.EqualFold(cs, "utf-8")
+		}
+	}
+
+	var body io.Reader = m.BodyBuffer()
+	if gzipped {
+		unzipped, err := gzip.NewReader(m.BodyBuffer())
+		if err != nil {
+			return nil, err
+		}
+		body = unzipped
+	}
+	if utf8Body {
+		return body, nil
+	}
+
+	decoded, err := newCharsetReader(body, contentType)
+	if err != nil {
+		return nil, err
+	}
+	return decoded, nil
+}
+
+// appointedLength returns the body length announced by the Content-Length header.
+// It returns -1 when the header is absent and an error when its value is not a decimal integer.
+func (m *MessageOpt) appointedLength() (int, error) {
+	raw := m.Headers().Get("Content-Length")
+	if raw == "" {
+		return -1, nil
+	}
+
+	parsed, err := strconv.ParseInt(raw, 10, 64)
+	if err != nil {
+		return 0, fmt.Errorf("the request has not correct content length header value: %s", raw)
+	}
+	return int(parsed), nil
+}
+
+// isChunked reports whether the Transfer-Encoding header is exactly "chunked".
+// The comparison is case-sensitive and does not match a list such as "gzip, chunked".
+func (m *MessageOpt) isChunked() bool {
+ return m.Headers().Get("Transfer-Encoding") == "chunked"
+}
+
+// readBodyUntilCurrentPackageFinished treats everything up to the end of the current packet as
+// the body: it reads buf until IsCurrentPacketReadFinished reports true and returns the slice
+// between the start and end positions. A read error yields ParseResultSkipPackage.
+func (m *MessageOpt) readBodyUntilCurrentPackageFinished(buf *protocol.Buffer, reader *bufio.Reader) (*protocol.Buffer, protocol.ParseResult, error) {
+ // presumably rewinds past the bytes the bufio reader has read ahead but not handed out yet,
+ // so the slice starts right after the header - TODO confirm against Buffer.OffsetPosition
+ startPosition := buf.OffsetPosition(-reader.Buffered())
+ for !buf.IsCurrentPacketReadFinished() {
+ // the data itself is discarded, only the buffer position matters
+ _, err := buf.Read(bodyBuffer)
+ if err != nil {
+ return nil, protocol.ParseResultSkipPackage, err
+ }
+ }
+ endPosition := buf.Position()
+ return buf.Slice(true, startPosition, endPosition), protocol.ParseResultSuccess, nil
+}
+
+// checkChunkedBody walks a "Transfer-Encoding: chunked" body: for every chunk it reads the
+// hexadecimal size line, collects the chunk data via checkBodyWithSize and expects an empty line
+// after it. The collected chunk slices are combined into the returned buffer.
+//
+// Any read or format error yields ParseResultSkipPackage; a non-success result from
+// checkBodyWithSize is passed through without an error.
+func (m *MessageOpt) checkChunkedBody(buf *protocol.Buffer, bodyReader *bufio.Reader) (*protocol.Buffer, protocol.ParseResult, error) {
+ buffers := make([]*protocol.Buffer, 0)
+ for {
+ // chunk size line; the whole line is parsed as hex, so a line carrying chunk extensions
+ // (";name=value") is reported as an error
+ line, _, err := bodyReader.ReadLine()
+ if err != nil {
+ return nil, protocol.ParseResultSkipPackage, err
+ }
+ needBytesStr := string(line)
+ needBytes, err := strconv.ParseInt(needBytesStr, 16, 64)
+ if err != nil {
+ return nil, protocol.ParseResultSkipPackage, fmt.Errorf("read chunked size error: %s", needBytesStr)
+ }
+ // a zero-sized chunk terminates the body; nothing after its size line is read here
+ if needBytes == 0 {
+ break
+ }
+ // NOTE(review): checkBodyWithSize can return a nil buffer together with ParseResultSuccess
+ // (its io.EOF branch); b is used below without a nil check - confirm that is safe.
+ if b, r, err1 := m.checkBodyWithSize(buf, bodyReader, int(needBytes), false); err1 != nil {
+ return nil, protocol.ParseResultSkipPackage, err1
+ } else if r != protocol.ParseResultSuccess {
+ return nil, r, nil
+ } else {
+ // when the chunk was not completely captured, keep what is available and stop reading
+ if pos := b.DetectNotSendingLastPosition(); pos != nil {
+ log.Debugf("found the socket data not sending finished in BPF, so update the body to the latest data, %v", pos)
+ successSlice := b.Slice(true, b.Position(), pos)
+ buffers = append(buffers, successSlice)
+ break
+ }
+ buffers = append(buffers, b)
+ }
+ // the chunk data must be followed by an empty line
+ d, _, err := bodyReader.ReadLine()
+ if err != nil {
+ return nil, protocol.ParseResultSkipPackage, err
+ }
+ if len(d) != 0 {
+ return nil, protocol.ParseResultSkipPackage, fmt.Errorf("the chunk data parding error, should be empty: %s", d)
+ }
+ }
+ return protocol.CombineSlices(true, buffers...), protocol.ParseResultSuccess, nil
+}
+
+// checkBodyWithSize consumes size bytes from reader and returns the matching slice of buf.
+//
+// When detectedNotSending is true and the slice reports a "not sending" position, the slice is
+// cut at that position. protocol.ErrNotComplete is translated into ParseResultSkipPackage
+// without an error; other read errors are returned with ParseResultSkipPackage.
+func (m *MessageOpt) checkBodyWithSize(buf *protocol.Buffer, reader *bufio.Reader, size int,
+ detectedNotSending bool) (*protocol.Buffer, protocol.ParseResult, error) {
+ // reduceSize is the number of bytes still to be consumed
+ reduceSize := size
+ var readSize, lastReadSize int
+ var err error
+ // presumably the position of the first body byte: the buffer position moved back by what
+ // bufio has read ahead but not handed out - TODO confirm against Buffer.OffsetPosition
+ startPosition := buf.OffsetPosition(-reader.Buffered())
+ for reduceSize > 0 {
+ // read at most one scratch buffer per iteration; the data itself is discarded
+ readSize = reduceSize
+ if readSize > len(bodyBuffer) {
+ readSize = len(bodyBuffer)
+ }
+ lastReadSize, err = reader.Read(bodyBuffer[0:readSize])
+ if err != nil {
+ if err == protocol.ErrNotComplete {
+ return nil, protocol.ParseResultSkipPackage, nil
+ }
+ // NOTE(review): this reports success but returns a nil buffer, callers receive no
+ // body slice in this case - confirm this is intended.
+ if err == io.EOF && reduceSize-lastReadSize <= 0 {
+ return nil, protocol.ParseResultSuccess, nil
+ }
+ return nil, protocol.ParseResultSkipPackage, err
+ }
+ reduceSize -= lastReadSize
+ }
+ endPosition := buf.OffsetPosition(-reader.Buffered())
+ slice := buf.Slice(true, startPosition, endPosition)
+ if detectedNotSending {
+ if pos := slice.DetectNotSendingLastPosition(); pos != nil {
+ log.Debugf("found the socket data not sending finished in BPF, so update the body to the latest data, %v", pos)
+ endPosition = pos
+ slice = buf.Slice(true, startPosition, endPosition)
+ }
+ }
+
+ return slice, protocol.ParseResultSuccess, nil
+}
+
+// charsetReadWrapper adapts the reader produced by charset.NewReader, adding a no-op Close.
+type charsetReadWrapper struct {
+ reader io.Reader
+}
+
+// newCharsetReader wraps r with the converter charset.NewReader builds for contentType
+// and returns it as a charsetReadWrapper.
+func newCharsetReader(r io.Reader, contentType string) (*charsetReadWrapper, error) {
+	converted, err := charset.NewReader(r, contentType)
+	if err != nil {
+		return nil, err
+	}
+	wrapper := &charsetReadWrapper{reader: converted}
+	return wrapper, nil
+}
+
+// Read implements io.Reader by delegating to the wrapped reader.
+func (c *charsetReadWrapper) Read(p []byte) (n int, err error) {
+ return c.reader.Read(p)
+}
+
+// Close is a no-op; the wrapped reader is not closed.
+func (c *charsetReadWrapper) Close() error {
+ return nil
+}
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/request.go b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/request.go
new file mode 100644
index 0000000..1e2445e
--- /dev/null
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/request.go
@@ -0,0 +1,143 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package reader
+
+import (
+ "bufio"
+ "fmt"
+ "net/http"
+ "net/textproto"
+ "net/url"
+ "strings"
+
+ protocol "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/base"
+)
+
+// Request is an HTTP/1 request parsed from socket data by ReadRequest.
+type Request struct {
+ *MessageOpt
+ // original holds the parsed request line and headers
+ original *http.Request
+ // headerBuffer covers the request line and headers
+ headerBuffer *protocol.Buffer
+ // bodyBuffer covers the request body
+ bodyBuffer *protocol.Buffer
+}
+
+// Headers returns the parsed request headers.
+func (r *Request) Headers() http.Header {
+ return r.original.Header
+}
+
+// HeaderBuffer returns the buffer covering the request line and headers.
+func (r *Request) HeaderBuffer() *protocol.Buffer {
+ return r.headerBuffer
+}
+
+// BodyBuffer returns the buffer covering the request body.
+func (r *Request) BodyBuffer() *protocol.Buffer {
+ return r.bodyBuffer
+}
+
+// MinDataID returns the data ID of the first socket buffer backing the request header.
+func (r *Request) MinDataID() int {
+ return int(r.headerBuffer.FirstSocketBuffer().DataID())
+}
+
+// RequestURI returns the request target stored on the parsed request.
+func (r *Request) RequestURI() string {
+ return r.original.RequestURI
+}
+
+// ReadRequest parses one HTTP/1 request (request line, headers and body) from buf.
+//
+// On success it returns the request together with ParseResultSuccess. When the data is not a
+// valid request, ParseResultSkipPackage and the reason are returned; a non-success result from
+// reading the body is passed through without an error.
+func ReadRequest(buf *protocol.Buffer) (*Request, protocol.ParseResult, error) {
+	bufReader := bufio.NewReader(buf)
+	tp := textproto.NewReader(bufReader)
+	req := &http.Request{}
+	result := &Request{original: req}
+	result.MessageOpt = &MessageOpt{result}
+
+	// request line: "<method> <request-uri> <proto>"
+	headerStartPosition := buf.Position()
+	line, err := tp.ReadLine()
+	if err != nil {
+		return nil, protocol.ParseResultSkipPackage, fmt.Errorf("read request first line failure: %v", err)
+	}
+	method, rest, ok1 := strings.Cut(line, " ")
+	requestURI, proto, ok2 := strings.Cut(rest, " ")
+	if !ok1 || !ok2 {
+		return nil, protocol.ParseResultSkipPackage, fmt.Errorf("the first line is not request: %s", line)
+	}
+
+	isRequest := false
+	for _, m := range requestMethods {
+		if method == m {
+			isRequest = true
+			break
+		}
+	}
+	if !isRequest {
+		return nil, protocol.ParseResultSkipPackage, fmt.Errorf("is not request: %s", method)
+	}
+	major, minor, ok := http.ParseHTTPVersion(proto)
+	if !ok {
+		return nil, protocol.ParseResultSkipPackage, fmt.Errorf("the protocol version cannot be identity: %s", proto)
+	}
+
+	// A CONNECT target is a bare "host:port" authority, which url.ParseRequestURI rejects or
+	// misparses. Like net/http, parse it behind a temporary scheme and strip that scheme again.
+	// The check must use the method parsed above: req.Method is not populated yet.
+	parseTarget := requestURI
+	justAuthority := method == "CONNECT" && !strings.HasPrefix(requestURI, "/")
+	if justAuthority {
+		parseTarget = "http://" + requestURI
+	}
+	uri, err := url.ParseRequestURI(parseTarget)
+	if err != nil {
+		return nil, protocol.ParseResultSkipPackage, err
+	}
+	if justAuthority {
+		uri.Scheme = ""
+	}
+	// RequestURI keeps the target exactly as it appeared on the request line
+	req.Method, req.URL, req.RequestURI = method, uri, requestURI
+	req.Proto, req.ProtoMajor, req.ProtoMinor = proto, major, minor
+
+	// header reader
+	mimeHeader, err := tp.ReadMIMEHeader()
+	if err != nil {
+		return nil, protocol.ParseResultSkipPackage, err
+	}
+	req.Header = http.Header(mimeHeader)
+
+	// prefer the host of an absolute request URI, fall back to the Host header
+	req.Host = req.URL.Host
+	if req.Host == "" {
+		req.Host = req.Header.Get("Host")
+	}
+
+	result.buildHeaderBuffer(headerStartPosition, buf, bufReader)
+	body, parseResult, err := result.readFullBody(bufReader, buf)
+	if err != nil {
+		return nil, protocol.ParseResultSkipPackage, err
+	}
+	if parseResult != protocol.ParseResultSuccess {
+		return nil, parseResult, nil
+	}
+	result.bodyBuffer = body
+
+	return result, protocol.ParseResultSuccess, nil
+}
+
+// buildHeaderBuffer stores the slice of buf from start up to the current header end as the
+// request's header buffer.
+func (r *Request) buildHeaderBuffer(start *protocol.BufferPosition, buf *protocol.Buffer, bufReader *bufio.Reader) {
+ // presumably excludes the bytes bufio has read ahead past the headers - TODO confirm
+ // against Buffer.OffsetPosition
+ endPosition := buf.OffsetPosition(-bufReader.Buffered())
+ r.headerBuffer = buf.Slice(true, start, endPosition)
+}
+
+// readFullBody locates the request body, picking the strategy from the headers:
+// a positive Content-Length reads exactly that many bytes, a chunked transfer encoding walks
+// the chunks, and otherwise the rest of the current packet is taken as the body.
+// An invalid Content-Length yields ParseResultSkipPackage and the error.
+func (r *Request) readFullBody(bodyReader *bufio.Reader, original *protocol.Buffer) (*protocol.Buffer, protocol.ParseResult, error) {
+	length, err := r.appointedLength()
+	switch {
+	case err != nil:
+		return nil, protocol.ParseResultSkipPackage, err
+	case length > 0:
+		return r.checkBodyWithSize(original, bodyReader, length, true)
+	case r.isChunked():
+		return r.checkChunkedBody(original, bodyReader)
+	default:
+		return r.readBodyUntilCurrentPackageFinished(original, bodyReader)
+	}
+}
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/response.go b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/response.go
new file mode 100644
index 0000000..e1b04ad
--- /dev/null
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/response.go
@@ -0,0 +1,125 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package reader
+
+import (
+ "bufio"
+ "fmt"
+ "net/http"
+ "net/textproto"
+ "strconv"
+ "strings"
+
+ protocol "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/base"
+)
+
+// Response is an HTTP/1.x response parsed by ReadResponse, kept together with
+// the buffers that hold its header and body bytes.
+type Response struct {
+ *MessageOpt // built around this Response in ReadResponse; presumably supplies the shared body-reading helpers, confirm in reader.go
+ req *Request // the request handed to ReadResponse
+ original *http.Response // holds the parsed proto, status and headers
+ headerBuffer *protocol.Buffer // assigned by buildHeaderBuffer
+ bodyBuffer *protocol.Buffer // assigned once the body has been read successfully; nil otherwise
+}
+
+// Headers returns the header map of the parsed response. It is nil when
+// ReadResponse could not read the header block.
+func (r *Response) Headers() http.Header {
+	resp := r.original
+	return resp.Header
+}
+
+// HeaderBuffer returns the buffer recorded for the response header bytes.
+func (r *Response) HeaderBuffer() *protocol.Buffer {
+	header := r.headerBuffer
+	return header
+}
+
+// BodyBuffer returns the buffer recorded for the response body. It is nil
+// when no body buffer has been assigned.
+func (r *Response) BodyBuffer() *protocol.Buffer {
+	body := r.bodyBuffer
+	return body
+}
+
+// StatusCode returns the numeric status code parsed from the status line.
+func (r *Response) StatusCode() int {
+	resp := r.original
+	return resp.StatusCode
+}
+
+// ReadResponse parses an HTTP/1.x response from buf: the status line first,
+// then the MIME headers, then the body. req is kept on the returned Response.
+//
+// A status line that cannot be read or parsed yields ParseResultSkipPackage
+// together with an error. When the header block cannot be read, the response
+// is still returned as a success, carrying the status line data and the header
+// buffer but neither parsed headers nor a body buffer. When reading the body
+// ends with a non-success result and no error, that result is passed through
+// with a nil Response.
+func ReadResponse(req *Request, buf *protocol.Buffer) (*Response, protocol.ParseResult, error) {
+	bufReader := bufio.NewReader(buf)
+	tp := textproto.NewReader(bufReader)
+	resp := &http.Response{}
+	result := &Response{original: resp, req: req}
+	result.MessageOpt = &MessageOpt{result}
+
+	headerStartPosition := buf.Position()
+	line, err := tp.ReadLine()
+	if err != nil {
+		// wrap with %w so the underlying reader error stays reachable through errors.Is/As
+		return nil, protocol.ParseResultSkipPackage, fmt.Errorf("read response first line failure: %w", err)
+	}
+
+	// status line layout: "<proto> <code> [<reason>]"
+	indexByte := strings.IndexByte(line, ' ')
+	if indexByte == -1 {
+		return nil, protocol.ParseResultSkipPackage, fmt.Errorf("parsing response error: %s", line)
+	}
+	resp.Proto = line[:indexByte]
+	resp.Status = strings.TrimLeft(line[indexByte+1:], " ")
+	statusCode := resp.Status
+	if i := strings.IndexByte(resp.Status, ' '); i != -1 {
+		statusCode = resp.Status[:i]
+	}
+	if len(statusCode) != 3 {
+		return nil, protocol.ParseResultSkipPackage, fmt.Errorf("parsing response status code failure: %v", statusCode)
+	}
+	resp.StatusCode, err = strconv.Atoi(statusCode)
+	if err != nil || resp.StatusCode < 0 {
+		return nil, protocol.ParseResultSkipPackage, fmt.Errorf("status code not correct: %s", statusCode)
+	}
+	var ok bool
+	if resp.ProtoMajor, resp.ProtoMinor, ok = http.ParseHTTPVersion(resp.Proto); !ok {
+		return nil, protocol.ParseResultSkipPackage, fmt.Errorf("parsing http version failure: %s", resp.Proto)
+	}
+
+	mimeHeader, err := tp.ReadMIMEHeader()
+	if err != nil {
+		// best effort: the status line was valid, so report what has been read so far
+		result.buildHeaderBuffer(headerStartPosition, buf, bufReader)
+		return result, protocol.ParseResultSuccess, nil
+	}
+	resp.Header = http.Header(mimeHeader)
+
+	result.buildHeaderBuffer(headerStartPosition, buf, bufReader)
+	body, parseResult, err := result.readFullResponseBody(bufReader, buf)
+	if err != nil {
+		return nil, protocol.ParseResultSkipPackage, err
+	}
+	if parseResult != protocol.ParseResultSuccess {
+		return nil, parseResult, nil
+	}
+	result.bodyBuffer = body
+	return result, protocol.ParseResultSuccess, nil
+}
+
+// buildHeaderBuffer records the response header bytes in r.headerBuffer.
+// The slice runs from start up to the current position of buf moved back by
+// the number of bytes bufReader has already pulled in but not handed out yet.
+func (r *Response) buildHeaderBuffer(start *protocol.BufferPosition, buf *protocol.Buffer, bufReader *bufio.Reader) {
+	// bytes sitting in the bufio buffer were taken from buf but are not part of the header
+	unread := bufReader.Buffered()
+	r.headerBuffer = buf.Slice(true, start, buf.OffsetPosition(-unread))
+}
+
+// readFullResponseBody selects how the response body is read, in this order:
+// a failure from appointedLength skips the package and returns the error;
+// a positive appointed length is delegated to checkBodyWithSize;
+// a chunked message is delegated to checkChunkedBody;
+// anything else is delegated to readBodyUntilCurrentPackageFinished.
+func (r *Response) readFullResponseBody(bodyReader *bufio.Reader, original *protocol.Buffer) (*protocol.Buffer, protocol.ParseResult, error) {
+	size, err := r.appointedLength()
+	switch {
+	case err != nil:
+		return nil, protocol.ParseResultSkipPackage, err
+	case size > 0:
+		return r.checkBodyWithSize(original, bodyReader, size, true)
+	case r.isChunked():
+		return r.checkChunkedBody(original, bodyReader)
+	default:
+		return r.readBodyUntilCurrentPackageFinished(original, bodyReader)
+	}
+}
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/http1/sampling.go b/pkg/profiling/task/network/analyze/layer7/protocols/http1/sampling.go
index 75f602a..6a98c12 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/http1/sampling.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/http1/sampling.go
@@ -19,7 +19,6 @@ package http1
import (
"fmt"
- "net/http"
"regexp"
"strings"
"time"
@@ -30,6 +29,7 @@ import (
profiling "github.com/apache/skywalking-rover/pkg/profiling/task/base"
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
protocol "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/base"
+ "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader"
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/metrics"
)
@@ -52,13 +52,12 @@ func NewSampler() *Sampler {
}
}
-func (s *Sampler) AppendMetrics(config *SamplingConfig, duration time.Duration,
- request *http.Request, response *http.Response, reqBuffer, respBuffer protocol.SocketDataBuffer) {
+func (s *Sampler) AppendMetrics(config *SamplingConfig, duration time.Duration, request *reader.Request, response *reader.Response) {
if config == nil {
return
}
tracingContext, err := protocol.AnalyzeTracingContext(func(key string) string {
- return request.Header.Get(key)
+ return request.Headers().Get(key)
})
if err != nil {
log.Warnf("analyze tracing context error: %v", err)
@@ -68,7 +67,7 @@ func (s *Sampler) AppendMetrics(config *SamplingConfig, duration time.Duration,
return
}
- uri := request.RequestURI
+ uri := request.RequestURI()
// remove the query parameters
if i := strings.Index(uri, "?"); i > 0 {
uri = uri[0:i]
@@ -82,10 +81,10 @@ func (s *Sampler) AppendMetrics(config *SamplingConfig, duration time.Duration,
var traceType string
var topN *metrics.TopN
- if rule.When5XX && response.StatusCode >= 500 && response.StatusCode < 600 {
+ if rule.When5XX && response.StatusCode() >= 500 && response.StatusCode() < 600 {
traceType = "status_5xx"
topN = s.Error5xxTraces
- } else if rule.When4XX && response.StatusCode >= 400 && response.StatusCode < 500 {
+ } else if rule.When4XX && response.StatusCode() >= 400 && response.StatusCode() < 500 {
traceType = "status_4xx"
topN = s.Error4xxTraces
} else if rule.MinDuration != nil && int64(*rule.MinDuration) <= duration.Milliseconds() {
@@ -96,15 +95,13 @@ func (s *Sampler) AppendMetrics(config *SamplingConfig, duration time.Duration,
}
trace := &Trace{
- Trace: tracingContext,
- RequestURI: uri,
- RequestBuffer: reqBuffer,
- ResponseBuffer: respBuffer,
- Request: request,
- Response: response,
- Type: traceType,
- Settings: rule.Settings,
- TaskConfig: config.ProfilingSampling,
+ Trace: tracingContext,
+ RequestURI: uri,
+ Request: request,
+ Response: response,
+ Type: traceType,
+ Settings: rule.Settings,
+ TaskConfig: config.ProfilingSampling,
}
topN.AddRecord(trace, duration.Milliseconds())
}
diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/protocols.go b/pkg/profiling/task/network/analyze/layer7/protocols/protocols.go
index 4028039..d3db212 100644
--- a/pkg/profiling/task/network/analyze/layer7/protocols/protocols.go
+++ b/pkg/profiling/task/network/analyze/layer7/protocols/protocols.go
@@ -23,6 +23,8 @@ import (
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
protocol "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/base"
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/http1"
+
+ "golang.org/x/net/context"
)
var log = logger.GetLogger("profiling", "task", "network", "layer7", "protocols")
@@ -43,15 +45,16 @@ func init() {
type Analyzer struct {
ctx protocol.Context
- protocols []protocol.Protocol
+ protocols map[base.ConnectionProtocol]*protocol.ProtocolAnalyzer
}
func NewAnalyzer(ctx protocol.Context, config *profiling.TaskConfig) *Analyzer {
- protocols := make([]protocol.Protocol, 0)
+ protocols := make(map[base.ConnectionProtocol]*protocol.ProtocolAnalyzer)
for _, r := range registerProtocols {
p := r()
p.Init(config)
- protocols = append(protocols, p)
+ analyzer := protocol.NewProtocolAnalyzer(ctx, p, config)
+ protocols[p.Protocol()] = analyzer
}
return &Analyzer{
ctx: ctx,
@@ -59,14 +62,20 @@ func NewAnalyzer(ctx protocol.Context, config *profiling.TaskConfig) *Analyzer {
}
}
-func (a *Analyzer) ReceiveSocketDataEvent(event *protocol.SocketDataUploadEvent) {
+func (a *Analyzer) Start(ctx context.Context) {
for _, p := range a.protocols {
- if p.ReceiveData(a.ctx, event) {
- return
- }
+ p.Start(ctx)
+ }
+}
+
+func (a *Analyzer) ReceiveSocketDataEvent(event *protocol.SocketDataUploadEvent) {
+ analyzer := a.protocols[event.Protocol]
+ if analyzer == nil {
+ log.Warnf("could not found any protocol to handle socket data, connection id: %s, protocol: %s(%d)",
+ event.GenerateConnectionID(), event.Protocol.String(), event.Protocol)
+ return
}
- log.Warnf("could not found any protocol to handle socket data, connection id: %s, protocol: %s(%d), type: %s",
- event.GenerateConnectionID(), event.Protocol.String(), event.Protocol, event.MsgType.String())
+ analyzer.ReceiveSocketData(a.ctx, event)
}
func (a *Analyzer) UpdateExtensionConfig(config *profiling.ExtensionConfig) {
@@ -76,19 +85,19 @@ func (a *Analyzer) UpdateExtensionConfig(config *profiling.ExtensionConfig) {
}
type ProtocolMetrics struct {
- data map[string]protocol.Metrics
+ data map[base.ConnectionProtocol]protocol.Metrics
}
func NewProtocolMetrics() *ProtocolMetrics {
- metrics := make(map[string]protocol.Metrics)
+ metrics := make(map[base.ConnectionProtocol]protocol.Metrics)
for _, p := range defaultInstances {
- metrics[p.Name()] = p.GenerateMetrics()
+ metrics[p.Protocol()] = p.GenerateMetrics()
}
return &ProtocolMetrics{data: metrics}
}
-func (m *ProtocolMetrics) GetProtocolMetrics(name string) protocol.Metrics {
- return m.data[name]
+func (m *ProtocolMetrics) GetProtocolMetrics(p base.ConnectionProtocol) protocol.Metrics {
+ return m.data[p]
}
func (m *ProtocolMetrics) MergeMetricsFromConnection(connection *base.ConnectionContext, data base.ConnectionMetrics) {
diff --git a/pkg/profiling/task/network/analyze/layer7/queue.go b/pkg/profiling/task/network/analyze/layer7/queue.go
index 352059d..4c6ffa9 100644
--- a/pkg/profiling/task/network/analyze/layer7/queue.go
+++ b/pkg/profiling/task/network/analyze/layer7/queue.go
@@ -28,6 +28,7 @@ import (
)
type PartitionContext interface {
+ Start(ctx context.Context)
Consume(data interface{})
}
@@ -72,8 +73,9 @@ func (e *EventQueue) start0(ctx context.Context, bpfLoader *bpf.Loader, emap *eb
}
for i := 0; i < len(e.partitions); i++ {
- go func(inx int) {
+ go func(ctx context.Context, inx int) {
p := e.partitions[inx]
+ p.ctx.Start(ctx)
for {
select {
// consume the data
@@ -84,7 +86,7 @@ func (e *EventQueue) start0(ctx context.Context, bpfLoader *bpf.Loader, emap *eb
return
}
}
- }(i)
+ }(ctx, i)
}
}
diff --git a/pkg/tools/host/time.go b/pkg/tools/host/time.go
index 7cc8a78..15e22bd 100644
--- a/pkg/tools/host/time.go
+++ b/pkg/tools/host/time.go
@@ -41,10 +41,14 @@ func init() {
}
func TimeToInstant(bpfTime uint64) *v3.Instant {
- timeCopy := time.Unix(BootTime.Unix(), int64(BootTime.Nanosecond()))
- result := timeCopy.Add(time.Duration(bpfTime))
+ result := Time(bpfTime)
return &v3.Instant{
Seconds: result.Unix(),
Nanos: int32(result.Nanosecond()),
}
}
+
+// Time converts bpfTime, taken as a nanosecond offset from BootTime, into a
+// time.Time. BootTime itself is left untouched: a fresh value is rebuilt from
+// its seconds and nanoseconds before the offset is added.
+func Time(bpfTime uint64) time.Time {
+	sec, nsec := BootTime.Unix(), int64(BootTime.Nanosecond())
+	return time.Unix(sec, nsec).Add(time.Duration(bpfTime))
+}