You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ke...@apache.org on 2021/10/01 15:50:35 UTC
[skywalking-infra-e2e] branch main updated: Stop trigger when cleaning up, add doc (#49)
This is an automated email from the ASF dual-hosted git repository.
kezhenxu94 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-infra-e2e.git
The following commit(s) were added to refs/heads/main by this push:
new 689b7b9 Stop trigger when cleaning up, add doc (#49)
689b7b9 is described below
commit 689b7b9685dc07686a0858ae2d0f56173e100587
Author: kezhenxu94 <ke...@apache.org>
AuthorDate: Fri Oct 1 23:50:27 2021 +0800
Stop trigger when cleaning up, add doc (#49)
---
commands/run/run.go | 17 ++++--
commands/trigger/trigger.go | 48 +++++++---------
dist/LICENSE | 3 +-
docs/en/setup/Configuration-File.md | 4 ++
go.mod | 1 +
go.sum | 3 +-
internal/components/trigger/action.go | 18 +++++-
internal/components/trigger/http.go | 101 +++++++++++++++-------------------
third-party/go/template/exec.go | 4 +-
9 files changed, 104 insertions(+), 95 deletions(-)
diff --git a/commands/run/run.go b/commands/run/run.go
index 53280a9..09577d3 100644
--- a/commands/run/run.go
+++ b/commands/run/run.go
@@ -22,6 +22,7 @@ import (
"github.com/apache/skywalking-infra-e2e/commands/setup"
"github.com/apache/skywalking-infra-e2e/commands/trigger"
"github.com/apache/skywalking-infra-e2e/commands/verify"
+ t "github.com/apache/skywalking-infra-e2e/internal/components/trigger"
"github.com/apache/skywalking-infra-e2e/internal/config"
"github.com/apache/skywalking-infra-e2e/internal/constant"
"github.com/apache/skywalking-infra-e2e/internal/logger"
@@ -47,10 +48,11 @@ func runAccordingE2E() error {
return config.GlobalConfig.Error
}
+ var action t.Action
// If cleanup.on == Always and there is error in setup step, we should defer cleanup step right now.
cleanupOnCondition := config.GlobalConfig.E2EConfig.Cleanup.On
if cleanupOnCondition == constant.CleanUpAlways {
- defer doCleanup()
+ defer doCleanup(action)
}
// setup part
@@ -70,12 +72,16 @@ func runAccordingE2E() error {
return
}
- doCleanup()
+ doCleanup(action)
}()
}
// trigger part
- err = trigger.DoActionAccordingE2E()
+ action, err = trigger.CreateTriggerAction()
+ if err != nil {
+ return err
+ }
+ err = <-action.Do()
if err != nil {
return err
}
@@ -91,7 +97,10 @@ func runAccordingE2E() error {
return nil
}
-func doCleanup() {
+func doCleanup(action t.Action) {
+ if action != nil {
+ action.Stop()
+ }
if err := cleanup.DoCleanupAccordingE2E(); err != nil {
logger.Log.Errorf("cleanup part error: %s", err)
} else {
diff --git a/commands/trigger/trigger.go b/commands/trigger/trigger.go
index 80410f4..8b546c0 100644
--- a/commands/trigger/trigger.go
+++ b/commands/trigger/trigger.go
@@ -29,42 +29,32 @@ import (
)
var Trigger = &cobra.Command{
- Use: "trigger",
- Short: "",
+ Use: "trigger",
RunE: func(cmd *cobra.Command, args []string) error {
- if err := DoActionAccordingE2E(); err != nil {
- return fmt.Errorf("[Trigger] %s", err)
+ action, err := CreateTriggerAction()
+ if err != nil {
+ return fmt.Errorf("[Trigger] %v", err)
}
-
- return nil
+ return <-action.Do()
},
}
-func DoActionAccordingE2E() error {
- if config.GlobalConfig.Error != nil {
- return config.GlobalConfig.Error
+func CreateTriggerAction() (trigger.Action, error) {
+ if err := config.GlobalConfig.Error; err != nil {
+ return nil, err
}
- e2eConfig := config.GlobalConfig.E2EConfig
- if e2eConfig.Trigger.Action == constant.ActionHTTP {
- action := trigger.NewHTTPAction(e2eConfig.Trigger.Interval,
- e2eConfig.Trigger.Times,
- e2eConfig.Trigger.URL,
- e2eConfig.Trigger.Method,
- e2eConfig.Trigger.Body,
- e2eConfig.Trigger.Headers,
+ switch t := config.GlobalConfig.E2EConfig.Trigger; t.Action {
+ case constant.ActionHTTP:
+ return trigger.NewHTTPAction(
+ t.Interval,
+ t.Times,
+ t.URL,
+ t.Method,
+ t.Body,
+ t.Headers,
)
- if action == nil {
- return fmt.Errorf("trigger [%+v] parse error", e2eConfig.Trigger)
- }
-
- err := action.Do()
- if err != nil {
- return err
- }
- } else {
- return fmt.Errorf("no such action for trigger: %s", e2eConfig.Trigger.Action)
+ default:
+ return nil, fmt.Errorf("unsupported trigger action: %s", t.Action)
}
-
- return nil
}
diff --git a/dist/LICENSE b/dist/LICENSE
index 4a0618b..ab94256 100644
--- a/dist/LICENSE
+++ b/dist/LICENSE
@@ -336,7 +336,6 @@ The text of each license is the standard Apache 2.0 license.
go-ini/ini v1.25.4 https://github.com/go-ini/ini Apache-2.0
go-jose v2.2.2 https://github.com/square/go-jose Apache-2.0
yaml/v2 v2.4.0 https://gopkg.in/yaml.v2 Apache-2.0
- yaml/v3 v3.0.0 https://gopkg.in/yaml.v3 MIT and Apache
gotest.tools v2.2.0 https://github.com/gotestyourself/gotest.tools Apache-2.0
k8s-api v0.20.7 https://k8s.io/api Apache-2.0
k8s-apimachinery v0.18.0 https://k8s.io/apimachinery Apache-2.0
@@ -637,4 +636,4 @@ Creative Commons Zero Universal Licenses
The following components are provided under the zlib license. See project link for details.
The text of each license is also included at licenses/LICENSE-[project].txt.
- goe v0.0.0 https://github.com/pascaldekloe/goe CCO-1.0-UNIVERSAL
\ No newline at end of file
+ goe v0.0.0 https://github.com/pascaldekloe/goe CCO-1.0-UNIVERSAL
diff --git a/docs/en/setup/Configuration-File.md b/docs/en/setup/Configuration-File.md
index c4655c7..2c5500f 100644
--- a/docs/en/setup/Configuration-File.md
+++ b/docs/en/setup/Configuration-File.md
@@ -92,6 +92,10 @@ trigger:
times: 5 # How many times to trigger the action, 0=infinite.
url: http://apache.skywalking.com/ # Http trigger url link.
method: GET # Http trigger method.
+ headers:
+ "Content-Type": "application/json"
+ "Authorization": "Basic whatever"
+ body: '{"k1":"v1", "k2":"v2"}'
```
The Trigger executed successfully at least once, after success, the next stage could be continued. Otherwise, there is an error and exit.
diff --git a/go.mod b/go.mod
index 004f535..76c7f0e 100644
--- a/go.mod
+++ b/go.mod
@@ -12,6 +12,7 @@ require (
github.com/spf13/cobra v1.1.1
github.com/testcontainers/testcontainers-go v0.11.1
gopkg.in/yaml.v2 v2.4.0
+ gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
k8s.io/api v0.20.7
k8s.io/apimachinery v0.20.7
k8s.io/cli-runtime v0.20.7
diff --git a/go.sum b/go.sum
index 205dcc3..a7be77a 100644
--- a/go.sum
+++ b/go.sum
@@ -997,8 +997,9 @@ gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
-gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776 h1:tQIYjPdBoyREyB9XMu+nnTclpTYkz2zFM+lzLJFO4gQ=
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo=
+gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo=
gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw=
gotest.tools/v3 v3.0.2/go.mod h1:3SzNCllyD9/Y+b5r9JIKQ474KzkZyqLqEfYqMsX94Bk=
diff --git a/internal/components/trigger/action.go b/internal/components/trigger/action.go
index 125a966..a6530d7 100644
--- a/internal/components/trigger/action.go
+++ b/internal/components/trigger/action.go
@@ -19,5 +19,21 @@
package trigger
type Action interface {
- Do() error
+ // Do performs the trigger action according to the settings,
+ // and returns an error channel, the controller waits for the
+ // error and if it's nil, the controller considers the action
+ // is successfully scheduled, otherwise the controller considers
+ // this is a failure and aborts the process.
+ //
+ // It's guaranteed that the error channel will receive the first
+ // error, and receive at most 1 error, all following errors will
+ // not be returned.
+ //
+ // This returning style can be used to wait for the http action
+ // being normal, and then schedule it, otherwise we can interrupt
+ // according to the first error.
+ Do() chan error
+
+ // Stop stops the scheduled actions.
+ Stop()
}
diff --git a/internal/components/trigger/http.go b/internal/components/trigger/http.go
index 4a3ddce..4377f2f 100644
--- a/internal/components/trigger/http.go
+++ b/internal/components/trigger/http.go
@@ -18,9 +18,7 @@
package trigger
import (
- "context"
"fmt"
- "io/ioutil"
"net/http"
"os"
"strings"
@@ -37,18 +35,18 @@ type httpAction struct {
body string
headers map[string]string
executedCount int
+ stopCh chan struct{}
+ client *http.Client
}
-func NewHTTPAction(intervalStr string, times int, url, method, body string, headers map[string]string) Action {
+func NewHTTPAction(intervalStr string, times int, url, method, body string, headers map[string]string) (Action, error) {
interval, err := time.ParseDuration(intervalStr)
if err != nil {
- logger.Log.Errorf("interval [%s] parse error: %s.", intervalStr, err)
- return nil
+ return nil, err
}
if interval <= 0 {
- logger.Log.Errorf("interval [%s] is not positive", interval)
- return nil
+ return nil, fmt.Errorf("trigger interval should be > 0, but was %s", interval)
}
// there can be env variables in url, say, "http://${GATEWAY_HOST}:${GATEWAY_PORT}/test"
@@ -62,66 +60,65 @@ func NewHTTPAction(intervalStr string, times int, url, method, body string, head
body: body,
headers: headers,
executedCount: 0,
- }
+ stopCh: make(chan struct{}),
+ client: &http.Client{},
+ }, nil
}
-func (h *httpAction) Do() error {
- ctx := context.Background()
+func (h *httpAction) Do() chan error {
t := time.NewTicker(h.interval)
- h.executedCount = 0
- client := &http.Client{}
-
- r := strings.NewReader(h.body)
- rc := ioutil.NopCloser(r)
-
- request, err := http.NewRequest(h.method, h.url, rc)
- headers := http.Header{}
- for k, v := range h.headers {
- headers[k] = []string{v}
- }
- request.Header = headers
- if err != nil {
- logger.Log.Errorf("new request error %v", err)
- return err
- }
- logger.Log.Infof("Trigger will request URL %s %d times, %s seconds apart each time.", h.url, h.times, h.interval)
+ logger.Log.Infof("trigger will request URL %s %d times with interval %s.", h.url, h.times, h.interval)
- // execute until success
- for range t.C {
- err = h.executeOnce(client, request)
- if err == nil {
- break
- }
- if !h.couldContinue() {
- logger.Log.Errorf("do request %d times, but still failed", h.times)
- return err
- }
- }
-
- // background interval trigger
+ result := make(chan error)
+ sent := false
go func() {
for {
select {
case <-t.C:
- err = h.executeOnce(client, request)
- if !h.couldContinue() {
- return
+ err := h.execute()
+
+ // `h.times == h.executedCount` makes sure to only send the first error
+ if !sent && (err == nil || h.times == h.executedCount) {
+ result <- err
+ sent = true
}
- case <-ctx.Done():
+ case <-h.stopCh:
t.Stop()
+ result <- nil
return
}
}
}()
- return nil
+ return result
}
-// execute http request once time
-func (h *httpAction) executeOnce(client *http.Client, req *http.Request) error {
+func (h *httpAction) Stop() {
+ h.stopCh <- struct{}{}
+}
+
+func (h *httpAction) request() (*http.Request, error) {
+ request, err := http.NewRequest(h.method, h.url, strings.NewReader(h.body))
+ if err != nil {
+ return nil, err
+ }
+ headers := http.Header{}
+ for k, v := range h.headers {
+ headers[k] = []string{v}
+ }
+ request.Header = headers
+ return request, err
+}
+
+func (h *httpAction) execute() error {
+ req, err := h.request()
+ if err != nil {
+ logger.Log.Errorf("failed to create new request %v", err)
+ return err
+ }
logger.Log.Debugf("request URL %s the %d time.", h.url, h.executedCount)
- response, err := client.Do(req)
+ response, err := h.client.Do(req)
h.executedCount++
if err != nil {
logger.Log.Errorf("do request error %v", err)
@@ -136,11 +133,3 @@ func (h *httpAction) executeOnce(client *http.Client, req *http.Request) error {
}
return fmt.Errorf("do request failed, response status code: %d", response.StatusCode)
}
-
-// verify http action could continue
-func (h *httpAction) couldContinue() bool {
- if h.times > 0 && h.times <= h.executedCount {
- return false
- }
- return true
-}
diff --git a/third-party/go/template/exec.go b/third-party/go/template/exec.go
index 24ff03d..d6eeae8 100644
--- a/third-party/go/template/exec.go
+++ b/third-party/go/template/exec.go
@@ -428,7 +428,7 @@ func (s *state) walkContains(dot reflect.Value, r *parse.ContainsNode) {
// the contents inside `contains` must be an array
var re []interface{}
if err := yaml.Unmarshal(b.Bytes(), &re); err != nil {
- logger.Log.Errorf("failed to unmarshal index: %v", index)
+ logger.Log.Errorf("failed to unmarshal index: %v, %v", index, err)
}
return re
}
@@ -471,7 +471,7 @@ func (s *state) walkContains(dot reflect.Value, r *parse.ContainsNode) {
listTokenIndex := strings.Index(strings.TrimPrefix(r.List.Nodes[0].String(), "\n"), "-")
marshal = addRootIndent(marshal, listTokenIndex)
- s.wr.Write(append([]byte("\n"), marshal...))
+ _, _ = s.wr.Write(append([]byte("\n"), marshal...))
return
case reflect.Map:
if val.Len() == 0 {