You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@skywalking.apache.org by wu...@apache.org on 2023/03/02 05:46:13 UTC

[skywalking-rover] branch main updated: Adapt protocol update and fix some continuous profiling bugs (#80)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-rover.git


The following commit(s) were added to refs/heads/main by this push:
     new fc8d074  Adapt protocol update and fix some continuous profiling bugs (#80)
fc8d074 is described below

commit fc8d074c6d34ecfee585a7097cbd5aef1ca680a5
Author: mrproliu <74...@qq.com>
AuthorDate: Thu Mar 2 13:46:08 2023 +0800

    Adapt protocol update and fix some continuous profiling bugs (#80)
---
 go.mod                                             |  10 +-
 go.sum                                             |  19 +-
 pkg/profiling/continuous/base/metrics.go           |   4 +-
 pkg/profiling/continuous/base/windows.go           |  49 +++--
 pkg/profiling/continuous/base/windows_test.go      | 218 +++++++++++++++++++++
 pkg/profiling/continuous/checker/common/causes.go  |  24 +--
 .../continuous/checker/common/http_checker.go      |  12 +-
 .../continuous/checker/common/process_checker.go   |  10 +-
 .../continuous/checker/common/system_checker.go    |  10 +-
 .../continuous/checker/network_error_rate.go       |   2 +-
 .../continuous/checker/network_response_time.go    |   2 +-
 pkg/profiling/continuous/checker/process_cpu.go    |   2 +-
 pkg/profiling/continuous/checker/process_thread.go |   2 +-
 pkg/profiling/continuous/checker/system_load.go    |   2 +-
 pkg/profiling/continuous/checkers.go               |  19 +-
 15 files changed, 316 insertions(+), 69 deletions(-)

diff --git a/go.mod b/go.mod
index 8696dea..7dea1cc 100644
--- a/go.mod
+++ b/go.mod
@@ -18,14 +18,14 @@ require (
 	github.com/stretchr/testify v1.8.1
 	github.com/zekroTJA/timedmap v1.4.0
 	golang.org/x/arch v0.0.0-20220722155209-00200b7164a7
-	golang.org/x/net v0.0.0-20220722155237-a158d28d115b
-	golang.org/x/sys v0.3.0
+	golang.org/x/net v0.6.0
+	golang.org/x/sys v0.5.0
 	google.golang.org/grpc v1.44.0
 	k8s.io/api v0.23.5
 	k8s.io/apimachinery v0.23.5
 	k8s.io/client-go v0.23.5
 	k8s.io/utils v0.0.0-20211116205334-6203023598ed
-	skywalking.apache.org/repo/goapi v0.0.0-20230221054914-eeacbf544e9c
+	skywalking.apache.org/repo/goapi v0.0.0-20230301143132-aa3f8469385b
 )
 
 require (
@@ -57,8 +57,8 @@ require (
 	github.com/tklauser/numcpus v0.3.0 // indirect
 	github.com/yusufpapurcu/wmi v1.2.2 // indirect
 	golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8 // indirect
-	golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect
-	golang.org/x/text v0.3.8 // indirect
+	golang.org/x/term v0.5.0 // indirect
+	golang.org/x/text v0.7.0 // indirect
 	golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac // indirect
 	google.golang.org/appengine v1.6.7 // indirect
 	google.golang.org/genproto v0.0.0-20211208223120-3a66f561d7aa // indirect
diff --git a/go.sum b/go.sum
index 78b8e05..3a3798e 100644
--- a/go.sum
+++ b/go.sum
@@ -560,8 +560,9 @@ golang.org/x/net v0.0.0-20210410081132-afb366fc7cd1/go.mod h1:9tjilg8BloeKEkVJvy
 golang.org/x/net v0.0.0-20210503060351-7fd8e65b6420/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
 golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
 golang.org/x/net v0.0.0-20211209124913-491a49abca63/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
-golang.org/x/net v0.0.0-20220722155237-a158d28d115b h1:PxfKdU9lEEDYjdIzOtC4qFWgkU2rGHdKlKowJSMN9h0=
 golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
+golang.org/x/net v0.6.0 h1:L4ZwwTvKW9gr0ZMS1yrHD9GZhIuVjOBBnaKH+SPQK0Q=
+golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
 golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
 golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
 golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@@ -667,13 +668,13 @@ golang.org/x/sys v0.0.0-20211124211545-fe61309f8881/go.mod h1:oPkhp1MJrh7nUepCBc
 golang.org/x/sys v0.0.0-20211205182925-97ca703d548d/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.3.0 h1:w8ZOecv6NaNa/zC8944JTU3vz4u6Lagfk4RPQxv92NQ=
-golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU=
+golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
 golang.org/x/term v0.0.0-20210615171337-6886f2dfbf5b/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
-golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 h1:JGgROgKl9N8DuW20oFS5gxc+lE67/N3FcwmBPMe7ArY=
 golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
+golang.org/x/term v0.5.0 h1:n2a8QNdAb0sZNpU9R1ALUXBbY+w51fCQDN+7EdxNBsY=
+golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
 golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
 golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
 golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@@ -683,8 +684,8 @@ golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
 golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
 golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
 golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
-golang.org/x/text v0.3.8 h1:nAL+RVCQ9uMn3vJZbV+MRnydTJFPf8qqY42YiA6MrqY=
-golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
+golang.org/x/text v0.7.0 h1:4BRB4x83lYWy72KwLD/qYDuTu7q9PjSagHvijDw7cLo=
+golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
 golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
 golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
 golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
@@ -963,5 +964,5 @@ sigs.k8s.io/structured-merge-diff/v4 v4.2.1 h1:bKCqE9GvQ5tiVHn5rfn1r+yao3aLQEaLz
 sigs.k8s.io/structured-merge-diff/v4 v4.2.1/go.mod h1:j/nl6xW8vLS49O8YvXW1ocPhZawJtm+Yrr7PPRQ0Vg4=
 sigs.k8s.io/yaml v1.2.0 h1:kr/MCeFWJWTwyaHoR9c8EjH9OumOmoF9YGiZd7lFm/Q=
 sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc=
-skywalking.apache.org/repo/goapi v0.0.0-20230221054914-eeacbf544e9c h1:UUcBWaN9cUdtYqdj9ssIcL/BuHD4jT17smPgfpZTcVg=
-skywalking.apache.org/repo/goapi v0.0.0-20230221054914-eeacbf544e9c/go.mod h1:BS5LRvsAMmZn8YIW9n0+8eiJhC9zVn663fDr5t+cL40=
+skywalking.apache.org/repo/goapi v0.0.0-20230301143132-aa3f8469385b h1:VAWQr1mJk4P/a8VZ9UASY8H53wj0zdLHgYvhddyQcXw=
+skywalking.apache.org/repo/goapi v0.0.0-20230301143132-aa3f8469385b/go.mod h1:WovoDv1GA+8VuvHPVJL7q/fL0KlYPBZq5rTMCFQRzJU=
diff --git a/pkg/profiling/continuous/base/metrics.go b/pkg/profiling/continuous/base/metrics.go
index 6b48015..55029b0 100644
--- a/pkg/profiling/continuous/base/metrics.go
+++ b/pkg/profiling/continuous/base/metrics.go
@@ -45,7 +45,9 @@ func (m *MetricsAppender) AppendProcessSingleValue(name string, p api.ProcessInt
 	for k, v := range labels {
 		transformLabels = append(transformLabels, &v3.Label{Name: k, Value: v})
 	}
-	transformLabels = append(transformLabels, &v3.Label{Name: "process_name", Value: p.Entity().ProcessName})
+	transformLabels = append(transformLabels,
+		&v3.Label{Name: "process_name", Value: p.Entity().ProcessName},
+		&v3.Label{Name: "layer", Value: p.Entity().Layer})
 	metadata := serviceInstanceMetadata{
 		service:  p.Entity().ServiceName,
 		instance: p.Entity().InstanceName,
diff --git a/pkg/profiling/continuous/base/windows.go b/pkg/profiling/continuous/base/windows.go
index c400a64..6b9ab49 100644
--- a/pkg/profiling/continuous/base/windows.go
+++ b/pkg/profiling/continuous/base/windows.go
@@ -61,8 +61,8 @@ type TimeWindows[V any, R any] struct {
 	windowLocker    sync.RWMutex
 	windowGenerator func() WindowData[V, R]
 
-	lastFlushedElement *list.Element
-	lastWriteElement   *list.Element
+	// mark the latest flush endTime
+	lastFlushTime *time.Time
 }
 
 func NewTimeWindows[V any, R any](items []*PolicyItem, generator func() WindowData[V, R]) *TimeWindows[V, R] {
@@ -143,7 +143,7 @@ func (t *TimeWindows[D, R]) Add(tm time.Time, val D) {
 		second = 0
 	}
 
-	if second > t.data.Len() {
+	if second >= t.data.Len() {
 		// add the older data, ignore it
 		return
 	}
@@ -151,29 +151,47 @@ func (t *TimeWindows[D, R]) Add(tm time.Time, val D) {
 	t.appendDataToSlot(t.data.Len()-second-1, val)
 }
 
-func (t *TimeWindows[D, R]) FlushLastWriteData() (R, bool) {
-	if t.lastWriteElement == nil || t.lastFlushedElement == t.lastWriteElement {
+func (t *TimeWindows[D, R]) FlushMostRecentData() (R, bool) {
+	endTime := t.endTime
+	if !t.shouldFlush(endTime) {
 		var empty R
 		return empty, false
 	}
-	t.lastFlushedElement = t.lastWriteElement
-	return t.lastFlushedElement.Value.(*windowDataWrapper[D, R]).Get(), true
+	t.lastFlushTime = endTime
+	return t.data.Back().Value.(*windowDataWrapper[D, R]).Get(), true
 }
 
-func (t *TimeWindows[D, R]) FlushMultipleWriteData() ([]R, bool) {
-	result := make([]R, 0)
-	if t.lastWriteElement == nil || t.lastFlushedElement == t.lastWriteElement {
+func (t *TimeWindows[D, R]) FlushMultipleRecentData() ([]R, bool) {
+	endTime := t.endTime
+	if !t.shouldFlush(endTime) {
 		return nil, false
 	}
-	for e := t.lastWriteElement; e != t.lastFlushedElement && e != nil; e = e.Prev() {
+	result := make([]R, 0)
+	slotCount := t.data.Len()
+	if t.lastFlushTime != nil {
+		slotCount = int(t.endTime.Sub(*t.lastFlushTime).Seconds()) - 1
+	}
+	for e := t.data.Back(); e != nil && slotCount >= 0; e = e.Prev() {
 		if e.Value.(*windowDataWrapper[D, R]).hasData {
 			result = append(result, e.Value.(*windowDataWrapper[D, R]).Get())
 		}
+		slotCount--
 	}
-	t.lastFlushedElement = t.lastWriteElement
+	t.lastFlushTime = endTime
 	return result, true
 }
 
+func (t *TimeWindows[D, R]) shouldFlush(endTime *time.Time) bool {
+	if endTime == nil {
+		return false
+	}
+	if t.lastFlushTime == nil {
+		return true
+	}
+
+	return t.lastFlushTime != endTime && t.lastFlushTime.Before(*endTime)
+}
+
 func (t *TimeWindows[D, R]) moveTo(tm time.Time) {
 	t.windowLocker.Lock()
 	defer t.windowLocker.Unlock()
@@ -190,9 +208,9 @@ func (t *TimeWindows[D, R]) moveTo(tm time.Time) {
 	} else {
 		for i := 0; i < addSeconds; i++ {
 			// remove the older data
-			first := t.data.Remove(t.data.Back()).(*windowDataWrapper[D, R])
+			first := t.data.Remove(t.data.Front()).(*windowDataWrapper[D, R])
 			first.Reset()
-			t.data.PushFront(first)
+			t.data.PushBack(first)
 		}
 	}
 	t.endTime = &tm
@@ -202,7 +220,7 @@ func (t *TimeWindows[V, R]) appendDataToSlot(index int, data V) {
 	t.windowLocker.RLock()
 	defer t.windowLocker.RUnlock()
 
-	if index <= 0 || index >= t.data.Len() {
+	if index < 0 || index > t.data.Len() {
 		return
 	}
 
@@ -223,7 +241,6 @@ func (t *TimeWindows[V, R]) appendDataToSlot(index int, data V) {
 	}
 
 	element.Value.(*windowDataWrapper[V, R]).Accept(data)
-	t.lastWriteElement = element
 }
 
 type windowDataWrapper[D any, R any] struct {
diff --git a/pkg/profiling/continuous/base/windows_test.go b/pkg/profiling/continuous/base/windows_test.go
new file mode 100644
index 0000000..3a5c335
--- /dev/null
+++ b/pkg/profiling/continuous/base/windows_test.go
@@ -0,0 +1,218 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package base
+
+import (
+	"reflect"
+	"testing"
+	"time"
+)
+
+var (
+	defaultTime = time.Now()
+)
+
+func TestAdd(t *testing.T) {
+	tests := []struct {
+		name   string
+		size   int
+		dataOp func(window *TimeWindows[float64, float64])
+		result []float64
+	}{
+		{
+			name: "normal",
+			size: 3,
+			dataOp: func(window *TimeWindows[float64, float64]) {
+				addData(window, 1, 1)
+				addData(window, 2, 2)
+				addData(window, 3, 3)
+			},
+			result: []float64{1, 2, 3},
+		},
+		{
+			name: "out of count",
+			size: 3,
+			dataOp: func(window *TimeWindows[float64, float64]) {
+				addData(window, 1, 1)
+				addData(window, 2, 2)
+				addData(window, 3, 3)
+				addData(window, 4, 4)
+			},
+			result: []float64{2, 3, 4},
+		},
+		{
+			name: "add older data",
+			size: 3,
+			dataOp: func(window *TimeWindows[float64, float64]) {
+				addData(window, 1, 1)
+				addData(window, 2, 2)
+				addData(window, 3, 3)
+				addData(window, 4, 4)
+				addData(window, 2, 4)
+				addData(window, 1, 8)
+				addData(window, 0, 10)
+			},
+			result: []float64{4, 3, 4},
+		},
+		{
+			name: "add new data which bigger than windows count",
+			size: 3,
+			dataOp: func(window *TimeWindows[float64, float64]) {
+				addData(window, 1, 1)
+				addData(window, 4, 2)
+				addData(window, 5, 3)
+				addData(window, 7, 5)
+			},
+			result: []float64{3, 0, 5},
+		},
+	}
+
+	for _, tt := range tests {
+		windows := timeWindows(tt.size)
+		tt.dataOp(windows)
+		actualValue := getAllValues(windows)
+		if !reflect.DeepEqual(tt.result, actualValue) {
+			t.Fatalf("test [%s] failure, expceted: %v, actual: %v", tt.name, tt.result, actualValue)
+		}
+	}
+}
+
+func TestFlushData(t *testing.T) {
+	tests := []struct {
+		name       string
+		size       int
+		operations []interface{}
+	}{
+		{
+			name: "normal[most recent]",
+			size: 3,
+			operations: []interface{}{
+				appendDataOperation{1, 1},
+				mostRecentDataChecker{1, true},
+				mostRecentDataChecker{0, false},
+
+				appendDataOperation{2, 2},
+				appendDataOperation{3, 3},
+				mostRecentDataChecker{3, true},
+				mostRecentDataChecker{0, false},
+			},
+		},
+		{
+			name: "has older data[most recent]",
+			size: 3,
+			operations: []interface{}{
+				appendDataOperation{1, 1},
+				appendDataOperation{2, 2},
+				appendDataOperation{1, 3},
+				mostRecentDataChecker{2, true},
+				mostRecentDataChecker{0, false},
+
+				appendDataOperation{3, 3},
+				appendDataOperation{4, 4},
+				appendDataOperation{1, 9},
+				mostRecentDataChecker{4, true},
+				mostRecentDataChecker{0, false},
+			},
+		},
+		{
+			name: "normal[multiple recent]",
+			size: 3,
+			operations: []interface{}{
+				appendDataOperation{1, 1},
+				appendDataOperation{2, 2},
+				appendDataOperation{3, 3},
+				multipleRecentDataChecker{[]float64{3, 2, 1}, true},
+				multipleRecentDataChecker{nil, false},
+
+				appendDataOperation{4, 4},
+				multipleRecentDataChecker{[]float64{4}, true},
+				multipleRecentDataChecker{nil, false},
+			},
+		},
+		{
+			name: "has older data[multiple recent]",
+			size: 3,
+			operations: []interface{}{
+				appendDataOperation{1, 1},
+				appendDataOperation{2, 2},
+				appendDataOperation{4, 4},
+				multipleRecentDataChecker{[]float64{4, 2}, true},
+				multipleRecentDataChecker{nil, false},
+
+				appendDataOperation{1, 1},
+				multipleRecentDataChecker{nil, false},
+			},
+		},
+	}
+
+	for _, tt := range tests {
+		windows := timeWindows(tt.size)
+		for _, op := range tt.operations {
+			switch v := op.(type) {
+			case appendDataOperation:
+				addData(windows, v.second, v.value)
+			case mostRecentDataChecker:
+				val, hasData := windows.FlushMostRecentData()
+				if val != v.value || v.hasData != hasData {
+					t.Fatalf("test[%s] failure, excepted: %v-%t, actual: %v-%t", tt.name,
+						v.value, v.hasData, val, hasData)
+				}
+			case multipleRecentDataChecker:
+				val, hasData := windows.FlushMultipleRecentData()
+				if !reflect.DeepEqual(val, v.value) || v.hasData != hasData {
+					t.Fatalf("test[%s] failure, excepted: %v-%t, actual: %v-%t", tt.name,
+						v.value, v.hasData, val, hasData)
+				}
+			}
+		}
+	}
+}
+
+type appendDataOperation struct {
+	second int
+	value  float64
+}
+
+type mostRecentDataChecker struct {
+	value   float64
+	hasData bool
+}
+
+type multipleRecentDataChecker struct {
+	value   []float64
+	hasData bool
+}
+
+func addData(win *TimeWindows[float64, float64], second int, val float64) {
+	result := defaultTime.Add(time.Second * time.Duration(second))
+	win.Add(result, val)
+}
+
+func timeWindows(count int) *TimeWindows[float64, float64] {
+	return NewTimeWindows([]*PolicyItem{{Period: count}}, func() WindowData[float64, float64] {
+		return NewLatestWindowData[float64]()
+	})
+}
+
+func getAllValues(win *TimeWindows[float64, float64]) []float64 {
+	result := make([]float64, 0)
+	for e := win.data.Front(); e != nil; e = e.Next() {
+		result = append(result, e.Value.(*windowDataWrapper[float64, float64]).Get())
+	}
+	return result
+}
diff --git a/pkg/profiling/continuous/checker/common/causes.go b/pkg/profiling/continuous/checker/common/causes.go
index 8955569..e9a1a70 100644
--- a/pkg/profiling/continuous/checker/common/causes.go
+++ b/pkg/profiling/continuous/checker/common/causes.go
@@ -27,18 +27,18 @@ import (
 type SingleValueCause struct {
 	process            api.ProcessInterface
 	policy             *base.PolicyItem
-	causeType          v3.ContinuousProfilingCauseType
+	monitorType        v3.ContinuousProfilingTriggeredMonitorType
 	threshold, current float64
 }
 
-func NewSingleValueCause(p api.ProcessInterface, policyItem *base.PolicyItem, causeType v3.ContinuousProfilingCauseType,
+func NewSingleValueCause(p api.ProcessInterface, policyItem *base.PolicyItem, monitorType v3.ContinuousProfilingTriggeredMonitorType,
 	threshold, current float64) *SingleValueCause {
 	return &SingleValueCause{
-		process:   p,
-		policy:    policyItem,
-		causeType: causeType,
-		threshold: threshold,
-		current:   current,
+		process:     p,
+		policy:      policyItem,
+		monitorType: monitorType,
+		threshold:   threshold,
+		current:     current,
 	}
 }
 
@@ -52,7 +52,7 @@ func (p *SingleValueCause) FromPolicy() *base.PolicyItem {
 
 func (p *SingleValueCause) GenerateTransferCause() *v3.ContinuousProfilingCause {
 	return &v3.ContinuousProfilingCause{
-		Type: p.causeType,
+		Type: p.monitorType,
 		Cause: &v3.ContinuousProfilingCause_SingleValue{
 			SingleValue: &v3.ContinuousProfilingSingleValueCause{
 				Threshold: p.threshold,
@@ -68,16 +68,16 @@ type URICause struct {
 
 	process            api.ProcessInterface
 	policy             *base.PolicyItem
-	causeType          v3.ContinuousProfilingCauseType
+	causeType          v3.ContinuousProfilingTriggeredMonitorType
 	threshold, current float64
 }
 
-func NewURICause(p api.ProcessInterface, isRegex bool, uri string, policyItem *base.PolicyItem, causeType v3.ContinuousProfilingCauseType,
-	threshold, current float64) *URICause {
+func NewURICause(p api.ProcessInterface, isRegex bool, uri string, policyItem *base.PolicyItem,
+	monitorType v3.ContinuousProfilingTriggeredMonitorType, threshold, current float64) *URICause {
 	return &URICause{
 		process:   p,
 		policy:    policyItem,
-		causeType: causeType,
+		causeType: monitorType,
 		IsRegex:   isRegex,
 		URI:       uri,
 		threshold: threshold,
diff --git a/pkg/profiling/continuous/checker/common/http_checker.go b/pkg/profiling/continuous/checker/common/http_checker.go
index 383f5ca..3952787 100644
--- a/pkg/profiling/continuous/checker/common/http_checker.go
+++ b/pkg/profiling/continuous/checker/common/http_checker.go
@@ -32,17 +32,17 @@ type HTTPBasedChecker[Data base.WindowData[network.BufferEvent, float64]] struct
 	*BaseChecker[*HTTPBasedCheckerProcessInfo]
 
 	CheckType         base.CheckType
-	CauseType         v3.ContinuousProfilingCauseType
+	MonitorType       v3.ContinuousProfilingTriggeredMonitorType
 	ThresholdGenerate func(val string) (float64, error)
 }
 
 func NewHTTPBasedChecker[Data base.WindowData[network.BufferEvent, float64]](checkType base.CheckType,
 	thresholdGenerator func(val string) (float64, error), dataGenerator func() base.WindowData[network.BufferEvent, float64],
-	causeType v3.ContinuousProfilingCauseType) *HTTPBasedChecker[Data] {
+	monitorType v3.ContinuousProfilingTriggeredMonitorType) *HTTPBasedChecker[Data] {
 	checker := &HTTPBasedChecker[Data]{
 		CheckType:         checkType,
 		ThresholdGenerate: thresholdGenerator,
-		CauseType:         causeType,
+		MonitorType:       monitorType,
 	}
 	checker.BaseChecker = NewBaseChecker[*HTTPBasedCheckerProcessInfo](
 		func(p api.ProcessInterface, older *HTTPBasedCheckerProcessInfo, items []*base.PolicyItem) *HTTPBasedCheckerProcessInfo {
@@ -204,7 +204,7 @@ func (n *HTTPBasedChecker[Data]) Check(ctx base.CheckContext, metricsAppender *b
 					return val >= itemInfo.threshold
 				}); isMatch {
 					causes = append(causes, NewURICause(pidPolicies.Process, false, uri, item,
-						n.CauseType, itemInfo.threshold, lastMatch))
+						n.MonitorType, itemInfo.threshold, lastMatch))
 				}
 			}
 
@@ -213,7 +213,7 @@ func (n *HTTPBasedChecker[Data]) Check(ctx base.CheckContext, metricsAppender *b
 				return val >= itemInfo.threshold
 			}); isMatch {
 				causes = append(causes, NewURICause(pidPolicies.Process, itemInfo.uriRegex != nil, globalURI, item,
-					n.CauseType, itemInfo.threshold, lastMatch))
+					n.MonitorType, itemInfo.threshold, lastMatch))
 			}
 		}
 	}
@@ -225,7 +225,7 @@ func (n *HTTPBasedChecker[Data]) flushMetrics(uri string, windows *base.TimeWind
 	if uri == "" {
 		uri = "global"
 	}
-	if data, hasUpdate := windows.FlushMultipleWriteData(); hasUpdate {
+	if data, hasUpdate := windows.FlushMultipleRecentData(); hasUpdate {
 		// flush each slot data
 		for _, d := range data {
 			metricsAppender.AppendProcessSingleValue(strings.ToLower(string(n.CheckType)), process, map[string]string{
diff --git a/pkg/profiling/continuous/checker/common/process_checker.go b/pkg/profiling/continuous/checker/common/process_checker.go
index ad1f42e..8298506 100644
--- a/pkg/profiling/continuous/checker/common/process_checker.go
+++ b/pkg/profiling/continuous/checker/common/process_checker.go
@@ -34,16 +34,16 @@ type ProcessBasedChecker[V numbers] struct {
 	*BaseChecker[*ProcessBasedInfo[V]]
 
 	CheckType         base.CheckType
-	CauseType         v3.ContinuousProfilingCauseType
+	MonitorType       v3.ContinuousProfilingTriggeredMonitorType
 	ThresholdGenerate func(val string) (V, error)
 	DataGenerate      func(process api.ProcessInterface) (V, error)
 }
 
 func NewProcessBasedChecker[V numbers](checkType base.CheckType, thresholdGenerator func(val string) (V, error),
-	dataGenerator func(p api.ProcessInterface) (V, error), causeType v3.ContinuousProfilingCauseType) *ProcessBasedChecker[V] {
+	dataGenerator func(p api.ProcessInterface) (V, error), monitorType v3.ContinuousProfilingTriggeredMonitorType) *ProcessBasedChecker[V] {
 	checker := &ProcessBasedChecker[V]{
 		CheckType:         checkType,
-		CauseType:         causeType,
+		MonitorType:       monitorType,
 		ThresholdGenerate: thresholdGenerator,
 		DataGenerate:      dataGenerator,
 	}
@@ -118,7 +118,7 @@ func (r *ProcessBasedChecker[V]) Check(ctx base.CheckContext, metricsAppender *b
 	causes := make([]base.ThresholdCause, 0)
 	for _, info := range r.PidWithInfos {
 		for _, threshold := range info.Policies {
-			if data, hasData := info.Windows.FlushLastWriteData(); hasData {
+			if data, hasData := info.Windows.FlushMostRecentData(); hasData {
 				metricsAppender.AppendProcessSingleValue(strings.ToLower(string(r.CheckType)), info.Process, nil, float64(data))
 			}
 			if !ctx.ShouldCheck(info.Process, threshold.Policy) {
@@ -129,7 +129,7 @@ func (r *ProcessBasedChecker[V]) Check(ctx base.CheckContext, metricsAppender *b
 				return val >= threshold.Threshold
 			}); enable {
 				causes = append(causes,
-					NewSingleValueCause(info.Process, threshold.Policy, r.CauseType, float64(threshold.Threshold), float64(lastMatch)))
+					NewSingleValueCause(info.Process, threshold.Policy, r.MonitorType, float64(threshold.Threshold), float64(lastMatch)))
 			}
 		}
 	}
diff --git a/pkg/profiling/continuous/checker/common/system_checker.go b/pkg/profiling/continuous/checker/common/system_checker.go
index 5cc4e60..1b9475f 100644
--- a/pkg/profiling/continuous/checker/common/system_checker.go
+++ b/pkg/profiling/continuous/checker/common/system_checker.go
@@ -30,7 +30,7 @@ import (
 
 type SystemBasedChecker[V numbers] struct {
 	CheckType         base.CheckType
-	CauseType         v3.ContinuousProfilingCauseType
+	MonitorType       v3.ContinuousProfilingTriggeredMonitorType
 	ThresholdGenerate func(val string) (V, error)
 	DataGenerate      func() (V, error)
 	GlobalWindows     *base.TimeWindows[V, V]
@@ -39,10 +39,10 @@ type SystemBasedChecker[V numbers] struct {
 }
 
 func NewSystemBasedChecker[V numbers](checkType base.CheckType, thresholdGenerator func(val string) (V, error),
-	dataGenerator func() (V, error), causeType v3.ContinuousProfilingCauseType) *SystemBasedChecker[V] {
+	dataGenerator func() (V, error), monitorType v3.ContinuousProfilingTriggeredMonitorType) *SystemBasedChecker[V] {
 	checker := &SystemBasedChecker[V]{
 		CheckType:         checkType,
-		CauseType:         causeType,
+		MonitorType:       monitorType,
 		ThresholdGenerate: thresholdGenerator,
 		DataGenerate:      dataGenerator,
 		GlobalWindows: base.NewTimeWindows[V, V](nil, func() base.WindowData[V, V] {
@@ -100,7 +100,7 @@ func (s *SystemBasedChecker[V]) Check(ctx base.CheckContext, metricsAppender *ba
 	}
 
 	causes := make([]base.ThresholdCause, 0)
-	data, hasData := s.GlobalWindows.FlushLastWriteData()
+	data, hasData := s.GlobalWindows.FlushMostRecentData()
 
 	for _, policy := range s.Policies {
 		if hasData {
@@ -120,7 +120,7 @@ func (s *SystemBasedChecker[V]) Check(ctx base.CheckContext, metricsAppender *ba
 				continue
 			}
 
-			causes = append(causes, NewSingleValueCause(p, policy.Policy, s.CauseType, float64(policy.Threshold), float64(lastMatch)))
+			causes = append(causes, NewSingleValueCause(p, policy.Policy, s.MonitorType, float64(policy.Threshold), float64(lastMatch)))
 		}
 	}
 
diff --git a/pkg/profiling/continuous/checker/network_error_rate.go b/pkg/profiling/continuous/checker/network_error_rate.go
index 7ba08c9..0a43695 100644
--- a/pkg/profiling/continuous/checker/network_error_rate.go
+++ b/pkg/profiling/continuous/checker/network_error_rate.go
@@ -45,7 +45,7 @@ func (n *NetworkHTTPErrorRateChecker) Init(config *base.ContinuousConfig) error
 			return v, nil
 		}, func() base.WindowData[network.BufferEvent, float64] {
 			return &processNetworkResponseErrorStatics{}
-		}, v3.ContinuousProfilingCauseType_HTTPErrorRate)
+		}, v3.ContinuousProfilingTriggeredMonitorType_HTTPErrorRate)
 	return nil
 }
 
diff --git a/pkg/profiling/continuous/checker/network_response_time.go b/pkg/profiling/continuous/checker/network_response_time.go
index 7dd1754..a4ef34b 100644
--- a/pkg/profiling/continuous/checker/network_response_time.go
+++ b/pkg/profiling/continuous/checker/network_response_time.go
@@ -41,7 +41,7 @@ func (n *NetworkHTTPAvgResponseTimeChecker) Init(config *base.ContinuousConfig)
 			return strconv.ParseFloat(val, 64)
 		}, func() base.WindowData[network.BufferEvent, float64] {
 			return &processNetworkAvgResponseTimeStatics{}
-		}, v3.ContinuousProfilingCauseType_HTTPAvgResponseTime)
+		}, v3.ContinuousProfilingTriggeredMonitorType_HTTPAvgResponseTime)
 	return nil
 }
 
diff --git a/pkg/profiling/continuous/checker/process_cpu.go b/pkg/profiling/continuous/checker/process_cpu.go
index f0f226e..ef7de27 100644
--- a/pkg/profiling/continuous/checker/process_cpu.go
+++ b/pkg/profiling/continuous/checker/process_cpu.go
@@ -48,6 +48,6 @@ func (r *ProcessCPUChecker) Init(config *base.ContinuousConfig) error {
 			return 0, err
 		}
 		return percent * 100, nil
-	}, v3.ContinuousProfilingCauseType_ProcessCPU)
+	}, v3.ContinuousProfilingTriggeredMonitorType_ProcessCPU)
 	return nil
 }
diff --git a/pkg/profiling/continuous/checker/process_thread.go b/pkg/profiling/continuous/checker/process_thread.go
index 566abe2..2e0ab38 100644
--- a/pkg/profiling/continuous/checker/process_thread.go
+++ b/pkg/profiling/continuous/checker/process_thread.go
@@ -42,6 +42,6 @@ func (t *ProcessThreadCountChecker) Init(config *base.ContinuousConfig) error {
 	}, func(p api.ProcessInterface) (int32, error) {
 		threads, err := p.OriginalProcess().NumThreads()
 		return threads, err
-	}, v3.ContinuousProfilingCauseType_ProcessThreadCount)
+	}, v3.ContinuousProfilingTriggeredMonitorType_ProcessThreadCount)
 	return nil
 }
diff --git a/pkg/profiling/continuous/checker/system_load.go b/pkg/profiling/continuous/checker/system_load.go
index f77b949..689de5a 100644
--- a/pkg/profiling/continuous/checker/system_load.go
+++ b/pkg/profiling/continuous/checker/system_load.go
@@ -46,6 +46,6 @@ func (s *SystemLoadChecker) Init(config *base.ContinuousConfig) error {
 				return 0, err
 			}
 			return avg.Load1, nil
-		}, v3.ContinuousProfilingCauseType_SystemLoad)
+		}, v3.ContinuousProfilingTriggeredMonitorType_SystemLoad)
 	return nil
 }
diff --git a/pkg/profiling/continuous/checkers.go b/pkg/profiling/continuous/checkers.go
index 1e4cc05..45dd12f 100644
--- a/pkg/profiling/continuous/checkers.go
+++ b/pkg/profiling/continuous/checkers.go
@@ -286,15 +286,24 @@ func (c *Checkers) queryPolicyUpdates(servicePolicies map[string]string) (map[st
 	if err != nil {
 		return nil, err
 	}
+	// no update
+	if len(policyUpdateCommands.GetCommands()) == 0 {
+		return nil, nil
+	}
 
-	if len(policyUpdateCommands.GetCommands()) != 1 ||
-		policyUpdateCommands.GetCommands()[0].GetCommand() != "ContinuousProfilingPolicyQuery" ||
-		len(policyUpdateCommands.GetCommands()[0].GetArgs()) != 1 ||
-		policyUpdateCommands.GetCommands()[0].GetArgs()[0].GetKey() != "ServiceWithPolicyJSON" {
+	var policyJSON string
+	if len(policyUpdateCommands.GetCommands()) == 1 && policyUpdateCommands.GetCommands()[0].GetCommand() == "ContinuousProfilingPolicyQuery" {
+		for _, arg := range policyUpdateCommands.GetCommands()[0].GetArgs() {
+			if arg.GetKey() == "ServiceWithPolicyJSON" {
+				policyJSON = arg.GetValue()
+				break
+			}
+		}
+	}
+	if policyJSON == "" {
 		return nil, fmt.Errorf("the query policy response not adapt")
 	}
 
-	policyJSON := policyUpdateCommands.GetCommands()[0].GetArgs()[0].GetValue()
 	updates := make([]*QueryPolicyUpdate, 0)
 	err = json.Unmarshal([]byte(policyJSON), &updates)
 	if err != nil {