You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by zh...@apache.org on 2021/03/13 08:33:26 UTC
[skywalking-infra-e2e] branch main updated: Implement the run and
wait part of setup (#13)
This is an automated email from the ASF dual-hosted git repository.
zhangke pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-infra-e2e.git
The following commit(s) were added to refs/heads/main by this push:
new 25b6c78 Implement the run and wait part of setup (#13)
25b6c78 is described below
commit 25b6c78a540734480c2b0f8b5e4a22201646da68
Author: Humbert Zhang <50...@qq.com>
AuthorDate: Sat Mar 13 16:33:20 2021 +0800
Implement the run and wait part of setup (#13)
* Implement the run and wait part of setup
---
examples/simple/e2e.yaml | 16 ++++++
internal/components/setup/common.go | 108 ++++++++++++++++++++++++++++++++++++
internal/components/setup/kind.go | 106 ++++++++++++++++++++++-------------
internal/config/e2eConfig.go | 5 +-
internal/config/globalConfig.go | 2 +-
5 files changed, 194 insertions(+), 43 deletions(-)
diff --git a/examples/simple/e2e.yaml b/examples/simple/e2e.yaml
index 480099b..778718e 100644
--- a/examples/simple/e2e.yaml
+++ b/examples/simple/e2e.yaml
@@ -33,6 +33,22 @@ setup:
- namespace: default
resource: pod
for: condition=Ready
+ runs: # commands are serial within one code block and parallel between code blocks
+ - command: | # it can be a shell script or anything executable
+ cp $TMPDIR/e2e-k8s.config ~/.kube/config
+ kubectl create deployment nginx1 --image=nginx
+ wait:
+ - namespace: default
+ resource: deployment/nginx1
+ for: condition=Available
+ - command: |
+ cp $TMPDIR/e2e-k8s.config ~/.kube/config
+ kubectl create deployment nginx2 \
+ --image=nginx
+ wait:
+ - namespace: default
+ resource: deployment/nginx2
+ for: condition=Available
timeout: 600
verify:
diff --git a/internal/components/setup/common.go b/internal/components/setup/common.go
new file mode 100644
index 0000000..cae40c4
--- /dev/null
+++ b/internal/components/setup/common.go
@@ -0,0 +1,108 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package setup
+
+import (
+ "fmt"
+ "io/ioutil"
+ "time"
+
+ "github.com/apache/skywalking-infra-e2e/internal/config"
+ "github.com/apache/skywalking-infra-e2e/internal/logger"
+ "github.com/apache/skywalking-infra-e2e/internal/util"
+)
+
+// RunCommandsAndWait starts one goroutine per run that has a non-empty command
+// and then blocks until the first of three things happens: every started
+// goroutine has finished, one of them reported an error on the wait set's
+// error channel, or the wait set's timeout elapsed.
+// It returns nil in the first case and a non-nil error in the other two.
+func RunCommandsAndWait(runs []config.Run, timeout time.Duration) error {
+	waitSet := util.NewWaitSet(timeout)
+
+	for i := range runs {
+		current := &runs[i]
+		// runs without a command have nothing to execute and are skipped
+		if len(current.Command) == 0 {
+			continue
+		}
+
+		waitSet.WaitGroup.Add(1)
+		go executeCommandsAndWait(current.Command, current.Waits, waitSet)
+	}
+
+	// closing FinishChan signals that every started goroutine is done
+	go func() {
+		waitSet.WaitGroup.Wait()
+		close(waitSet.FinishChan)
+	}()
+
+	select {
+	case err := <-waitSet.ErrChan:
+		logger.Log.Errorf("execute command error")
+		return err
+	case <-time.After(waitSet.Timeout):
+		return fmt.Errorf("wait for commands run timeout after %d seconds", int(timeout.Seconds()))
+	case <-waitSet.FinishChan:
+		logger.Log.Infof("all commands executed successfully")
+		return nil
+	}
+}
+
+// executeCommandsAndWait runs commands through util.ExecuteCommand and then
+// evaluates every entry of waits in order.
+// The first failure of any step is sent to waitSet.ErrChan and the function
+// returns immediately, so a failed step is never followed by steps that depend
+// on its result. waitSet.WaitGroup is marked done on every return path.
+func executeCommandsAndWait(commands string, waits []config.Wait, waitSet *util.WaitSet) {
+	defer waitSet.WaitGroup.Done()
+
+	// executes commands
+	logger.Log.Infof("executing commands [%s]", commands)
+	result, err := util.ExecuteCommand(commands)
+	if err != nil {
+		waitSet.ErrChan <- fmt.Errorf("commands: [%s] runs error: %s", commands, err)
+		// the commands failed, so there is nothing meaningful to wait for
+		return
+	}
+	logger.Log.Infof("executed commands [%s], result: %s", commands, result)
+
+	// waits for the conditions to be met, one after another
+	for idx := range waits {
+		wait := waits[idx]
+		logger.Log.Infof("waiting for %+v", wait)
+
+		kubeConfigYaml, err := ioutil.ReadFile(kubeConfigPath)
+		if err != nil {
+			waitSet.ErrChan <- fmt.Errorf("read kube config failed: %s", err)
+			return
+		}
+
+		options, err := getWaitOptions(kubeConfigYaml, &wait)
+		if err != nil {
+			waitSet.ErrChan <- fmt.Errorf("commands: [%s] get wait options error: %s", commands, err)
+			// options is nil when an error is returned and must not be used
+			return
+		}
+
+		if err := options.RunWait(); err != nil {
+			waitSet.ErrChan <- fmt.Errorf("commands: [%s] waits error: %s", commands, err)
+			return
+		}
+		logger.Log.Infof("wait %+v condition met", wait)
+	}
+}
+
+// NewTimeout returns what is left of timeout after subtracting the time that
+// has elapsed since timeBefore. The result is zero or negative once the whole
+// budget has been used up; callers are expected to check for that.
+func NewTimeout(timeBefore time.Time, timeout time.Duration) time.Duration {
+	return timeout - time.Since(timeBefore)
+}
diff --git a/internal/components/setup/kind.go b/internal/components/setup/kind.go
index 2b6c100..c4c4f27 100644
--- a/internal/components/setup/kind.go
+++ b/internal/components/setup/kind.go
@@ -65,6 +65,16 @@ func KindSetup(e2eConfig *config.E2EConfig) error {
return nil
}
+ if err := createKindCluster(kindConfigPath); err != nil {
+ return err
+ }
+
+ c, dc, err := util.ConnectToK8sCluster(kubeConfigPath)
+ if err != nil {
+ logger.Log.Errorf("connect to k8s cluster failed according to config file: %s", kubeConfigPath)
+ return err
+ }
+
timeout := e2eConfig.Setup.Timeout
var waitTimeout time.Duration
if timeout <= 0 {
@@ -75,19 +85,26 @@ func KindSetup(e2eConfig *config.E2EConfig) error {
logger.Log.Debugf("wait timeout is %d seconds", int(waitTimeout.Seconds()))
- if err := createKindCluster(kindConfigPath); err != nil {
- return err
- }
+ // record time now
+ timeNow := time.Now()
- c, dc, err := util.ConnectToK8sCluster(kubeConfigPath)
+ err = createManifestsAndWait(c, dc, manifests, waitTimeout)
if err != nil {
- logger.Log.Errorf("connect to k8s cluster failed according to config file: %s", kubeConfigPath)
return err
}
- err = createManifestsAndWait(c, dc, manifests, waitTimeout)
- if err != nil {
- return err
+ // calculates new timeout. manifests and run of setup uses the same countdown.
+ runWaitTimeout := NewTimeout(timeNow, waitTimeout)
+ if runWaitTimeout <= 0 {
+ return fmt.Errorf("kind setup timeout")
+ }
+
+ if len(e2eConfig.Setup.Runs) > 0 {
+ logger.Log.Debugf("executing commands...")
+ err := RunCommandsAndWait(e2eConfig.Setup.Runs, runWaitTimeout)
+ if err != nil {
+ return err
+ }
}
return nil
}
@@ -129,42 +146,17 @@ func createManifestsAndWait(c *kubernetes.Clientset, dc dynamic.Interface, manif
continue
}
- for _, wait := range waits {
- if strings.Contains(wait.Resource, "/") && wait.LabelSelector != "" {
- return fmt.Errorf("when passing resource.group/resource.name in Resource, the labelSelector can not be set at the same time")
- }
-
- restClientGetter := util.NewSimpleRESTClientGetter(wait.Namespace, string(kubeConfigYaml))
- silenceOutput, _ := os.Open(os.DevNull)
- ioStreams := genericclioptions.IOStreams{In: os.Stdin, Out: silenceOutput, ErrOut: os.Stderr}
- waitFlags := ctlwait.NewWaitFlags(restClientGetter, ioStreams)
- // global timeout is set in e2e.yaml
- waitFlags.Timeout = constant.SingleDefaultWaitTimeout
- waitFlags.ForCondition = wait.For
-
- var args []string
- // resource.group/resource.name OR resource.group
- if wait.Resource != "" {
- args = append(args, wait.Resource)
- } else {
- return fmt.Errorf("resource must be provided in wait block")
- }
-
- if wait.LabelSelector != "" {
- waitFlags.ResourceBuilderFlags.LabelSelector = &wait.LabelSelector
- } else if !strings.Contains(wait.Resource, "/") {
- // if labelSelector is nil and resource only provide resource.group, check all resources.
- waitFlags.ResourceBuilderFlags.All = &constant.True
- }
+ for idx := range waits {
+ wait := waits[idx]
+ logger.Log.Infof("waiting for %+v", wait)
- options, err := waitFlags.ToOptions(args)
+ options, err := getWaitOptions(kubeConfigYaml, &wait)
if err != nil {
return err
}
- logger.Log.Infof("waiting for %+v", wait)
waitSet.WaitGroup.Add(1)
- go concurrentlyWait(wait, options, waitSet)
+ go concurrentlyWait(&wait, options, waitSet)
}
}
@@ -186,6 +178,41 @@ func createManifestsAndWait(c *kubernetes.Clientset, dc dynamic.Interface, manif
return nil
}
+// getWaitOptions translates one wait block into kubectl wait options.
+//
+// kubeConfigYaml is handed, as a string, to the REST client getter together
+// with the wait's namespace. wait.Resource is required and is passed as the
+// single positional argument; it may be either "resource.group" or
+// "resource.group/resource.name". A label selector may only be combined with
+// the former. When neither a name nor a selector is given, all resources of
+// the group are selected.
+//
+// It returns nil options together with an error when the wait block is
+// invalid or when the flags cannot be converted to options.
+func getWaitOptions(kubeConfigYaml []byte, wait *config.Wait) (options *ctlwait.WaitOptions, err error) {
+	namedResource := strings.Contains(wait.Resource, "/")
+	hasSelector := wait.LabelSelector != ""
+	if namedResource && hasSelector {
+		return nil, fmt.Errorf("when passing resource.group/resource.name in Resource, the labelSelector can not be set at the same time")
+	}
+
+	clientGetter := util.NewSimpleRESTClientGetter(wait.Namespace, string(kubeConfigYaml))
+	// regular output is pointed at the null device; the open error is ignored
+	devNull, _ := os.Open(os.DevNull)
+	streams := genericclioptions.IOStreams{In: os.Stdin, Out: devNull, ErrOut: os.Stderr}
+	waitFlags := ctlwait.NewWaitFlags(clientGetter, streams)
+	// global timeout is set in e2e.yaml
+	waitFlags.Timeout = constant.SingleDefaultWaitTimeout
+	waitFlags.ForCondition = wait.For
+
+	if wait.Resource == "" {
+		return nil, fmt.Errorf("resource must be provided in wait block")
+	}
+
+	switch {
+	case hasSelector:
+		waitFlags.ResourceBuilderFlags.LabelSelector = &wait.LabelSelector
+	case !namedResource:
+		// no selector and no resource name: check all resources of the group
+		waitFlags.ResourceBuilderFlags.All = &constant.True
+	}
+
+	// resource.group/resource.name OR resource.group
+	options, err = waitFlags.ToOptions([]string{wait.Resource})
+	if err != nil {
+		return nil, err
+	}
+	return options, nil
+}
+
func createByManifest(c *kubernetes.Clientset, dc dynamic.Interface, manifest config.Manifest) error {
files, err := util.GetManifests(manifest.GetPath())
if err != nil {
@@ -204,13 +231,14 @@ func createByManifest(c *kubernetes.Clientset, dc dynamic.Interface, manifest co
return nil
}
-func concurrentlyWait(wait config.Wait, options *ctlwait.WaitOptions, waitSet *util.WaitSet) {
+func concurrentlyWait(wait *config.Wait, options *ctlwait.WaitOptions, waitSet *util.WaitSet) {
defer waitSet.WaitGroup.Done()
err := options.RunWait()
if err != nil {
err = fmt.Errorf("wait strategy :%+v, err: %s", wait, err)
waitSet.ErrChan <- err
+ return
}
logger.Log.Infof("wait %+v condition met", wait)
}
diff --git a/internal/config/e2eConfig.go b/internal/config/e2eConfig.go
index 7b54aaf..9ccbe98 100644
--- a/internal/config/e2eConfig.go
+++ b/internal/config/e2eConfig.go
@@ -30,9 +30,8 @@ type Setup struct {
Env string `yaml:"env"`
File string `yaml:"file"`
Manifests []Manifest `yaml:"manifests"`
- // Run is not supported yet
- Run []Run `yaml:"run"`
- Timeout int `yaml:"timeout"`
+ Runs []Run `yaml:"runs"`
+ Timeout int `yaml:"timeout"`
}
func (s *Setup) GetFile() string {
diff --git a/internal/config/globalConfig.go b/internal/config/globalConfig.go
index fb72c9c..7cef38e 100644
--- a/internal/config/globalConfig.go
+++ b/internal/config/globalConfig.go
@@ -28,7 +28,7 @@ import (
"gopkg.in/yaml.v2"
)
-// GlobalE2EConfig store E2EConfig which can be used globally.
+// GlobalE2EConfig stores E2EConfig which can be used globally.
type GlobalE2EConfig struct {
Error error
E2EConfig E2EConfig