You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@skywalking.apache.org by wu...@apache.org on 2023/03/02 05:46:13 UTC
[skywalking-rover] branch main updated: Adapt protocol update and fix some continuous profiling bugs (#80)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git
The following commit(s) were added to refs/heads/main by this push:
new fc8d074 Adapt protocol update and fix some continuous profiling bugs (#80)
fc8d074 is described below
commit fc8d074c6d34ecfee585a7097cbd5aef1ca680a5
Author: mrproliu <74...@qq.com>
AuthorDate: Thu Mar 2 13:46:08 2023 +0800
Adapt protocol update and fix some continuous profiling bugs (#80)
---
go.mod | 10 +-
go.sum | 19 +-
pkg/profiling/continuous/base/metrics.go | 4 +-
pkg/profiling/continuous/base/windows.go | 49 +++--
pkg/profiling/continuous/base/windows_test.go | 218 +++++++++++++++++++++
pkg/profiling/continuous/checker/common/causes.go | 24 +--
.../continuous/checker/common/http_checker.go | 12 +-
.../continuous/checker/common/process_checker.go | 10 +-
.../continuous/checker/common/system_checker.go | 10 +-
.../continuous/checker/network_error_rate.go | 2 +-
.../continuous/checker/network_response_time.go | 2 +-
pkg/profiling/continuous/checker/process_cpu.go | 2 +-
pkg/profiling/continuous/checker/process_thread.go | 2 +-
pkg/profiling/continuous/checker/system_load.go | 2 +-
pkg/profiling/continuous/checkers.go | 19 +-
15 files changed, 316 insertions(+), 69 deletions(-)
diff --git a/go.mod b/go.mod
index 8696dea..7dea1cc 100644
--- a/go.mod
+++ b/go.mod
@@ -18,14 +18,14 @@ require (
github.com/stretchr/testify v1.8.1
github.com/zekroTJA/timedmap v1.4.0
golang.org/x/arch v0.0.0-20220722155209-00200b7164a7
- golang.org/x/net v0.0.0-20220722155237-a158d28d115b
- golang.org/x/sys v0.3.0
+ golang.org/x/net v0.6.0
+ golang.org/x/sys v0.5.0
google.golang.org/grpc v1.44.0
k8s.io/api v0.23.5
k8s.io/apimachinery v0.23.5
k8s.io/client-go v0.23.5
k8s.io/utils v0.0.0-20211116205334-6203023598ed
- skywalking.apache.org/repo/goapi v0.0.0-20230221054914-eeacbf544e9c
+ skywalking.apache.org/repo/goapi v0.0.0-20230301143132-aa3f8469385b
)
require (
@@ -57,8 +57,8 @@ require (
github.com/tklauser/numcpus v0.3.0 // indirect
github.com/yusufpapurcu/wmi v1.2.2 // indirect
golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8 // indirect
- golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect
- golang.org/x/text v0.3.8 // indirect
+ golang.org/x/term v0.5.0 // indirect
+ golang.org/x/text v0.7.0 // indirect
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20211208223120-3a66f561d7aa // indirect
diff --git a/go.sum b/go.sum
index 78b8e05..3a3798e 100644
--- a/go.sum
+++ b/go.sum
@@ -560,8 +560,9 @@ golang.org/x/net v0.0.0-20210410081132-afb366fc7cd1/go.mod h1:9tjilg8BloeKEkVJvy
golang.org/x/net v0.0.0-20210503060351-7fd8e65b6420/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20211209124913-491a49abca63/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
-golang.org/x/net v0.0.0-20220722155237-a158d28d115b h1:PxfKdU9lEEDYjdIzOtC4qFWgkU2rGHdKlKowJSMN9h0=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
+golang.org/x/net v0.6.0 h1:L4ZwwTvKW9gr0ZMS1yrHD9GZhIuVjOBBnaKH+SPQK0Q=
+golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@@ -667,13 +668,13 @@ golang.org/x/sys v0.0.0-20211124211545-fe61309f8881/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20211205182925-97ca703d548d/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.3.0 h1:w8ZOecv6NaNa/zC8944JTU3vz4u6Lagfk4RPQxv92NQ=
-golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU=
+golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210615171337-6886f2dfbf5b/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
-golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 h1:JGgROgKl9N8DuW20oFS5gxc+lE67/N3FcwmBPMe7ArY=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
+golang.org/x/term v0.5.0 h1:n2a8QNdAb0sZNpU9R1ALUXBbY+w51fCQDN+7EdxNBsY=
+golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@@ -683,8 +684,8 @@ golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
-golang.org/x/text v0.3.8 h1:nAL+RVCQ9uMn3vJZbV+MRnydTJFPf8qqY42YiA6MrqY=
-golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
+golang.org/x/text v0.7.0 h1:4BRB4x83lYWy72KwLD/qYDuTu7q9PjSagHvijDw7cLo=
+golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
@@ -963,5 +964,5 @@ sigs.k8s.io/structured-merge-diff/v4 v4.2.1 h1:bKCqE9GvQ5tiVHn5rfn1r+yao3aLQEaLz
sigs.k8s.io/structured-merge-diff/v4 v4.2.1/go.mod h1:j/nl6xW8vLS49O8YvXW1ocPhZawJtm+Yrr7PPRQ0Vg4=
sigs.k8s.io/yaml v1.2.0 h1:kr/MCeFWJWTwyaHoR9c8EjH9OumOmoF9YGiZd7lFm/Q=
sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc=
-skywalking.apache.org/repo/goapi v0.0.0-20230221054914-eeacbf544e9c h1:UUcBWaN9cUdtYqdj9ssIcL/BuHD4jT17smPgfpZTcVg=
-skywalking.apache.org/repo/goapi v0.0.0-20230221054914-eeacbf544e9c/go.mod h1:BS5LRvsAMmZn8YIW9n0+8eiJhC9zVn663fDr5t+cL40=
+skywalking.apache.org/repo/goapi v0.0.0-20230301143132-aa3f8469385b h1:VAWQr1mJk4P/a8VZ9UASY8H53wj0zdLHgYvhddyQcXw=
+skywalking.apache.org/repo/goapi v0.0.0-20230301143132-aa3f8469385b/go.mod h1:WovoDv1GA+8VuvHPVJL7q/fL0KlYPBZq5rTMCFQRzJU=
diff --git a/pkg/profiling/continuous/base/metrics.go b/pkg/profiling/continuous/base/metrics.go
index 6b48015..55029b0 100644
--- a/pkg/profiling/continuous/base/metrics.go
+++ b/pkg/profiling/continuous/base/metrics.go
@@ -45,7 +45,9 @@ func (m *MetricsAppender) AppendProcessSingleValue(name string, p api.ProcessInt
for k, v := range labels {
transformLabels = append(transformLabels, &v3.Label{Name: k, Value: v})
}
- transformLabels = append(transformLabels, &v3.Label{Name: "process_name", Value: p.Entity().ProcessName})
+ transformLabels = append(transformLabels,
+ &v3.Label{Name: "process_name", Value: p.Entity().ProcessName},
+ &v3.Label{Name: "layer", Value: p.Entity().Layer})
metadata := serviceInstanceMetadata{
service: p.Entity().ServiceName,
instance: p.Entity().InstanceName,
diff --git a/pkg/profiling/continuous/base/windows.go b/pkg/profiling/continuous/base/windows.go
index c400a64..6b9ab49 100644
--- a/pkg/profiling/continuous/base/windows.go
+++ b/pkg/profiling/continuous/base/windows.go
@@ -61,8 +61,8 @@ type TimeWindows[V any, R any] struct {
windowLocker sync.RWMutex
windowGenerator func() WindowData[V, R]
- lastFlushedElement *list.Element
- lastWriteElement *list.Element
+ // mark the latest flush endTime
+ lastFlushTime *time.Time
}
func NewTimeWindows[V any, R any](items []*PolicyItem, generator func() WindowData[V, R]) *TimeWindows[V, R] {
@@ -143,7 +143,7 @@ func (t *TimeWindows[D, R]) Add(tm time.Time, val D) {
second = 0
}
- if second > t.data.Len() {
+ if second >= t.data.Len() {
// add the older data, ignore it
return
}
@@ -151,29 +151,47 @@ func (t *TimeWindows[D, R]) Add(tm time.Time, val D) {
t.appendDataToSlot(t.data.Len()-second-1, val)
}
-func (t *TimeWindows[D, R]) FlushLastWriteData() (R, bool) {
- if t.lastWriteElement == nil || t.lastFlushedElement == t.lastWriteElement {
+func (t *TimeWindows[D, R]) FlushMostRecentData() (R, bool) {
+ endTime := t.endTime
+ if !t.shouldFlush(endTime) {
var empty R
return empty, false
}
- t.lastFlushedElement = t.lastWriteElement
- return t.lastFlushedElement.Value.(*windowDataWrapper[D, R]).Get(), true
+ t.lastFlushTime = endTime
+ return t.data.Back().Value.(*windowDataWrapper[D, R]).Get(), true
}
-func (t *TimeWindows[D, R]) FlushMultipleWriteData() ([]R, bool) {
- result := make([]R, 0)
- if t.lastWriteElement == nil || t.lastFlushedElement == t.lastWriteElement {
+func (t *TimeWindows[D, R]) FlushMultipleRecentData() ([]R, bool) {
+ endTime := t.endTime
+ if !t.shouldFlush(endTime) {
return nil, false
}
- for e := t.lastWriteElement; e != t.lastFlushedElement && e != nil; e = e.Prev() {
+ result := make([]R, 0)
+ slotCount := t.data.Len()
+ if t.lastFlushTime != nil {
+ slotCount = int(t.endTime.Sub(*t.lastFlushTime).Seconds()) - 1
+ }
+ for e := t.data.Back(); e != nil && slotCount >= 0; e = e.Prev() {
if e.Value.(*windowDataWrapper[D, R]).hasData {
result = append(result, e.Value.(*windowDataWrapper[D, R]).Get())
}
+ slotCount--
}
- t.lastFlushedElement = t.lastWriteElement
+ t.lastFlushTime = endTime
return result, true
}
+func (t *TimeWindows[D, R]) shouldFlush(endTime *time.Time) bool {
+ if endTime == nil {
+ return false
+ }
+ if t.lastFlushTime == nil {
+ return true
+ }
+
+ return t.lastFlushTime != endTime && t.lastFlushTime.Before(*endTime)
+}
+
func (t *TimeWindows[D, R]) moveTo(tm time.Time) {
t.windowLocker.Lock()
defer t.windowLocker.Unlock()
@@ -190,9 +208,9 @@ func (t *TimeWindows[D, R]) moveTo(tm time.Time) {
} else {
for i := 0; i < addSeconds; i++ {
// remove the older data
- first := t.data.Remove(t.data.Back()).(*windowDataWrapper[D, R])
+ first := t.data.Remove(t.data.Front()).(*windowDataWrapper[D, R])
first.Reset()
- t.data.PushFront(first)
+ t.data.PushBack(first)
}
}
t.endTime = &tm
@@ -202,7 +220,7 @@ func (t *TimeWindows[V, R]) appendDataToSlot(index int, data V) {
t.windowLocker.RLock()
defer t.windowLocker.RUnlock()
- if index <= 0 || index >= t.data.Len() {
+ if index < 0 || index > t.data.Len() {
return
}
@@ -223,7 +241,6 @@ func (t *TimeWindows[V, R]) appendDataToSlot(index int, data V) {
}
element.Value.(*windowDataWrapper[V, R]).Accept(data)
- t.lastWriteElement = element
}
type windowDataWrapper[D any, R any] struct {
diff --git a/pkg/profiling/continuous/base/windows_test.go b/pkg/profiling/continuous/base/windows_test.go
new file mode 100644
index 0000000..3a5c335
--- /dev/null
+++ b/pkg/profiling/continuous/base/windows_test.go
@@ -0,0 +1,218 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package base
+
+import (
+ "reflect"
+ "testing"
+ "time"
+)
+
+var (
+ defaultTime = time.Now()
+)
+
+func TestAdd(t *testing.T) {
+ tests := []struct {
+ name string
+ size int
+ dataOp func(window *TimeWindows[float64, float64])
+ result []float64
+ }{
+ {
+ name: "normal",
+ size: 3,
+ dataOp: func(window *TimeWindows[float64, float64]) {
+ addData(window, 1, 1)
+ addData(window, 2, 2)
+ addData(window, 3, 3)
+ },
+ result: []float64{1, 2, 3},
+ },
+ {
+ name: "out of count",
+ size: 3,
+ dataOp: func(window *TimeWindows[float64, float64]) {
+ addData(window, 1, 1)
+ addData(window, 2, 2)
+ addData(window, 3, 3)
+ addData(window, 4, 4)
+ },
+ result: []float64{2, 3, 4},
+ },
+ {
+ name: "add older data",
+ size: 3,
+ dataOp: func(window *TimeWindows[float64, float64]) {
+ addData(window, 1, 1)
+ addData(window, 2, 2)
+ addData(window, 3, 3)
+ addData(window, 4, 4)
+ addData(window, 2, 4)
+ addData(window, 1, 8)
+ addData(window, 0, 10)
+ },
+ result: []float64{4, 3, 4},
+ },
+ {
+ name: "add new data which bigger than windows count",
+ size: 3,
+ dataOp: func(window *TimeWindows[float64, float64]) {
+ addData(window, 1, 1)
+ addData(window, 4, 2)
+ addData(window, 5, 3)
+ addData(window, 7, 5)
+ },
+ result: []float64{3, 0, 5},
+ },
+ }
+
+ for _, tt := range tests {
+ windows := timeWindows(tt.size)
+ tt.dataOp(windows)
+ actualValue := getAllValues(windows)
+ if !reflect.DeepEqual(tt.result, actualValue) {
+ t.Fatalf("test [%s] failure, expceted: %v, actual: %v", tt.name, tt.result, actualValue)
+ }
+ }
+}
+
+func TestFlushData(t *testing.T) {
+ tests := []struct {
+ name string
+ size int
+ operations []interface{}
+ }{
+ {
+ name: "normal[most recent]",
+ size: 3,
+ operations: []interface{}{
+ appendDataOperation{1, 1},
+ mostRecentDataChecker{1, true},
+ mostRecentDataChecker{0, false},
+
+ appendDataOperation{2, 2},
+ appendDataOperation{3, 3},
+ mostRecentDataChecker{3, true},
+ mostRecentDataChecker{0, false},
+ },
+ },
+ {
+ name: "has older data[most recent]",
+ size: 3,
+ operations: []interface{}{
+ appendDataOperation{1, 1},
+ appendDataOperation{2, 2},
+ appendDataOperation{1, 3},
+ mostRecentDataChecker{2, true},
+ mostRecentDataChecker{0, false},
+
+ appendDataOperation{3, 3},
+ appendDataOperation{4, 4},
+ appendDataOperation{1, 9},
+ mostRecentDataChecker{4, true},
+ mostRecentDataChecker{0, false},
+ },
+ },
+ {
+ name: "normal[multiple recent]",
+ size: 3,
+ operations: []interface{}{
+ appendDataOperation{1, 1},
+ appendDataOperation{2, 2},
+ appendDataOperation{3, 3},
+ multipleRecentDataChecker{[]float64{3, 2, 1}, true},
+ multipleRecentDataChecker{nil, false},
+
+ appendDataOperation{4, 4},
+ multipleRecentDataChecker{[]float64{4}, true},
+ multipleRecentDataChecker{nil, false},
+ },
+ },
+ {
+ name: "has older data[multiple recent]",
+ size: 3,
+ operations: []interface{}{
+ appendDataOperation{1, 1},
+ appendDataOperation{2, 2},
+ appendDataOperation{4, 4},
+ multipleRecentDataChecker{[]float64{4, 2}, true},
+ multipleRecentDataChecker{nil, false},
+
+ appendDataOperation{1, 1},
+ multipleRecentDataChecker{nil, false},
+ },
+ },
+ }
+
+ for _, tt := range tests {
+ windows := timeWindows(tt.size)
+ for _, op := range tt.operations {
+ switch v := op.(type) {
+ case appendDataOperation:
+ addData(windows, v.second, v.value)
+ case mostRecentDataChecker:
+ val, hasData := windows.FlushMostRecentData()
+ if val != v.value || v.hasData != hasData {
+ t.Fatalf("test[%s] failure, excepted: %v-%t, actual: %v-%t", tt.name,
+ v.value, v.hasData, val, hasData)
+ }
+ case multipleRecentDataChecker:
+ val, hasData := windows.FlushMultipleRecentData()
+ if !reflect.DeepEqual(val, v.value) || v.hasData != hasData {
+ t.Fatalf("test[%s] failure, excepted: %v-%t, actual: %v-%t", tt.name,
+ v.value, v.hasData, val, hasData)
+ }
+ }
+ }
+ }
+}
+
+type appendDataOperation struct {
+ second int
+ value float64
+}
+
+type mostRecentDataChecker struct {
+ value float64
+ hasData bool
+}
+
+type multipleRecentDataChecker struct {
+ value []float64
+ hasData bool
+}
+
+func addData(win *TimeWindows[float64, float64], second int, val float64) {
+ result := defaultTime.Add(time.Second * time.Duration(second))
+ win.Add(result, val)
+}
+
+func timeWindows(count int) *TimeWindows[float64, float64] {
+ return NewTimeWindows([]*PolicyItem{{Period: count}}, func() WindowData[float64, float64] {
+ return NewLatestWindowData[float64]()
+ })
+}
+
+func getAllValues(win *TimeWindows[float64, float64]) []float64 {
+ result := make([]float64, 0)
+ for e := win.data.Front(); e != nil; e = e.Next() {
+ result = append(result, e.Value.(*windowDataWrapper[float64, float64]).Get())
+ }
+ return result
+}
diff --git a/pkg/profiling/continuous/checker/common/causes.go b/pkg/profiling/continuous/checker/common/causes.go
index 8955569..e9a1a70 100644
--- a/pkg/profiling/continuous/checker/common/causes.go
+++ b/pkg/profiling/continuous/checker/common/causes.go
@@ -27,18 +27,18 @@ import (
type SingleValueCause struct {
process api.ProcessInterface
policy *base.PolicyItem
- causeType v3.ContinuousProfilingCauseType
+ monitorType v3.ContinuousProfilingTriggeredMonitorType
threshold, current float64
}
-func NewSingleValueCause(p api.ProcessInterface, policyItem *base.PolicyItem, causeType v3.ContinuousProfilingCauseType,
+func NewSingleValueCause(p api.ProcessInterface, policyItem *base.PolicyItem, monitorType v3.ContinuousProfilingTriggeredMonitorType,
threshold, current float64) *SingleValueCause {
return &SingleValueCause{
- process: p,
- policy: policyItem,
- causeType: causeType,
- threshold: threshold,
- current: current,
+ process: p,
+ policy: policyItem,
+ monitorType: monitorType,
+ threshold: threshold,
+ current: current,
}
}
@@ -52,7 +52,7 @@ func (p *SingleValueCause) FromPolicy() *base.PolicyItem {
func (p *SingleValueCause) GenerateTransferCause() *v3.ContinuousProfilingCause {
return &v3.ContinuousProfilingCause{
- Type: p.causeType,
+ Type: p.monitorType,
Cause: &v3.ContinuousProfilingCause_SingleValue{
SingleValue: &v3.ContinuousProfilingSingleValueCause{
Threshold: p.threshold,
@@ -68,16 +68,16 @@ type URICause struct {
process api.ProcessInterface
policy *base.PolicyItem
- causeType v3.ContinuousProfilingCauseType
+ causeType v3.ContinuousProfilingTriggeredMonitorType
threshold, current float64
}
-func NewURICause(p api.ProcessInterface, isRegex bool, uri string, policyItem *base.PolicyItem, causeType v3.ContinuousProfilingCauseType,
- threshold, current float64) *URICause {
+func NewURICause(p api.ProcessInterface, isRegex bool, uri string, policyItem *base.PolicyItem,
+ monitorType v3.ContinuousProfilingTriggeredMonitorType, threshold, current float64) *URICause {
return &URICause{
process: p,
policy: policyItem,
- causeType: causeType,
+ causeType: monitorType,
IsRegex: isRegex,
URI: uri,
threshold: threshold,
diff --git a/pkg/profiling/continuous/checker/common/http_checker.go b/pkg/profiling/continuous/checker/common/http_checker.go
index 383f5ca..3952787 100644
--- a/pkg/profiling/continuous/checker/common/http_checker.go
+++ b/pkg/profiling/continuous/checker/common/http_checker.go
@@ -32,17 +32,17 @@ type HTTPBasedChecker[Data base.WindowData[network.BufferEvent, float64]] struct
*BaseChecker[*HTTPBasedCheckerProcessInfo]
CheckType base.CheckType
- CauseType v3.ContinuousProfilingCauseType
+ MonitorType v3.ContinuousProfilingTriggeredMonitorType
ThresholdGenerate func(val string) (float64, error)
}
func NewHTTPBasedChecker[Data base.WindowData[network.BufferEvent, float64]](checkType base.CheckType,
thresholdGenerator func(val string) (float64, error), dataGenerator func() base.WindowData[network.BufferEvent, float64],
- causeType v3.ContinuousProfilingCauseType) *HTTPBasedChecker[Data] {
+ monitorType v3.ContinuousProfilingTriggeredMonitorType) *HTTPBasedChecker[Data] {
checker := &HTTPBasedChecker[Data]{
CheckType: checkType,
ThresholdGenerate: thresholdGenerator,
- CauseType: causeType,
+ MonitorType: monitorType,
}
checker.BaseChecker = NewBaseChecker[*HTTPBasedCheckerProcessInfo](
func(p api.ProcessInterface, older *HTTPBasedCheckerProcessInfo, items []*base.PolicyItem) *HTTPBasedCheckerProcessInfo {
@@ -204,7 +204,7 @@ func (n *HTTPBasedChecker[Data]) Check(ctx base.CheckContext, metricsAppender *b
return val >= itemInfo.threshold
}); isMatch {
causes = append(causes, NewURICause(pidPolicies.Process, false, uri, item,
- n.CauseType, itemInfo.threshold, lastMatch))
+ n.MonitorType, itemInfo.threshold, lastMatch))
}
}
@@ -213,7 +213,7 @@ func (n *HTTPBasedChecker[Data]) Check(ctx base.CheckContext, metricsAppender *b
return val >= itemInfo.threshold
}); isMatch {
causes = append(causes, NewURICause(pidPolicies.Process, itemInfo.uriRegex != nil, globalURI, item,
- n.CauseType, itemInfo.threshold, lastMatch))
+ n.MonitorType, itemInfo.threshold, lastMatch))
}
}
}
@@ -225,7 +225,7 @@ func (n *HTTPBasedChecker[Data]) flushMetrics(uri string, windows *base.TimeWind
if uri == "" {
uri = "global"
}
- if data, hasUpdate := windows.FlushMultipleWriteData(); hasUpdate {
+ if data, hasUpdate := windows.FlushMultipleRecentData(); hasUpdate {
// flush each slot data
for _, d := range data {
metricsAppender.AppendProcessSingleValue(strings.ToLower(string(n.CheckType)), process, map[string]string{
diff --git a/pkg/profiling/continuous/checker/common/process_checker.go b/pkg/profiling/continuous/checker/common/process_checker.go
index ad1f42e..8298506 100644
--- a/pkg/profiling/continuous/checker/common/process_checker.go
+++ b/pkg/profiling/continuous/checker/common/process_checker.go
@@ -34,16 +34,16 @@ type ProcessBasedChecker[V numbers] struct {
*BaseChecker[*ProcessBasedInfo[V]]
CheckType base.CheckType
- CauseType v3.ContinuousProfilingCauseType
+ MonitorType v3.ContinuousProfilingTriggeredMonitorType
ThresholdGenerate func(val string) (V, error)
DataGenerate func(process api.ProcessInterface) (V, error)
}
func NewProcessBasedChecker[V numbers](checkType base.CheckType, thresholdGenerator func(val string) (V, error),
- dataGenerator func(p api.ProcessInterface) (V, error), causeType v3.ContinuousProfilingCauseType) *ProcessBasedChecker[V] {
+ dataGenerator func(p api.ProcessInterface) (V, error), monitorType v3.ContinuousProfilingTriggeredMonitorType) *ProcessBasedChecker[V] {
checker := &ProcessBasedChecker[V]{
CheckType: checkType,
- CauseType: causeType,
+ MonitorType: monitorType,
ThresholdGenerate: thresholdGenerator,
DataGenerate: dataGenerator,
}
@@ -118,7 +118,7 @@ func (r *ProcessBasedChecker[V]) Check(ctx base.CheckContext, metricsAppender *b
causes := make([]base.ThresholdCause, 0)
for _, info := range r.PidWithInfos {
for _, threshold := range info.Policies {
- if data, hasData := info.Windows.FlushLastWriteData(); hasData {
+ if data, hasData := info.Windows.FlushMostRecentData(); hasData {
metricsAppender.AppendProcessSingleValue(strings.ToLower(string(r.CheckType)), info.Process, nil, float64(data))
}
if !ctx.ShouldCheck(info.Process, threshold.Policy) {
@@ -129,7 +129,7 @@ func (r *ProcessBasedChecker[V]) Check(ctx base.CheckContext, metricsAppender *b
return val >= threshold.Threshold
}); enable {
causes = append(causes,
- NewSingleValueCause(info.Process, threshold.Policy, r.CauseType, float64(threshold.Threshold), float64(lastMatch)))
+ NewSingleValueCause(info.Process, threshold.Policy, r.MonitorType, float64(threshold.Threshold), float64(lastMatch)))
}
}
}
diff --git a/pkg/profiling/continuous/checker/common/system_checker.go b/pkg/profiling/continuous/checker/common/system_checker.go
index 5cc4e60..1b9475f 100644
--- a/pkg/profiling/continuous/checker/common/system_checker.go
+++ b/pkg/profiling/continuous/checker/common/system_checker.go
@@ -30,7 +30,7 @@ import (
type SystemBasedChecker[V numbers] struct {
CheckType base.CheckType
- CauseType v3.ContinuousProfilingCauseType
+ MonitorType v3.ContinuousProfilingTriggeredMonitorType
ThresholdGenerate func(val string) (V, error)
DataGenerate func() (V, error)
GlobalWindows *base.TimeWindows[V, V]
@@ -39,10 +39,10 @@ type SystemBasedChecker[V numbers] struct {
}
func NewSystemBasedChecker[V numbers](checkType base.CheckType, thresholdGenerator func(val string) (V, error),
- dataGenerator func() (V, error), causeType v3.ContinuousProfilingCauseType) *SystemBasedChecker[V] {
+ dataGenerator func() (V, error), monitorType v3.ContinuousProfilingTriggeredMonitorType) *SystemBasedChecker[V] {
checker := &SystemBasedChecker[V]{
CheckType: checkType,
- CauseType: causeType,
+ MonitorType: monitorType,
ThresholdGenerate: thresholdGenerator,
DataGenerate: dataGenerator,
GlobalWindows: base.NewTimeWindows[V, V](nil, func() base.WindowData[V, V] {
@@ -100,7 +100,7 @@ func (s *SystemBasedChecker[V]) Check(ctx base.CheckContext, metricsAppender *ba
}
causes := make([]base.ThresholdCause, 0)
- data, hasData := s.GlobalWindows.FlushLastWriteData()
+ data, hasData := s.GlobalWindows.FlushMostRecentData()
for _, policy := range s.Policies {
if hasData {
@@ -120,7 +120,7 @@ func (s *SystemBasedChecker[V]) Check(ctx base.CheckContext, metricsAppender *ba
continue
}
- causes = append(causes, NewSingleValueCause(p, policy.Policy, s.CauseType, float64(policy.Threshold), float64(lastMatch)))
+ causes = append(causes, NewSingleValueCause(p, policy.Policy, s.MonitorType, float64(policy.Threshold), float64(lastMatch)))
}
}
diff --git a/pkg/profiling/continuous/checker/network_error_rate.go b/pkg/profiling/continuous/checker/network_error_rate.go
index 7ba08c9..0a43695 100644
--- a/pkg/profiling/continuous/checker/network_error_rate.go
+++ b/pkg/profiling/continuous/checker/network_error_rate.go
@@ -45,7 +45,7 @@ func (n *NetworkHTTPErrorRateChecker) Init(config *base.ContinuousConfig) error
return v, nil
}, func() base.WindowData[network.BufferEvent, float64] {
return &processNetworkResponseErrorStatics{}
- }, v3.ContinuousProfilingCauseType_HTTPErrorRate)
+ }, v3.ContinuousProfilingTriggeredMonitorType_HTTPErrorRate)
return nil
}
diff --git a/pkg/profiling/continuous/checker/network_response_time.go b/pkg/profiling/continuous/checker/network_response_time.go
index 7dd1754..a4ef34b 100644
--- a/pkg/profiling/continuous/checker/network_response_time.go
+++ b/pkg/profiling/continuous/checker/network_response_time.go
@@ -41,7 +41,7 @@ func (n *NetworkHTTPAvgResponseTimeChecker) Init(config *base.ContinuousConfig)
return strconv.ParseFloat(val, 64)
}, func() base.WindowData[network.BufferEvent, float64] {
return &processNetworkAvgResponseTimeStatics{}
- }, v3.ContinuousProfilingCauseType_HTTPAvgResponseTime)
+ }, v3.ContinuousProfilingTriggeredMonitorType_HTTPAvgResponseTime)
return nil
}
diff --git a/pkg/profiling/continuous/checker/process_cpu.go b/pkg/profiling/continuous/checker/process_cpu.go
index f0f226e..ef7de27 100644
--- a/pkg/profiling/continuous/checker/process_cpu.go
+++ b/pkg/profiling/continuous/checker/process_cpu.go
@@ -48,6 +48,6 @@ func (r *ProcessCPUChecker) Init(config *base.ContinuousConfig) error {
return 0, err
}
return percent * 100, nil
- }, v3.ContinuousProfilingCauseType_ProcessCPU)
+ }, v3.ContinuousProfilingTriggeredMonitorType_ProcessCPU)
return nil
}
diff --git a/pkg/profiling/continuous/checker/process_thread.go b/pkg/profiling/continuous/checker/process_thread.go
index 566abe2..2e0ab38 100644
--- a/pkg/profiling/continuous/checker/process_thread.go
+++ b/pkg/profiling/continuous/checker/process_thread.go
@@ -42,6 +42,6 @@ func (t *ProcessThreadCountChecker) Init(config *base.ContinuousConfig) error {
}, func(p api.ProcessInterface) (int32, error) {
threads, err := p.OriginalProcess().NumThreads()
return threads, err
- }, v3.ContinuousProfilingCauseType_ProcessThreadCount)
+ }, v3.ContinuousProfilingTriggeredMonitorType_ProcessThreadCount)
return nil
}
diff --git a/pkg/profiling/continuous/checker/system_load.go b/pkg/profiling/continuous/checker/system_load.go
index f77b949..689de5a 100644
--- a/pkg/profiling/continuous/checker/system_load.go
+++ b/pkg/profiling/continuous/checker/system_load.go
@@ -46,6 +46,6 @@ func (s *SystemLoadChecker) Init(config *base.ContinuousConfig) error {
return 0, err
}
return avg.Load1, nil
- }, v3.ContinuousProfilingCauseType_SystemLoad)
+ }, v3.ContinuousProfilingTriggeredMonitorType_SystemLoad)
return nil
}
diff --git a/pkg/profiling/continuous/checkers.go b/pkg/profiling/continuous/checkers.go
index 1e4cc05..45dd12f 100644
--- a/pkg/profiling/continuous/checkers.go
+++ b/pkg/profiling/continuous/checkers.go
@@ -286,15 +286,24 @@ func (c *Checkers) queryPolicyUpdates(servicePolicies map[string]string) (map[st
if err != nil {
return nil, err
}
+ // no update
+ if len(policyUpdateCommands.GetCommands()) == 0 {
+ return nil, nil
+ }
- if len(policyUpdateCommands.GetCommands()) != 1 ||
- policyUpdateCommands.GetCommands()[0].GetCommand() != "ContinuousProfilingPolicyQuery" ||
- len(policyUpdateCommands.GetCommands()[0].GetArgs()) != 1 ||
- policyUpdateCommands.GetCommands()[0].GetArgs()[0].GetKey() != "ServiceWithPolicyJSON" {
+ var policyJSON string
+ if len(policyUpdateCommands.GetCommands()) == 1 && policyUpdateCommands.GetCommands()[0].GetCommand() == "ContinuousProfilingPolicyQuery" {
+ for _, arg := range policyUpdateCommands.GetCommands()[0].GetArgs() {
+ if arg.GetKey() == "ServiceWithPolicyJSON" {
+ policyJSON = arg.GetValue()
+ break
+ }
+ }
+ }
+ if policyJSON == "" {
return nil, fmt.Errorf("the query policy response not adapt")
}
- policyJSON := policyUpdateCommands.GetCommands()[0].GetArgs()[0].GetValue()
updates := make([]*QueryPolicyUpdate, 0)
err = json.Unmarshal([]byte(policyJSON), &updates)
if err != nil {