You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2021/02/21 01:13:02 UTC

[skywalking-satellite] branch main updated: add timer fallbacker unit test (#30)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-satellite.git


The following commit(s) were added to refs/heads/main by this push:
     new 50b5cb7  add timer fallbacker unit test (#30)
50b5cb7 is described below

commit 50b5cb7932137aa1435c75ba1473bc569dc5d345
Author: Evan <31...@users.noreply.github.com>
AuthorDate: Sun Feb 21 09:12:56 2021 +0800

    add timer fallbacker unit test (#30)
    
    Co-authored-by: Evan <ev...@outlook.com>
---
 .../setup/plugins/fallbacker_timer-fallbacker.md   |  5 ++
 plugins/fallbacker/timer/timer_fallbacker.go       | 24 ++++--
 plugins/fallbacker/timer/timer_fallbacker_test.go  | 98 ++++++++++++++++++++++
 3 files changed, 120 insertions(+), 7 deletions(-)

diff --git a/docs/en/setup/plugins/fallbacker_timer-fallbacker.md b/docs/en/setup/plugins/fallbacker_timer-fallbacker.md
index 4329348..29f5ebd 100755
--- a/docs/en/setup/plugins/fallbacker_timer-fallbacker.md
+++ b/docs/en/setup/plugins/fallbacker_timer-fallbacker.md
@@ -3,6 +3,11 @@
 This is a timer fallback trigger to process the forward failure data.
 ## DefaultConfig
 ```yaml
+# The forwarder max retry times.
 max_times: 3
+# The latency_factor is the standard retry duration, and the time for each retry is expanded by 2 times until the number 
+# of retries reaches the maximum.(Time unit is millisecond.)
 latency_factor: 2000
+# The max retry latency time.(Time unit is millisecond.)
+max_latency_time: 5000
 ```
diff --git a/plugins/fallbacker/timer/timer_fallbacker.go b/plugins/fallbacker/timer/timer_fallbacker.go
index 49f4cf5..06e4a95 100644
--- a/plugins/fallbacker/timer/timer_fallbacker.go
+++ b/plugins/fallbacker/timer/timer_fallbacker.go
@@ -25,16 +25,18 @@ import (
 	"github.com/apache/skywalking-satellite/plugins/forwarder/api"
 )
 
-// Fallbacker is a timer fallbacker when forward fails. `latencyFactor` is the standard retry duration,
-// and the time for each retry is expanded by 2 times until the number of retries reaches the maximum.
+const Name = "timer-fallbacker"
+
+// Fallbacker is a timer fallbacker when forward fails.
 type Fallbacker struct {
 	config.CommonFields
-	maxTimes      int `mapstructure:"max_times"`
-	latencyFactor int `mapstructure:"latency_factor"`
+	MaxTimes       int `mapstructure:"max_times"`
+	LatencyFactor  int `mapstructure:"latency_factor"`
+	MaxLatencyTIme int `mapstructure:"max_latency_time"`
 }
 
 func (t *Fallbacker) Name() string {
-	return "timer-fallbacker"
+	return Name
 }
 
 func (t *Fallbacker) Description() string {
@@ -43,17 +45,25 @@ func (t *Fallbacker) Description() string {
 
 func (t *Fallbacker) DefaultConfig() string {
 	return `
+# The forwarder max retry times.
 max_times: 3
+# The latency_factor is the standard retry duration, and the time for each retry is expanded by 2 times until the number 
+# of retries reaches the maximum.(Time unit is millisecond.)
 latency_factor: 2000
+# The max retry latency time.(Time unit is millisecond.)
+max_latency_time: 5000
 `
 }
 
 func (t *Fallbacker) FallBack(batch event.BatchEvents, forward api.ForwardFunc) bool {
-	currentLatency := t.latencyFactor
-	for i := 1; i < t.maxTimes; i++ {
+	currentLatency := t.LatencyFactor
+	for i := 1; i < t.MaxTimes; i++ {
 		time.Sleep(time.Duration(currentLatency) * time.Millisecond)
 		if err := forward(batch); err != nil {
 			currentLatency *= 2
+			if currentLatency > t.MaxLatencyTIme {
+				currentLatency = t.MaxLatencyTIme
+			}
 		} else {
 			return true
 		}
diff --git a/plugins/fallbacker/timer/timer_fallbacker_test.go b/plugins/fallbacker/timer/timer_fallbacker_test.go
new file mode 100644
index 0000000..d294eb3
--- /dev/null
+++ b/plugins/fallbacker/timer/timer_fallbacker_test.go
@@ -0,0 +1,98 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package timer
+
+import (
+	"errors"
+	"reflect"
+	"testing"
+
+	"github.com/apache/skywalking-satellite/internal/pkg/log"
+	"github.com/apache/skywalking-satellite/internal/pkg/plugin"
+	"github.com/apache/skywalking-satellite/internal/satellite/event"
+	_ "github.com/apache/skywalking-satellite/internal/satellite/test"
+	"github.com/apache/skywalking-satellite/plugins/fallbacker/api"
+)
+
+func initFallbacker(cfg plugin.Config) *Fallbacker {
+	plugin.RegisterPluginCategory(reflect.TypeOf((*api.Fallbacker)(nil)).Elem())
+	plugin.RegisterPlugin(new(Fallbacker))
+	cfg[plugin.NameField] = Name
+	q := api.GetFallbacker(cfg)
+	if q == nil {
+		log.Logger.Errorf("cannot get a default config fallbacker from the registry")
+		return nil
+	}
+	return q.(*Fallbacker)
+}
+
+func TestFallbacker_FallBack1(t1 *testing.T) {
+	count := 0
+	mockForwarderFunc := func(_ event.BatchEvents) error {
+		count++
+		if count < 4 {
+			return errors.New("mock error")
+		}
+		return nil
+	}
+	tests := []struct {
+		name      string
+		args      plugin.Config
+		want      bool
+		wantCount int
+	}{
+		{
+			name:      "default-fallbacker",
+			args:      plugin.Config{},
+			want:      false,
+			wantCount: 2,
+		},
+		{
+			name: "test-recach-max_times",
+			args: plugin.Config{
+				"max_times":        5,
+				"latency_factor":   200,
+				"max_latency_time": 3000,
+			},
+			want:      true,
+			wantCount: 4,
+		},
+		{
+			name: "test-unrecach-max_times",
+			args: plugin.Config{
+				"max_times":        10,
+				"latency_factor":   20,
+				"max_latency_time": 30000000,
+			},
+			want:      true,
+			wantCount: 4,
+		},
+	}
+	for _, tt := range tests {
+		t1.Run(tt.name, func(t1 *testing.T) {
+			f := initFallbacker(tt.args)
+			count = 0
+			if got := f.FallBack(make(event.BatchEvents, 0), mockForwarderFunc); got != tt.want {
+				t1.Errorf("FallBack() = %v, want %v", got, tt.want)
+			}
+			if count != tt.wantCount {
+				t1.Errorf("Fallback count = %v, want %v", count, tt.wantCount)
+			}
+		})
+	}
+}