You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by li...@apache.org on 2021/08/20 12:05:56 UTC
[skywalking-infra-e2e] branch main updated: Fix the GHA docker
compose wait strategy failure (#36)
This is an automated email from the ASF dual-hosted git repository.
liuhan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-infra-e2e.git
The following commit(s) were added to refs/heads/main by this push:
new b43d6ab Fix the GHA docker compose wait strategy failure (#36)
b43d6ab is described below
commit b43d6ab4e994ec0c256029c92f981dbe3b924db0
Author: mrproliu <74...@qq.com>
AuthorDate: Fri Aug 20 20:05:06 2021 +0800
Fix the GHA docker compose wait strategy failure (#36)
---
internal/components/setup/compose.go | 72 +++-
internal/components/setup/compose_provider.go | 553 ++++++++++++++++++++++++++
2 files changed, 605 insertions(+), 20 deletions(-)
diff --git a/internal/components/setup/compose.go b/internal/components/setup/compose.go
index 2ccddbb..3acb76d 100644
--- a/internal/components/setup/compose.go
+++ b/internal/components/setup/compose.go
@@ -73,10 +73,28 @@ func ComposeSetup(e2eConfig *config.E2EConfig) error {
}
// find exported port and build env
+ err = exposeServiceEnv(serviceWithPorts, cli, identifier, e2eConfig)
+ if err != nil {
+ return err
+ }
+
+ // run steps
+ err = RunStepsAndWait(e2eConfig.Setup.Steps, e2eConfig.Setup.Timeout, nil)
+ if err != nil {
+ logger.Log.Errorf("execute steps error: %v", err)
+ return err
+ }
+
+ return nil
+}
+
+func exposeServiceEnv(serviceWithPorts map[string][]*hostPortCachedStrategy, cli *client.Client, identity string, e2eConfig *config.E2EConfig) error {
+ dockerProvider := &DockerProvider{client: cli}
+ // find exported port and build env
for service, portList := range serviceWithPorts {
- container, err2 := findContainer(cli, fmt.Sprintf("%s_%s", identifier, getInstanceName(service)))
- if err2 != nil {
- return err2
+ container, err := findContainer(cli, fmt.Sprintf("%s_%s", identity, getInstanceName(service)))
+ if err != nil {
+ return err
}
if len(portList) == 0 {
continue
@@ -85,13 +103,14 @@ func ComposeSetup(e2eConfig *config.E2EConfig) error {
containerPorts := container.Ports
// get real ip address for access and export to env
- host, err2 := portList[0].target.Host(context.Background())
- if err2 != nil {
- return err2
+ host, err := dockerProvider.daemonHost(context.Background())
+ if err != nil {
+ return err
}
+
// format: <service_name>_host
- if err2 := exportComposeEnv(fmt.Sprintf("%s_host", service), host, service); err2 != nil {
- return err2
+ if err := exportComposeEnv(fmt.Sprintf("%s_host", service), host, service); err != nil {
+ return err
}
for inx := range portList {
@@ -100,26 +119,22 @@ func ComposeSetup(e2eConfig *config.E2EConfig) error {
continue
}
+ if err := waitPortUntilReady(e2eConfig, container, dockerProvider, portList[inx].expectPort); err != nil {
+ return err
+ }
+
// expose env config to env
// format: <service_name>_<port>
- if err2 := exportComposeEnv(
+ if err := exportComposeEnv(
fmt.Sprintf("%s_%d", service, containerPort.PrivatePort),
fmt.Sprintf("%d", containerPort.PublicPort),
- service); err2 != nil {
- return err2
+ service); err != nil {
+ return err
}
break
}
}
}
-
- // run steps
- err = RunStepsAndWait(e2eConfig.Setup.Steps, e2eConfig.Setup.Timeout, nil)
- if err != nil {
- logger.Log.Errorf("execute steps error: %v", err)
- return err
- }
-
return nil
}
@@ -160,7 +175,8 @@ func bindWaitPort(e2eConfig *config.E2EConfig, compose *testcontainers.LocalDock
expectPort: exportPort,
HostPortStrategy: *wait.NewHostPortStrategy(nat.Port(fmt.Sprintf("%d/tcp", exportPort))).WithStartupTimeout(waitTimeout),
}
- compose.WithExposedService(service, exportPort, strategy)
+ // temporarily don't use the testcontainers-go framework wait strategy until the docker-in-docker bug is fixed
+ // compose.WithExposedService(service, exportPort, strategy)
serviceWithPorts[service] = append(serviceWithPorts[service], strategy)
}
@@ -218,3 +234,19 @@ func (hp *hostPortCachedStrategy) WaitUntilReady(ctx context.Context, target wai
hp.target = target
return hp.HostPortStrategy.WaitUntilReady(ctx, target)
}
+
+// waitPortUntilReady waits for expectPort (TCP) of the given container to
+// become reachable. The wait is bounded by e2eConfig.Setup.Timeout seconds, or
+// by constant.DefaultWaitTimeout when that value is not positive.
+func waitPortUntilReady(e2eConfig *config.E2EConfig, container *types.Container, dockerProvider *DockerProvider, expectPort int) error {
+	// start from the default and override it only when a positive timeout is configured
+	var waitTimeout time.Duration = constant.DefaultWaitTimeout
+	if e2eConfig.Setup.Timeout > 0 {
+		waitTimeout = time.Duration(e2eConfig.Setup.Timeout) * time.Second
+	}
+
+	tcpPort := nat.Port(fmt.Sprintf("%d/tcp", expectPort))
+	waitTarget := &DockerContainer{
+		ID:         container.ID,
+		WaitingFor: wait.NewHostPortStrategy(tcpPort),
+		provider:   dockerProvider,
+	}
+	return WaitPort(context.Background(), waitTarget, tcpPort, waitTimeout)
+}
diff --git a/internal/components/setup/compose_provider.go b/internal/components/setup/compose_provider.go
new file mode 100644
index 0000000..05e570d
--- /dev/null
+++ b/internal/components/setup/compose_provider.go
@@ -0,0 +1,553 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package setup
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "io"
+ "net"
+ "net/url"
+ "os"
+ "os/exec"
+ "strconv"
+ "strings"
+ "syscall"
+ "time"
+
+ "github.com/docker/docker/api/types"
+ "github.com/docker/docker/client"
+ "github.com/docker/go-connections/nat"
+ "github.com/testcontainers/testcontainers-go/wait"
+)
+
+// Network names, fallback host and label key used by the Docker provider code in this file.
+const (
+ Bridge = "bridge" // Bridge network name (as well as driver)
+ ReaperDefault = "reaper_default" // Default network name when bridge is not available
+ // localhost is the host daemonHost falls back to when no better address can be determined
+ localhost = "localhost"
+
+ // TestcontainerLabel is the label key set to "true" on the network created by getDefaultNetwork
+ TestcontainerLabel = "org.testcontainers.golang"
+)
+
+// NetworkRequest represents the parameters used to get a network.
+// In this file only the Name field is read (by DockerProvider.GetNetwork);
+// the remaining fields are carried along but not consulted here.
+type NetworkRequest struct {
+ Driver string
+ CheckDuplicate bool
+ Internal bool
+ EnableIPv6 bool
+ Name string
+ Labels map[string]string
+ Attachable bool
+
+ ReaperImage string // alternative reaper registry
+}
+
+// Log is the value handed to a LogConsumer through Accept: a type tag plus the raw content bytes.
+type Log struct {
+ LogType string
+ Content []byte
+}
+
+// LogConsumer receives Log values via Accept. Consumers are registered on a
+// container with DockerContainer.FollowOutput.
+type LogConsumer interface {
+ Accept(Log)
+}
+
+// Network is the interface of a network that can be removed.
+type Network interface {
+ Remove(context.Context) error // removes the network
+}
+
+// DockerContainer represents a container started using Docker
+type DockerContainer struct {
+ // Container ID from Docker
+ ID string
+ // WaitingFor is the wait strategy attached to the container; none of the
+ // methods in this file read it.
+ WaitingFor wait.Strategy
+ Image string
+
+ // provider supplies the Docker client and the daemon host lookup used by the methods below
+ provider *DockerProvider
+ // consumers holds the log consumers registered through FollowOutput
+ consumers []LogConsumer
+}
+
+// GetContainerID returns the Docker ID stored on the container.
+func (c *DockerContainer) GetContainerID() string {
+ return c.ID
+}
+
+// Endpoint returns a proto://host:port string for one exposed port of the
+// container, or just host:port when proto is "". The port is whichever key the
+// iteration over the port map yields first.
+func (c *DockerContainer) Endpoint(ctx context.Context, proto string) (string, error) {
+	exposed, err := c.Ports(ctx)
+	if err != nil {
+		return "", err
+	}
+
+	// pick the first key the map iteration hands out; stays empty when nothing is exposed
+	var chosen nat.Port
+	for candidate := range exposed {
+		chosen = candidate
+		break
+	}
+	return c.PortEndpoint(ctx, chosen, proto)
+}
+
+// PortEndpoint returns a proto://host:port string for the given exposed port,
+// or just host:port when proto is "". The host comes from Host and the port
+// from MappedPort.
+func (c *DockerContainer) PortEndpoint(ctx context.Context, port nat.Port, proto string) (string, error) {
+	host, err := c.Host(ctx)
+	if err != nil {
+		return "", err
+	}
+	mapped, err := c.MappedPort(ctx, port)
+	if err != nil {
+		return "", err
+	}
+
+	// the scheme prefix is only emitted when a protocol was requested
+	prefix := ""
+	if proto != "" {
+		prefix = proto + "://"
+	}
+	return prefix + host + ":" + mapped.Port(), nil
+}
+
+// Host returns the host (ip or name) of the docker daemon where the container port is exposed.
+// Warning: this is based on your Docker host setting. Will fail if using an SSH tunnel.
+// You can use the "TC_HOST" env variable to set this yourself.
+func (c *DockerContainer) Host(ctx context.Context) (string, error) {
+	// daemonHost already yields "" together with any error, so its result is passed through unchanged
+	return c.provider.daemonHost(ctx)
+}
+
+// MappedPort returns the externally mapped port for a container port.
+//
+// When the container runs in "host" network mode the requested port is
+// returned unchanged. Otherwise the port map is searched for an entry with the
+// same port number (and the same protocol, when port names one) that has at
+// least one host binding; the first binding's host port is returned. An error
+// is returned when the inspection fails or no such entry exists.
+func (c *DockerContainer) MappedPort(ctx context.Context, port nat.Port) (nat.Port, error) {
+	inspect, err := c.inspectContainer(ctx)
+	if err != nil {
+		return "", err
+	}
+	if inspect.ContainerJSONBase.HostConfig.NetworkMode == "host" {
+		return port, nil
+	}
+
+	// Read the port map from the inspection already in hand: this keeps the
+	// network mode and the ports on one snapshot and avoids a second inspect
+	// call each time a caller polls this method.
+	for k, p := range inspect.NetworkSettings.Ports {
+		if k.Port() != port.Port() {
+			continue
+		}
+		if port.Proto() != "" && k.Proto() != port.Proto() {
+			continue
+		}
+		if len(p) == 0 {
+			// port is declared but not (yet) bound on the host
+			continue
+		}
+		return nat.NewPort(k.Proto(), p[0].HostPort)
+	}
+
+	return "", errors.New("port not found")
+}
+
+// Ports returns the port map reported in the container's network settings.
+func (c *DockerContainer) Ports(ctx context.Context) (nat.PortMap, error) {
+	details, err := c.inspectContainer(ctx)
+	if err != nil {
+		return nil, err
+	}
+	portMap := details.NetworkSettings.Ports
+	return portMap, nil
+}
+
+// inspectContainer asks the Docker client for the inspection of this
+// container and returns a pointer to it.
+func (c *DockerContainer) inspectContainer(ctx context.Context) (*types.ContainerJSON, error) {
+	details, err := c.provider.client.ContainerInspect(ctx, c.ID)
+	if err == nil {
+		return &details, nil
+	}
+	return nil, err
+}
+
+// Logs requests both STDOUT and STDERR of the container from the Docker client
+// and hands the resulting ReadCloser to the caller, who decides what to
+// extract from it.
+func (c *DockerContainer) Logs(ctx context.Context) (io.ReadCloser, error) {
+	return c.provider.client.ContainerLogs(ctx, c.ID, types.ContainerLogsOptions{
+		ShowStdout: true,
+		ShowStderr: true,
+	})
+}
+
+// FollowOutput registers a LogConsumer on the container by appending it to
+// the list of consumers.
+func (c *DockerContainer) FollowOutput(consumer LogConsumer) {
+	// append allocates the slice on first use, so no separate nil case is needed
+	c.consumers = append(c.consumers, consumer)
+}
+
+// Name returns the container name as reported by the container inspection.
+func (c *DockerContainer) Name(ctx context.Context) (string, error) {
+	details, err := c.inspectContainer(ctx)
+	if err != nil {
+		return "", err
+	}
+	containerName := details.Name
+	return containerName, nil
+}
+
+// Networks returns the names of the networks the container is attached to.
+// The result is always a non-nil slice, also on error.
+func (c *DockerContainer) Networks(ctx context.Context) ([]string, error) {
+	details, err := c.inspectContainer(ctx)
+	if err != nil {
+		return []string{}, err
+	}
+
+	attached := details.NetworkSettings.Networks
+	names := make([]string, 0, len(attached))
+	for name := range attached {
+		names = append(names, name)
+	}
+	return names, nil
+}
+
+// ContainerIP returns the IPAddress field of the container's network settings.
+func (c *DockerContainer) ContainerIP(ctx context.Context) (string, error) {
+	details, err := c.inspectContainer(ctx)
+	if err != nil {
+		return "", err
+	}
+	address := details.NetworkSettings.IPAddress
+	return address, nil
+}
+
+// NetworkAliases returns, per attached network name, the aliases of the
+// container on that network. The result is always a non-nil map, also on error.
+func (c *DockerContainer) NetworkAliases(ctx context.Context) (map[string][]string, error) {
+	details, err := c.inspectContainer(ctx)
+	if err != nil {
+		return map[string][]string{}, err
+	}
+
+	attached := details.NetworkSettings.Networks
+	aliases := make(map[string][]string, len(attached))
+	for name, endpoint := range attached {
+		aliases[name] = endpoint.Aliases
+	}
+	return aliases, nil
+}
+
+func (c *DockerContainer) Exec(ctx context.Context, cmd []string) (int, error) {
+ cli := c.provider.client
+ response, err := cli.ContainerExecCreate(ctx, c.ID, types.ExecConfig{
+ Cmd: cmd,
+ Detach: false,
+ })
+ if err != nil {
+ return 0, err
+ }
+
+ err = cli.ContainerExecStart(ctx, response.ID, types.ExecStartCheck{
+ Detach: false,
+ })
+ if err != nil {
+ return 0, err
+ }
+
+ var exitCode int
+ for {
+ execResp, err := cli.ContainerExecInspect(ctx, response.ID)
+ if err != nil {
+ return 0, err
+ }
+
+ if !execResp.Running {
+ exitCode = execResp.ExitCode
+ break
+ }
+
+ time.Sleep(100 * time.Millisecond)
+ }
+
+ return exitCode, nil
+}
+
+// DockerNetwork represents a network started using Docker.
+// It is a plain data holder; no code in this file constructs or reads it.
+type DockerNetwork struct {
+ ID string // Network ID from Docker
+ Driver string
+ Name string
+}
+
+// DockerProvider wraps a Docker client and memoizes two lookups made through it:
+// the daemon host (see daemonHost) and the default network name (see GetGatewayIP).
+type DockerProvider struct {
+ client *client.Client
+ // hostCache holds the host resolved by daemonHost; empty until the first successful lookup
+ hostCache string
+ defaultNetwork string // default container network
+}
+
+// daemonHost returns the host or ip of the Docker daemon where ports are exposed.
+// Warning: this is based on your Docker host setting. Will fail if using an SSH tunnel.
+// You can use the "TC_HOST" env variable to set this yourself.
+//
+// Resolution order: the cached value, the TC_HOST environment variable, then
+// the scheme of the client's daemon URL. The result is cached on the provider.
+func (p *DockerProvider) daemonHost(ctx context.Context) (string, error) {
+	if p.hostCache != "" {
+		return p.hostCache, nil
+	}
+
+	if override, ok := os.LookupEnv("TC_HOST"); ok {
+		p.hostCache = override
+		return p.hostCache, nil
+	}
+
+	// infer from Docker host
+	daemonURL, err := url.Parse(p.client.DaemonHost())
+	if err != nil {
+		return "", err
+	}
+
+	switch daemonURL.Scheme {
+	case "http", "https", "tcp":
+		p.hostCache = daemonURL.Hostname()
+	case "unix", "npipe":
+		resolved := localhost
+		if inAContainer() {
+			// prefer the gateway of the default network, then the default
+			// route gateway; keep localhost when both lookups fail
+			gateway, gwErr := p.GetGatewayIP(ctx)
+			if gwErr != nil {
+				gateway, gwErr = getDefaultGatewayIP()
+			}
+			if gwErr == nil {
+				resolved = gateway
+			}
+		}
+		p.hostCache = resolved
+	default:
+		return "", errors.New("could not determine host through env or docker host")
+	}
+
+	return p.hostCache, nil
+}
+
+// GetNetwork inspects (verbosely) the network named req.Name and returns its
+// description. Only the Name field of req is consulted.
+func (p *DockerProvider) GetNetwork(ctx context.Context, req NetworkRequest) (types.NetworkResource, error) {
+	inspectOpts := types.NetworkInspectOptions{Verbose: true}
+	found, err := p.client.NetworkInspect(ctx, req.Name, inspectOpts)
+	if err != nil {
+		return types.NetworkResource{}, err
+	}
+	return found, nil
+}
+
+func (p *DockerProvider) GetGatewayIP(ctx context.Context) (string, error) {
+ // Use a default network as defined in the DockerProvider
+ var err error
+ if p.defaultNetwork == "" {
+ p.defaultNetwork, err = getDefaultNetwork(ctx, p.client)
+ if err != nil {
+ return "", err
+ }
+ }
+ nw, err := p.GetNetwork(ctx, NetworkRequest{Name: p.defaultNetwork})
+ if err != nil {
+ return "", err
+ }
+
+ var ip string
+ for _, config := range nw.IPAM.Config {
+ if config.Gateway != "" {
+ ip = config.Gateway
+ break
+ }
+ }
+ if ip == "" {
+ return "", errors.New("failed to get gateway IP from network settings")
+ }
+
+ return ip, nil
+}
+
+// inAContainer reports whether os.Stat succeeds on /.dockerenv.
+func inAContainer() bool {
+	_, statErr := os.Stat("/.dockerenv")
+	return statErr == nil
+}
+
+// getDefaultGatewayIP extracts the default gateway from the output of
+// `ip route` run through sh. It fails when the command fails or prints nothing.
+//
+// Deprecated: kept as the fallback daemonHost uses when GetGatewayIP fails.
+func getDefaultGatewayIP() (string, error) {
+	out, err := exec.Command("sh", "-c", "ip route|awk '/default/ { print $3 }'").Output()
+	if err != nil {
+		return "", errors.New("failed to detect docker host")
+	}
+	if gateway := strings.TrimSpace(string(out)); gateway != "" {
+		return gateway, nil
+	}
+	return "", errors.New("failed to parse default gateway IP")
+}
+
+// getDefaultNetwork picks the network name to use as default: Bridge when a
+// network with that name is listed, otherwise ReaperDefault, which is created
+// as an attachable bridge-driver network (labelled with TestcontainerLabel)
+// when it is not listed yet.
+func getDefaultNetwork(ctx context.Context, cli *client.Client) (string, error) {
+	available, err := cli.NetworkList(ctx, types.NetworkListOptions{})
+	if err != nil {
+		return "", err
+	}
+
+	hasReaper := false
+	for i := range available {
+		switch available[i].Name {
+		case Bridge:
+			// the bridge network wins as soon as it is seen
+			return Bridge, nil
+		case ReaperDefault:
+			hasReaper = true
+		}
+	}
+	if hasReaper {
+		return ReaperDefault, nil
+	}
+
+	// Create a bridge network for the container communications
+	createOpts := types.NetworkCreate{
+		Driver:     Bridge,
+		Attachable: true,
+		Labels:     map[string]string{TestcontainerLabel: "true"},
+	}
+	if _, err = cli.NetworkCreate(ctx, ReaperDefault, createOpts); err != nil {
+		return "", err
+	}
+	return ReaperDefault, nil
+}
+
+// WaitPort blocks until waitPort of target is reachable or timeout elapses.
+//
+// It performs two checks in sequence, both bounded by a context derived from
+// ctx with the given timeout:
+//  1. external: dial the mapped port on the target's host, retrying while the
+//     connection is refused;
+//  2. internal: run the command from buildInternalCheckCommand inside the
+//     target via /bin/sh until it exits with code 0.
+//
+// It returns nil once both checks pass. It returns an error when the host or
+// the mapped port cannot be resolved, when dialing fails for a reason other
+// than a refused connection, when Exec fails, when the shell command is not
+// executable (exit code 126), or when the context is done.
+func WaitPort(ctx context.Context, target wait.StrategyTarget, waitPort nat.Port, timeout time.Duration) (err error) {
+	// limit context to the given timeout
+	ctx, cancelContext := context.WithTimeout(ctx, timeout)
+	defer cancelContext()
+
+	ipAddress, err := target.Host(ctx)
+	if err != nil {
+		return err
+	}
+
+	const waitInterval = 100 * time.Millisecond
+
+	port, err := findMappedPort(ctx, target, waitPort)
+	if err != nil {
+		// without a mapped port the dial below would target port 0
+		return err
+	}
+
+	// external check
+	dialer := net.Dialer{}
+	address := net.JoinHostPort(ipAddress, strconv.Itoa(port.Int()))
+	for {
+		conn, dialErr := dialer.DialContext(ctx, port.Proto(), address)
+		if dialErr == nil {
+			_ = conn.Close() // the connection was only a probe
+			break
+		}
+
+		// only a refused connection is worth retrying; anything else is final
+		var sysErr *os.SyscallError
+		if !errors.As(dialErr, &sysErr) || !isConnRefusedErr(sysErr.Err) {
+			return dialErr
+		}
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-time.After(waitInterval):
+		}
+	}
+
+	// internal check
+	command := buildInternalCheckCommand(waitPort.Int())
+	for {
+		if ctx.Err() != nil {
+			return ctx.Err()
+		}
+		exitCode, execErr := target.Exec(ctx, []string{"/bin/sh", "-c", command})
+		if execErr != nil {
+			return execErr
+		}
+
+		switch exitCode {
+		case 0:
+			return nil
+		case 126:
+			return errors.New("/bin/sh command not executable")
+		}
+
+		// pause before probing again instead of re-running the exec back to back
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-time.After(waitInterval):
+		}
+	}
+}
+
+// findMappedPort asks target for the mapped port of waitPort and, while the
+// answer is empty, asks again every 100ms until ctx is done. Failed retries
+// are printed to stdout together with the attempt number. When ctx is done
+// first, the context error is returned wrapping the last lookup error.
+func findMappedPort(ctx context.Context, target wait.StrategyTarget, waitPort nat.Port) (nat.Port, error) {
+	const retryEvery = 100 * time.Millisecond
+
+	mapped, err := target.MappedPort(ctx, waitPort)
+	for attempt := 1; mapped == ""; attempt++ {
+		select {
+		case <-ctx.Done():
+			return "", fmt.Errorf("%s:%w", ctx.Err(), err)
+		case <-time.After(retryEvery):
+			mapped, err = target.MappedPort(ctx, waitPort)
+			if err != nil {
+				fmt.Printf("(%d) [%s] %s\n", attempt, mapped, err)
+			}
+		}
+	}
+	return mapped, err
+}
+
+// isConnRefusedErr reports whether err is, or wraps, syscall.ECONNREFUSED.
+// errors.Is is used instead of == so that the errno is still recognised when
+// it arrives wrapped (for example inside *os.SyscallError or *net.OpError).
+// A nil err yields false.
+func isConnRefusedErr(err error) bool {
+	return errors.Is(err, syscall.ECONNREFUSED)
+}
+
+// buildInternalCheckCommand returns the shell command WaitPort runs inside the
+// container to check internalPort. The command is "true && " followed by a
+// subshell that chains three alternatives with ||, so it succeeds as soon as
+// one of them does.
+func buildInternalCheckCommand(internalPort int) string {
+ // alternatives, in order: grep the second column of /proc/net/tcp* for the
+ // port as 4-digit hex, probe localhost with nc, open /dev/tcp/localhost/<port>
+ command := `(
+ cat /proc/net/tcp* | awk '{print $2}' | grep -i :%04x ||
+ nc -vz -w 1 localhost %d ||
+ /bin/sh -c '</dev/tcp/localhost/%d'
+ )
+ `
+ return "true && " + fmt.Sprintf(command, internalPort, internalPort, internalPort)
+}