You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by li...@apache.org on 2021/08/20 12:05:56 UTC

[skywalking-infra-e2e] branch main updated: Fix the GHA docker compose wait strategy failure (#36)

This is an automated email from the ASF dual-hosted git repository.

liuhan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-infra-e2e.git


The following commit(s) were added to refs/heads/main by this push:
     new b43d6ab  Fix the GHA docker compose wait strategy failure (#36)
b43d6ab is described below

commit b43d6ab4e994ec0c256029c92f981dbe3b924db0
Author: mrproliu <74...@qq.com>
AuthorDate: Fri Aug 20 20:05:06 2021 +0800

    Fix the GHA docker compose wait strategy failure (#36)
---
 internal/components/setup/compose.go          |  72 +++-
 internal/components/setup/compose_provider.go | 553 ++++++++++++++++++++++++++
 2 files changed, 605 insertions(+), 20 deletions(-)

diff --git a/internal/components/setup/compose.go b/internal/components/setup/compose.go
index 2ccddbb..3acb76d 100644
--- a/internal/components/setup/compose.go
+++ b/internal/components/setup/compose.go
@@ -73,10 +73,28 @@ func ComposeSetup(e2eConfig *config.E2EConfig) error {
 	}
 
 	// find exported port and build env
+	err = exposeServiceEnv(serviceWithPorts, cli, identifier, e2eConfig)
+	if err != nil {
+		return err
+	}
+
+	// run steps
+	err = RunStepsAndWait(e2eConfig.Setup.Steps, e2eConfig.Setup.Timeout, nil)
+	if err != nil {
+		logger.Log.Errorf("execute steps error: %v", err)
+		return err
+	}
+
+	return nil
+}
+
+func exposeServiceEnv(serviceWithPorts map[string][]*hostPortCachedStrategy, cli *client.Client, identity string, e2eConfig *config.E2EConfig) error {
+	dockerProvider := &DockerProvider{client: cli}
+	// find exported port and build env
 	for service, portList := range serviceWithPorts {
-		container, err2 := findContainer(cli, fmt.Sprintf("%s_%s", identifier, getInstanceName(service)))
-		if err2 != nil {
-			return err2
+		container, err := findContainer(cli, fmt.Sprintf("%s_%s", identity, getInstanceName(service)))
+		if err != nil {
+			return err
 		}
 		if len(portList) == 0 {
 			continue
@@ -85,13 +103,14 @@ func ComposeSetup(e2eConfig *config.E2EConfig) error {
 		containerPorts := container.Ports
 
 		// get real ip address for access and export to env
-		host, err2 := portList[0].target.Host(context.Background())
-		if err2 != nil {
-			return err2
+		host, err := dockerProvider.daemonHost(context.Background())
+		if err != nil {
+			return err
 		}
+
 		// format: <service_name>_host
-		if err2 := exportComposeEnv(fmt.Sprintf("%s_host", service), host, service); err2 != nil {
-			return err2
+		if err := exportComposeEnv(fmt.Sprintf("%s_host", service), host, service); err != nil {
+			return err
 		}
 
 		for inx := range portList {
@@ -100,26 +119,22 @@ func ComposeSetup(e2eConfig *config.E2EConfig) error {
 					continue
 				}
 
+				if err := waitPortUntilReady(e2eConfig, container, dockerProvider, portList[inx].expectPort); err != nil {
+					return err
+				}
+
 				// expose env config to env
 				// format: <service_name>_<port>
-				if err2 := exportComposeEnv(
+				if err := exportComposeEnv(
 					fmt.Sprintf("%s_%d", service, containerPort.PrivatePort),
 					fmt.Sprintf("%d", containerPort.PublicPort),
-					service); err2 != nil {
-					return err2
+					service); err != nil {
+					return err
 				}
 				break
 			}
 		}
 	}
-
-	// run steps
-	err = RunStepsAndWait(e2eConfig.Setup.Steps, e2eConfig.Setup.Timeout, nil)
-	if err != nil {
-		logger.Log.Errorf("execute steps error: %v", err)
-		return err
-	}
-
 	return nil
 }
 
@@ -160,7 +175,8 @@ func bindWaitPort(e2eConfig *config.E2EConfig, compose *testcontainers.LocalDock
 				expectPort:       exportPort,
 				HostPortStrategy: *wait.NewHostPortStrategy(nat.Port(fmt.Sprintf("%d/tcp", exportPort))).WithStartupTimeout(waitTimeout),
 			}
-			compose.WithExposedService(service, exportPort, strategy)
+			// temporary don't use testcontainers-go framework wait strategy until fix docker-in-docker bug
+			// compose.WithExposedService(service, exportPort, strategy)
 
 			serviceWithPorts[service] = append(serviceWithPorts[service], strategy)
 		}
@@ -218,3 +234,19 @@ func (hp *hostPortCachedStrategy) WaitUntilReady(ctx context.Context, target wai
 	hp.target = target
 	return hp.HostPortStrategy.WaitUntilReady(ctx, target)
 }
+
+func waitPortUntilReady(e2eConfig *config.E2EConfig, container *types.Container, dockerProvider *DockerProvider, expectPort int) error {
+	// wait port
+	var waitTimeout time.Duration
+	if e2eConfig.Setup.Timeout <= 0 {
+		waitTimeout = constant.DefaultWaitTimeout
+	} else {
+		waitTimeout = time.Duration(e2eConfig.Setup.Timeout) * time.Second
+	}
+	waitPort := nat.Port(fmt.Sprintf("%d/tcp", expectPort))
+	target := &DockerContainer{
+		ID:         container.ID,
+		WaitingFor: wait.NewHostPortStrategy(waitPort),
+		provider:   dockerProvider}
+	return WaitPort(context.Background(), target, waitPort, waitTimeout)
+}
diff --git a/internal/components/setup/compose_provider.go b/internal/components/setup/compose_provider.go
new file mode 100644
index 0000000..05e570d
--- /dev/null
+++ b/internal/components/setup/compose_provider.go
@@ -0,0 +1,553 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package setup
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"io"
+	"net"
+	"net/url"
+	"os"
+	"os/exec"
+	"strconv"
+	"strings"
+	"syscall"
+	"time"
+
+	"github.com/docker/docker/api/types"
+	"github.com/docker/docker/client"
+	"github.com/docker/go-connections/nat"
+	"github.com/testcontainers/testcontainers-go/wait"
+)
+
+const (
+	Bridge        = "bridge"         // Bridge network name (as well as driver)
+	ReaperDefault = "reaper_default" // Default network name when bridge is not available
+	localhost     = "localhost"
+
+	TestcontainerLabel = "org.testcontainers.golang"
+)
+
+// NetworkRequest represents the parameters used to get a network
+type NetworkRequest struct {
+	Driver         string
+	CheckDuplicate bool
+	Internal       bool
+	EnableIPv6     bool
+	Name           string
+	Labels         map[string]string
+	Attachable     bool
+
+	ReaperImage string // alternative reaper registry
+}
+
+type Log struct {
+	LogType string
+	Content []byte
+}
+
+type LogConsumer interface {
+	Accept(Log)
+}
+
+type Network interface {
+	Remove(context.Context) error // removes the network
+}
+
+// DockerContainer represents a container started using Docker
+type DockerContainer struct {
+	// Container ID from Docker
+	ID         string
+	WaitingFor wait.Strategy
+	Image      string
+
+	provider  *DockerProvider
+	consumers []LogConsumer
+}
+
+func (c *DockerContainer) GetContainerID() string {
+	return c.ID
+}
+
+// Endpoint gets proto://host:port string for the first exposed port
+// Will returns just host:port if proto is ""
+func (c *DockerContainer) Endpoint(ctx context.Context, proto string) (string, error) {
+	ports, err := c.Ports(ctx)
+	if err != nil {
+		return "", err
+	}
+
+	// get first port
+	var firstPort nat.Port
+	for p := range ports {
+		firstPort = p
+		break
+	}
+
+	return c.PortEndpoint(ctx, firstPort, proto)
+}
+
+// PortEndpoint gets proto://host:port string for the given exposed port
+// Will returns just host:port if proto is ""
+func (c *DockerContainer) PortEndpoint(ctx context.Context, port nat.Port, proto string) (string, error) {
+	host, err := c.Host(ctx)
+	if err != nil {
+		return "", err
+	}
+
+	outerPort, err := c.MappedPort(ctx, port)
+	if err != nil {
+		return "", err
+	}
+
+	protoFull := ""
+	if proto != "" {
+		protoFull = fmt.Sprintf("%s://", proto)
+	}
+
+	return fmt.Sprintf("%s%s:%s", protoFull, host, outerPort.Port()), nil
+}
+
+// Host gets host (ip or name) of the docker daemon where the container port is exposed
+// Warning: this is based on your Docker host setting. Will fail if using an SSH tunnel
+// You can use the "TC_HOST" env variable to set this yourself
+func (c *DockerContainer) Host(ctx context.Context) (string, error) {
+	host, err := c.provider.daemonHost(ctx)
+	if err != nil {
+		return "", err
+	}
+	return host, nil
+}
+
+// MappedPort gets externally mapped port for a container port
+func (c *DockerContainer) MappedPort(ctx context.Context, port nat.Port) (nat.Port, error) {
+	inspect, err := c.inspectContainer(ctx)
+	if err != nil {
+		return "", err
+	}
+	if inspect.ContainerJSONBase.HostConfig.NetworkMode == "host" {
+		return port, nil
+	}
+	ports, err := c.Ports(ctx)
+	if err != nil {
+		return "", err
+	}
+
+	for k, p := range ports {
+		if k.Port() != port.Port() {
+			continue
+		}
+		if port.Proto() != "" && k.Proto() != port.Proto() {
+			continue
+		}
+		if len(p) == 0 {
+			continue
+		}
+		return nat.NewPort(k.Proto(), p[0].HostPort)
+	}
+
+	return "", errors.New("port not found")
+}
+
+// Ports gets the exposed ports for the container.
+func (c *DockerContainer) Ports(ctx context.Context) (nat.PortMap, error) {
+	inspect, err := c.inspectContainer(ctx)
+	if err != nil {
+		return nil, err
+	}
+	return inspect.NetworkSettings.Ports, nil
+}
+
+func (c *DockerContainer) inspectContainer(ctx context.Context) (*types.ContainerJSON, error) {
+	inspect, err := c.provider.client.ContainerInspect(ctx, c.ID)
+	if err != nil {
+		return nil, err
+	}
+
+	return &inspect, nil
+}
+
+// Logs will fetch both STDOUT and STDERR from the current container. Returns a
+// ReadCloser and leaves it up to the caller to extract what it wants.
+func (c *DockerContainer) Logs(ctx context.Context) (io.ReadCloser, error) {
+	options := types.ContainerLogsOptions{
+		ShowStdout: true,
+		ShowStderr: true,
+	}
+	return c.provider.client.ContainerLogs(ctx, c.ID, options)
+}
+
+// FollowOutput adds a LogConsumer to be sent logs from the container's
+// STDOUT and STDERR
+func (c *DockerContainer) FollowOutput(consumer LogConsumer) {
+	if c.consumers == nil {
+		c.consumers = []LogConsumer{
+			consumer,
+		}
+	} else {
+		c.consumers = append(c.consumers, consumer)
+	}
+}
+
+// Name gets the name of the container.
+func (c *DockerContainer) Name(ctx context.Context) (string, error) {
+	inspect, err := c.inspectContainer(ctx)
+	if err != nil {
+		return "", err
+	}
+	return inspect.Name, nil
+}
+
+// Networks gets the names of the networks the container is attached to.
+func (c *DockerContainer) Networks(ctx context.Context) ([]string, error) {
+	inspect, err := c.inspectContainer(ctx)
+	if err != nil {
+		return []string{}, err
+	}
+
+	networks := inspect.NetworkSettings.Networks
+
+	n := []string{}
+
+	for k := range networks {
+		n = append(n, k)
+	}
+
+	return n, nil
+}
+
+// ContainerIP gets the IP address of the primary network within the container.
+func (c *DockerContainer) ContainerIP(ctx context.Context) (string, error) {
+	inspect, err := c.inspectContainer(ctx)
+	if err != nil {
+		return "", err
+	}
+
+	return inspect.NetworkSettings.IPAddress, nil
+}
+
+// NetworkAliases gets the aliases of the container for the networks it is attached to.
+func (c *DockerContainer) NetworkAliases(ctx context.Context) (map[string][]string, error) {
+	inspect, err := c.inspectContainer(ctx)
+	if err != nil {
+		return map[string][]string{}, err
+	}
+
+	networks := inspect.NetworkSettings.Networks
+
+	a := map[string][]string{}
+
+	for k := range networks {
+		a[k] = networks[k].Aliases
+	}
+
+	return a, nil
+}
+
+func (c *DockerContainer) Exec(ctx context.Context, cmd []string) (int, error) {
+	cli := c.provider.client
+	response, err := cli.ContainerExecCreate(ctx, c.ID, types.ExecConfig{
+		Cmd:    cmd,
+		Detach: false,
+	})
+	if err != nil {
+		return 0, err
+	}
+
+	err = cli.ContainerExecStart(ctx, response.ID, types.ExecStartCheck{
+		Detach: false,
+	})
+	if err != nil {
+		return 0, err
+	}
+
+	var exitCode int
+	for {
+		execResp, err := cli.ContainerExecInspect(ctx, response.ID)
+		if err != nil {
+			return 0, err
+		}
+
+		if !execResp.Running {
+			exitCode = execResp.ExitCode
+			break
+		}
+
+		time.Sleep(100 * time.Millisecond)
+	}
+
+	return exitCode, nil
+}
+
+// DockerNetwork represents a network started using Docker
+type DockerNetwork struct {
+	ID     string // Network ID from Docker
+	Driver string
+	Name   string
+}
+
+// DockerProvider implements the ContainerProvider interface
+type DockerProvider struct {
+	client         *client.Client
+	hostCache      string
+	defaultNetwork string // default container network
+}
+
+// daemonHost gets the host or ip of the Docker daemon where ports are exposed on
+// Warning: this is based on your Docker host setting. Will fail if using an SSH tunnel
+// You can use the "TC_HOST" env variable to set this yourself
+func (p *DockerProvider) daemonHost(ctx context.Context) (string, error) {
+	if p.hostCache != "" {
+		return p.hostCache, nil
+	}
+
+	host, exists := os.LookupEnv("TC_HOST")
+	if exists {
+		p.hostCache = host
+		return p.hostCache, nil
+	}
+
+	// infer from Docker host
+	parsedURL, err := url.Parse(p.client.DaemonHost())
+	if err != nil {
+		return "", err
+	}
+
+	switch parsedURL.Scheme {
+	case "http", "https", "tcp":
+		p.hostCache = parsedURL.Hostname()
+	case "unix", "npipe":
+		if inAContainer() {
+			ip, err := p.GetGatewayIP(ctx)
+			if err != nil {
+				// fallback to getDefaultGatewayIP
+				ip, err = getDefaultGatewayIP()
+				if err != nil {
+					ip = localhost
+				}
+			}
+			p.hostCache = ip
+		} else {
+			p.hostCache = localhost
+		}
+	default:
+		return "", errors.New("could not determine host through env or docker host")
+	}
+
+	return p.hostCache, nil
+}
+
+// GetNetwork returns the object representing the network identified by its name
+func (p *DockerProvider) GetNetwork(ctx context.Context, req NetworkRequest) (types.NetworkResource, error) {
+	networkResource, err := p.client.NetworkInspect(ctx, req.Name, types.NetworkInspectOptions{
+		Verbose: true,
+	})
+	if err != nil {
+		return types.NetworkResource{}, err
+	}
+
+	return networkResource, err
+}
+
+func (p *DockerProvider) GetGatewayIP(ctx context.Context) (string, error) {
+	// Use a default network as defined in the DockerProvider
+	var err error
+	if p.defaultNetwork == "" {
+		p.defaultNetwork, err = getDefaultNetwork(ctx, p.client)
+		if err != nil {
+			return "", err
+		}
+	}
+	nw, err := p.GetNetwork(ctx, NetworkRequest{Name: p.defaultNetwork})
+	if err != nil {
+		return "", err
+	}
+
+	var ip string
+	for _, config := range nw.IPAM.Config {
+		if config.Gateway != "" {
+			ip = config.Gateway
+			break
+		}
+	}
+	if ip == "" {
+		return "", errors.New("failed to get gateway IP from network settings")
+	}
+
+	return ip, nil
+}
+
+func inAContainer() bool {
+	if _, err := os.Stat("/.dockerenv"); err == nil {
+		return true
+	}
+	return false
+}
+
+// deprecated
+func getDefaultGatewayIP() (string, error) {
+	cmd := exec.Command("sh", "-c", "ip route|awk '/default/ { print $3 }'")
+	stdout, err := cmd.Output()
+	if err != nil {
+		return "", errors.New("failed to detect docker host")
+	}
+	ip := strings.TrimSpace(string(stdout))
+	if ip == "" {
+		return "", errors.New("failed to parse default gateway IP")
+	}
+	return ip, nil
+}
+
+func getDefaultNetwork(ctx context.Context, cli *client.Client) (string, error) {
+	// Get list of available networks
+	networkResources, err := cli.NetworkList(ctx, types.NetworkListOptions{})
+	if err != nil {
+		return "", err
+	}
+
+	reaperNetwork := ReaperDefault
+
+	reaperNetworkExists := false
+
+	for inx := range networkResources {
+		if networkResources[inx].Name == Bridge {
+			return Bridge, nil
+		}
+
+		if networkResources[inx].Name == reaperNetwork {
+			reaperNetworkExists = true
+		}
+	}
+
+	// Create a bridge network for the container communications
+	if !reaperNetworkExists {
+		_, err = cli.NetworkCreate(ctx, reaperNetwork, types.NetworkCreate{
+			Driver:     Bridge,
+			Attachable: true,
+			Labels: map[string]string{
+				TestcontainerLabel: "true",
+			},
+		})
+
+		if err != nil {
+			return "", err
+		}
+	}
+
+	return reaperNetwork, nil
+}
+
+// WaitUntilReady implements Strategy.WaitUntilReady
+func WaitPort(ctx context.Context, target wait.StrategyTarget, waitPort nat.Port, timeout time.Duration) (err error) {
+	// limit context to startupTimeout
+	ctx, cancelContext := context.WithTimeout(ctx, timeout)
+	defer cancelContext()
+
+	ipAddress, err := target.Host(ctx)
+	if err != nil {
+		return
+	}
+
+	var waitInterval = 100 * time.Millisecond
+
+	port, err := findMappedPort(ctx, target, waitPort)
+
+	proto := port.Proto()
+	portNumber := port.Int()
+	portString := strconv.Itoa(portNumber)
+
+	// external check
+	dialer := net.Dialer{}
+	address := net.JoinHostPort(ipAddress, portString)
+	for {
+		conn, err := dialer.DialContext(ctx, proto, address)
+		if err != nil {
+			if v, ok := err.(*net.OpError); ok {
+				if v2, ok := (v.Err).(*os.SyscallError); ok {
+					if isConnRefusedErr(v2.Err) {
+						time.Sleep(waitInterval)
+						continue
+					}
+				}
+			}
+			return err
+		}
+		conn.Close()
+		break
+	}
+
+	// internal check
+	command := buildInternalCheckCommand(waitPort.Int())
+	for {
+		if ctx.Err() != nil {
+			return ctx.Err()
+		}
+		exitCode, err := target.Exec(ctx, []string{"/bin/sh", "-c", command})
+		if err != nil {
+			return err
+		}
+
+		if exitCode == 0 {
+			break
+		} else if exitCode == 126 {
+			return errors.New("/bin/sh command not executable")
+		}
+	}
+
+	return nil
+}
+
+func findMappedPort(ctx context.Context, target wait.StrategyTarget, waitPort nat.Port) (nat.Port, error) {
+	var waitInterval = 100 * time.Millisecond
+
+	var port nat.Port
+	port, err := target.MappedPort(ctx, waitPort)
+	var i = 0
+
+	for port == "" {
+		i++
+
+		select {
+		case <-ctx.Done():
+			return "", fmt.Errorf("%s:%w", ctx.Err(), err)
+		case <-time.After(waitInterval):
+			port, err = target.MappedPort(ctx, waitPort)
+			if err != nil {
+				fmt.Printf("(%d) [%s] %s\n", i, port, err)
+			}
+		}
+	}
+	return port, err
+}
+
+func isConnRefusedErr(err error) bool {
+	return err == syscall.ECONNREFUSED
+}
+
+func buildInternalCheckCommand(internalPort int) string {
+	command := `(
+					cat /proc/net/tcp* | awk '{print $2}' | grep -i :%04x ||
+					nc -vz -w 1 localhost %d ||
+					/bin/sh -c '</dev/tcp/localhost/%d'
+				)
+				`
+	return "true && " + fmt.Sprintf(command, internalPort, internalPort, internalPort)
+}