You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by zh...@apache.org on 2021/03/13 08:33:26 UTC

[skywalking-infra-e2e] branch main updated: Implement the run and wait part of setup (#13)

This is an automated email from the ASF dual-hosted git repository.

zhangke pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-infra-e2e.git


The following commit(s) were added to refs/heads/main by this push:
     new 25b6c78  Implement the run and wait part of setup (#13)
25b6c78 is described below

commit 25b6c78a540734480c2b0f8b5e4a22201646da68
Author: Humbert Zhang <50...@qq.com>
AuthorDate: Sat Mar 13 16:33:20 2021 +0800

    Implement the run and wait part of setup (#13)
    
    * Implement the run and wait part of setup
---
 examples/simple/e2e.yaml            |  16 ++++++
 internal/components/setup/common.go | 108 ++++++++++++++++++++++++++++++++++++
 internal/components/setup/kind.go   | 106 ++++++++++++++++++++++-------------
 internal/config/e2eConfig.go        |   5 +-
 internal/config/globalConfig.go     |   2 +-
 5 files changed, 194 insertions(+), 43 deletions(-)

diff --git a/examples/simple/e2e.yaml b/examples/simple/e2e.yaml
index 480099b..778718e 100644
--- a/examples/simple/e2e.yaml
+++ b/examples/simple/e2e.yaml
@@ -33,6 +33,22 @@ setup:
         - namespace: default
           resource: pod
           for: condition=Ready
+  runs: # commands are serial within one code block and parallel between code blocks
+    - command: | # it can be a shell script or anything executable
+        cp $TMPDIR/e2e-k8s.config ~/.kube/config
+        kubectl create deployment nginx1 --image=nginx
+      wait:
+        - namespace: default
+          resource: deployment/nginx1
+          for: condition=Available
+    - command: |
+        cp $TMPDIR/e2e-k8s.config ~/.kube/config
+        kubectl create deployment nginx2 \
+                                  --image=nginx
+      wait:
+        - namespace: default
+          resource: deployment/nginx2
+          for: condition=Available
   timeout: 600
 
 verify:
diff --git a/internal/components/setup/common.go b/internal/components/setup/common.go
new file mode 100644
index 0000000..cae40c4
--- /dev/null
+++ b/internal/components/setup/common.go
@@ -0,0 +1,108 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package setup
+
+import (
+	"fmt"
+	"io/ioutil"
+	"time"
+
+	"github.com/apache/skywalking-infra-e2e/internal/config"
+	"github.com/apache/skywalking-infra-e2e/internal/logger"
+	"github.com/apache/skywalking-infra-e2e/internal/util"
+)
+
+// RunCommandsAndWait launches every run that has a non-empty command in its own
+// goroutine, then blocks until all of them are done, the first error is
+// reported, or the wait set's timeout elapses.
+func RunCommandsAndWait(runs []config.Run, timeout time.Duration) error {
+	ws := util.NewWaitSet(timeout)
+
+	for i := range runs {
+		if runs[i].Command == "" {
+			// nothing to execute for this run
+			continue
+		}
+		ws.WaitGroup.Add(1)
+		go executeCommandsAndWait(runs[i].Command, runs[i].Waits, ws)
+	}
+
+	// FinishChan is closed once every started goroutine has called Done
+	go func() {
+		ws.WaitGroup.Wait()
+		close(ws.FinishChan)
+	}()
+
+	select {
+	case err := <-ws.ErrChan:
+		logger.Log.Errorf("execute command error")
+		return err
+	case <-time.After(ws.Timeout):
+		return fmt.Errorf("wait for commands run timeout after %d seconds", int(timeout.Seconds()))
+	case <-ws.FinishChan:
+		logger.Log.Infof("all commands executed successfully")
+		return nil
+	}
+}
+
+// executeCommandsAndWait runs commands, then blocks on each wait in order;
+// the first failure is sent to waitSet.ErrChan and ends the goroutine.
+func executeCommandsAndWait(commands string, waits []config.Wait, waitSet *util.WaitSet) {
+	defer waitSet.WaitGroup.Done()
+
+	logger.Log.Infof("executing commands [%s]", commands)
+	result, err := util.ExecuteCommand(commands)
+	if err != nil {
+		waitSet.ErrChan <- fmt.Errorf("commands: [%s] runs error: %s", commands, err)
+		return
+	}
+	logger.Log.Infof("executed commands [%s], result: %s", commands, result)
+
+	// wait for each condition in turn; every error path returns so that an
+	// unread kube config or nil options is never used by a later step
+	for idx := range waits {
+		wait := waits[idx]
+		logger.Log.Infof("waiting for %+v", wait)
+
+		kubeConfigYaml, err := ioutil.ReadFile(kubeConfigPath)
+		if err != nil {
+			waitSet.ErrChan <- fmt.Errorf("read kube config failed: %s", err)
+			return
+		}
+
+		options, err := getWaitOptions(kubeConfigYaml, &wait)
+		if err != nil {
+			waitSet.ErrChan <- fmt.Errorf("commands: [%s] get wait options error: %s", commands, err)
+			return
+		}
+
+		if err := options.RunWait(); err != nil {
+			waitSet.ErrChan <- fmt.Errorf("commands: [%s] waits error: %s", commands, err)
+			return
+		}
+		logger.Log.Infof("wait %+v condition met", wait)
+	}
+}
+
+// NewTimeout returns the part of timeout that is still left after the time
+// elapsed since timeBefore; the result is zero or negative once it is used up.
+func NewTimeout(timeBefore time.Time, timeout time.Duration) time.Duration {
+	remaining := timeout - time.Since(timeBefore)
+	return remaining
+}
diff --git a/internal/components/setup/kind.go b/internal/components/setup/kind.go
index 2b6c100..c4c4f27 100644
--- a/internal/components/setup/kind.go
+++ b/internal/components/setup/kind.go
@@ -65,6 +65,16 @@ func KindSetup(e2eConfig *config.E2EConfig) error {
 		return nil
 	}
 
+	if err := createKindCluster(kindConfigPath); err != nil {
+		return err
+	}
+
+	c, dc, err := util.ConnectToK8sCluster(kubeConfigPath)
+	if err != nil {
+		logger.Log.Errorf("connect to k8s cluster failed according to config file: %s", kubeConfigPath)
+		return err
+	}
+
 	timeout := e2eConfig.Setup.Timeout
 	var waitTimeout time.Duration
 	if timeout <= 0 {
@@ -75,19 +85,26 @@ func KindSetup(e2eConfig *config.E2EConfig) error {
 
 	logger.Log.Debugf("wait timeout is %d seconds", int(waitTimeout.Seconds()))
 
-	if err := createKindCluster(kindConfigPath); err != nil {
-		return err
-	}
+	// record time now
+	timeNow := time.Now()
 
-	c, dc, err := util.ConnectToK8sCluster(kubeConfigPath)
+	err = createManifestsAndWait(c, dc, manifests, waitTimeout)
 	if err != nil {
-		logger.Log.Errorf("connect to k8s cluster failed according to config file: %s", kubeConfigPath)
 		return err
 	}
 
-	err = createManifestsAndWait(c, dc, manifests, waitTimeout)
-	if err != nil {
-		return err
+	// calculate the remaining timeout: the manifests and runs of setup share one countdown.
+	runWaitTimeout := NewTimeout(timeNow, waitTimeout)
+	if runWaitTimeout <= 0 {
+		return fmt.Errorf("kind setup timeout")
+	}
+
+	if len(e2eConfig.Setup.Runs) > 0 {
+		logger.Log.Debugf("executing commands...")
+		err := RunCommandsAndWait(e2eConfig.Setup.Runs, runWaitTimeout)
+		if err != nil {
+			return err
+		}
 	}
 	return nil
 }
@@ -129,42 +146,17 @@ func createManifestsAndWait(c *kubernetes.Clientset, dc dynamic.Interface, manif
 			continue
 		}
 
-		for _, wait := range waits {
-			if strings.Contains(wait.Resource, "/") && wait.LabelSelector != "" {
-				return fmt.Errorf("when passing resource.group/resource.name in Resource, the labelSelector can not be set at the same time")
-			}
-
-			restClientGetter := util.NewSimpleRESTClientGetter(wait.Namespace, string(kubeConfigYaml))
-			silenceOutput, _ := os.Open(os.DevNull)
-			ioStreams := genericclioptions.IOStreams{In: os.Stdin, Out: silenceOutput, ErrOut: os.Stderr}
-			waitFlags := ctlwait.NewWaitFlags(restClientGetter, ioStreams)
-			// global timeout is set in e2e.yaml
-			waitFlags.Timeout = constant.SingleDefaultWaitTimeout
-			waitFlags.ForCondition = wait.For
-
-			var args []string
-			// resource.group/resource.name OR resource.group
-			if wait.Resource != "" {
-				args = append(args, wait.Resource)
-			} else {
-				return fmt.Errorf("resource must be provided in wait block")
-			}
-
-			if wait.LabelSelector != "" {
-				waitFlags.ResourceBuilderFlags.LabelSelector = &wait.LabelSelector
-			} else if !strings.Contains(wait.Resource, "/") {
-				// if labelSelector is nil and resource only provide resource.group, check all resources.
-				waitFlags.ResourceBuilderFlags.All = &constant.True
-			}
+		for idx := range waits {
+			wait := waits[idx]
+			logger.Log.Infof("waiting for %+v", wait)
 
-			options, err := waitFlags.ToOptions(args)
+			options, err := getWaitOptions(kubeConfigYaml, &wait)
 			if err != nil {
 				return err
 			}
 
-			logger.Log.Infof("waiting for %+v", wait)
 			waitSet.WaitGroup.Add(1)
-			go concurrentlyWait(wait, options, waitSet)
+			go concurrentlyWait(&wait, options, waitSet)
 		}
 	}
 
@@ -186,6 +178,41 @@ func createManifestsAndWait(c *kubernetes.Clientset, dc dynamic.Interface, manif
 	return nil
 }
 
+func getWaitOptions(kubeConfigYaml []byte, wait *config.Wait) (options *ctlwait.WaitOptions, err error) {
+	if strings.Contains(wait.Resource, "/") && wait.LabelSelector != "" {
+		return nil, fmt.Errorf("when passing resource.group/resource.name in Resource, the labelSelector can not be set at the same time")
+	}
+
+	restClientGetter := util.NewSimpleRESTClientGetter(wait.Namespace, string(kubeConfigYaml))
+	silenceOutput, _ := os.Open(os.DevNull)
+	ioStreams := genericclioptions.IOStreams{In: os.Stdin, Out: silenceOutput, ErrOut: os.Stderr}
+	waitFlags := ctlwait.NewWaitFlags(restClientGetter, ioStreams)
+	// global timeout is set in e2e.yaml
+	waitFlags.Timeout = constant.SingleDefaultWaitTimeout
+	waitFlags.ForCondition = wait.For
+
+	var args []string
+	// resource.group/resource.name OR resource.group
+	if wait.Resource != "" {
+		args = append(args, wait.Resource)
+	} else {
+		return nil, fmt.Errorf("resource must be provided in wait block")
+	}
+
+	if wait.LabelSelector != "" {
+		waitFlags.ResourceBuilderFlags.LabelSelector = &wait.LabelSelector
+	} else if !strings.Contains(wait.Resource, "/") {
+		// if labelSelector is nil and resource only provide resource.group, check all resources.
+		waitFlags.ResourceBuilderFlags.All = &constant.True
+	}
+
+	options, err = waitFlags.ToOptions(args)
+	if err != nil {
+		return nil, err
+	}
+	return options, nil
+}
+
 func createByManifest(c *kubernetes.Clientset, dc dynamic.Interface, manifest config.Manifest) error {
 	files, err := util.GetManifests(manifest.GetPath())
 	if err != nil {
@@ -204,13 +231,14 @@ func createByManifest(c *kubernetes.Clientset, dc dynamic.Interface, manifest co
 	return nil
 }
 
-func concurrentlyWait(wait config.Wait, options *ctlwait.WaitOptions, waitSet *util.WaitSet) {
+func concurrentlyWait(wait *config.Wait, options *ctlwait.WaitOptions, waitSet *util.WaitSet) {
 	defer waitSet.WaitGroup.Done()
 
 	err := options.RunWait()
 	if err != nil {
 		err = fmt.Errorf("wait strategy :%+v, err: %s", wait, err)
 		waitSet.ErrChan <- err
+		return
 	}
 	logger.Log.Infof("wait %+v condition met", wait)
 }
diff --git a/internal/config/e2eConfig.go b/internal/config/e2eConfig.go
index 7b54aaf..9ccbe98 100644
--- a/internal/config/e2eConfig.go
+++ b/internal/config/e2eConfig.go
@@ -30,9 +30,8 @@ type Setup struct {
 	Env       string     `yaml:"env"`
 	File      string     `yaml:"file"`
 	Manifests []Manifest `yaml:"manifests"`
-	// Run is not supported yet
-	Run     []Run `yaml:"run"`
-	Timeout int   `yaml:"timeout"`
+	Runs      []Run      `yaml:"runs"`
+	Timeout   int        `yaml:"timeout"`
 }
 
 func (s *Setup) GetFile() string {
diff --git a/internal/config/globalConfig.go b/internal/config/globalConfig.go
index fb72c9c..7cef38e 100644
--- a/internal/config/globalConfig.go
+++ b/internal/config/globalConfig.go
@@ -28,7 +28,7 @@ import (
 	"gopkg.in/yaml.v2"
 )
 
-// GlobalE2EConfig store E2EConfig which can be used globally.
+// GlobalE2EConfig stores E2EConfig which can be used globally.
 type GlobalE2EConfig struct {
 	Error     error
 	E2EConfig E2EConfig