You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2022/02/14 21:58:32 UTC

[beam] branch master updated: [BEAM-13922] [Coverage] Make boot.go more testable and add tests (#16833)

This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 934b141  [BEAM-13922] [Coverage] Make boot.go more testable and add tests (#16833)
934b141 is described below

commit 934b1416c633be6385ff85c8228d3fc85de04a33
Author: Danny McCormick <da...@google.com>
AuthorDate: Mon Feb 14 16:56:29 2022 -0500

    [BEAM-13922] [Coverage] Make boot.go more testable and add tests (#16833)
---
 sdks/go/container/boot.go      | 115 +++++++++++++----------
 sdks/go/container/boot_test.go | 205 +++++++++++++++++++++++++++++++++++++++++
 2 files changed, 270 insertions(+), 50 deletions(-)

diff --git a/sdks/go/container/boot.go b/sdks/go/container/boot.go
index 49d9165..93311e5 100644
--- a/sdks/go/container/boot.go
+++ b/sdks/go/container/boot.go
@@ -17,7 +17,9 @@ package main
 
 import (
 	"context"
+	"errors"
 	"flag"
+	"fmt"
 	"io"
 	"log"
 	"os"
@@ -25,6 +27,8 @@ import (
 	"strings"
 
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/artifact"
+	fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+	pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/provision"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/util/execx"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx"
@@ -58,25 +62,9 @@ func main() {
 	}
 	log.Printf("Provision info:\n%v", info)
 
-	// TODO(BEAM-8201): Simplify once flags are no longer used.
-	if info.GetLoggingEndpoint().GetUrl() != "" {
-		*loggingEndpoint = info.GetLoggingEndpoint().GetUrl()
-	}
-	if info.GetArtifactEndpoint().GetUrl() != "" {
-		*artifactEndpoint = info.GetArtifactEndpoint().GetUrl()
-	}
-	if info.GetControlEndpoint().GetUrl() != "" {
-		*controlEndpoint = info.GetControlEndpoint().GetUrl()
-	}
-
-	if *loggingEndpoint == "" {
-		log.Fatal("No logging endpoint provided.")
-	}
-	if *artifactEndpoint == "" {
-		log.Fatal("No artifact endpoint provided.")
-	}
-	if *controlEndpoint == "" {
-		log.Fatal("No control endpoint provided.")
+	err = ensureEndpointsSet(info)
+	if err != nil {
+		log.Fatalf("Endpoint not set: %v", err)
 	}
 	log.Printf("Initializing Go harness: %v", strings.Join(os.Args, " "))
 
@@ -99,60 +87,87 @@ func main() {
 		log.Fatalf("Failed to retrieve staged files: %v", err)
 	}
 
-	// TODO(BEAM-13647): Remove legacy hack once aged out.
+	name, err := getGoWorkerArtifactName(artifacts)
+	if err != nil {
+		log.Fatalf("Failed to get Go Worker Artifact Name: %v", err)
+	}
+
+	// (3) The persist dir may be on a noexec volume, so we must
+	// copy the binary to a different location to execute.
+	const prog = "/bin/worker"
+	if err := copyExe(filepath.Join(dir, name), prog); err != nil {
+		log.Fatalf("Failed to copy worker binary: %v", err)
+	}
+
+	args := []string{
+		"--worker=true",
+		"--id=" + *id,
+		"--logging_endpoint=" + *loggingEndpoint,
+		"--control_endpoint=" + *controlEndpoint,
+		"--semi_persist_dir=" + *semiPersistDir,
+		"--options=" + options,
+	}
+	if info.GetStatusEndpoint() != nil {
+		args = append(args, "--status_endpoint="+info.GetStatusEndpoint().GetUrl())
+	}
+
+	log.Fatalf("User program exited: %v", execx.Execute(prog, args...))
+}
+
+func getGoWorkerArtifactName(artifacts []*pipepb.ArtifactInformation) (string, error) {
 	const worker = "worker"
 	name := worker
 
 	switch len(artifacts) {
 	case 0:
-		log.Fatal("No artifacts staged")
+		return "", errors.New(fmt.Sprintf("No artifacts staged"))
 	case 1:
 		name, _ = artifact.MustExtractFilePayload(artifacts[0])
+		return name, nil
 	default:
-		found := false
 		for _, a := range artifacts {
 			if a.GetRoleUrn() == artifact.URNGoWorkerBinaryRole {
 				name, _ = artifact.MustExtractFilePayload(a)
-				found = true
-				break
+				return name, nil
 			}
 		}
 		// TODO(BEAM-13647): Remove legacy hack once aged out.
-		if !found {
-			for _, a := range artifacts {
-				n, _ := artifact.MustExtractFilePayload(a)
-				if n == worker {
-					found = true
-					log.Printf("Go worker binary found with legacy name '%v' found", worker)
-					break
-				}
+		for _, a := range artifacts {
+			n, _ := artifact.MustExtractFilePayload(a)
+			if n == worker {
+				log.Printf("Go worker binary found with legacy name '%v'", worker)
+				return n, nil
 			}
 		}
-		if !found {
-			log.Fatalf("No artifact named '%v' found", worker)
-		}
+		return "", errors.New(fmt.Sprintf("No artifact named '%v' found", worker))
 	}
 
-	// (3) The persist dir may be on a noexec volume, so we must
-	// copy the binary to a different location to execute.
-	const prog = "/bin/worker"
-	if err := copyExe(filepath.Join(dir, name), prog); err != nil {
-		log.Fatalf("Failed to copy worker binary: %v", err)
+	return name, nil
+}
+
+func ensureEndpointsSet(info *fnpb.ProvisionInfo) error {
+	// TODO(BEAM-8201): Simplify once flags are no longer used.
+	if info.GetLoggingEndpoint().GetUrl() != "" {
+		*loggingEndpoint = info.GetLoggingEndpoint().GetUrl()
+	}
+	if info.GetArtifactEndpoint().GetUrl() != "" {
+		*artifactEndpoint = info.GetArtifactEndpoint().GetUrl()
+	}
+	if info.GetControlEndpoint().GetUrl() != "" {
+		*controlEndpoint = info.GetControlEndpoint().GetUrl()
 	}
 
-	args := []string{
-		"--worker=true",
-		"--id=" + *id,
-		"--logging_endpoint=" + *loggingEndpoint,
-		"--control_endpoint=" + *controlEndpoint,
-		"--semi_persist_dir=" + *semiPersistDir,
-		"--options=" + options,
+	if *loggingEndpoint == "" {
+		return errors.New("No logging endpoint provided.")
 	}
-	if info.GetStatusEndpoint() != nil {
-		args = append(args, "--status_endpoint="+info.GetStatusEndpoint().GetUrl())
+	if *artifactEndpoint == "" {
+		return errors.New("No artifact endpoint provided.")
+	}
+	if *controlEndpoint == "" {
+		return errors.New("No control endpoint provided.")
 	}
 
-	log.Fatalf("User program exited: %v", execx.Execute(prog, args...))
+	return nil
 }
 
 func copyExe(from, to string) error {
diff --git a/sdks/go/container/boot_test.go b/sdks/go/container/boot_test.go
new file mode 100644
index 0000000..7eda752
--- /dev/null
+++ b/sdks/go/container/boot_test.go
@@ -0,0 +1,205 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+	"io/ioutil"
+	"os"
+	"path/filepath"
+	"testing"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/artifact"
+	fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+	pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
+	"github.com/golang/protobuf/proto"
+)
+
+func TestEnsureEndpointsSet_AllSet(t *testing.T) {
+	provisionInfo := &fnpb.ProvisionInfo{
+		LoggingEndpoint:  &pipepb.ApiServiceDescriptor{Url: "testLoggingEndpointUrl"},
+		ArtifactEndpoint: &pipepb.ApiServiceDescriptor{Url: "testArtifactEndpointUrl"},
+		ControlEndpoint:  &pipepb.ApiServiceDescriptor{Url: "testControlEndpointUrl"},
+	}
+	*loggingEndpoint = ""
+	*artifactEndpoint = ""
+	*controlEndpoint = ""
+	err := ensureEndpointsSet(provisionInfo)
+	if err != nil {
+		t.Fatalf("ensureEndpointsSet() = %q, want nil", err)
+	}
+	if got, want := *loggingEndpoint, "testLoggingEndpointUrl"; got != want {
+		t.Fatalf("After ensureEndpointsSet(), *loggingEndpoint = %q, want %q", got, want)
+	}
+	if got, want := *artifactEndpoint, "testArtifactEndpointUrl"; got != want {
+		t.Fatalf("After ensureEndpointsSet(), *artifactEndpoint = %q, want %q", got, want)
+	}
+	if got, want := *controlEndpoint, "testControlEndpointUrl"; got != want {
+		t.Fatalf("After ensureEndpointsSet(), *controlEndpoint = %q, want %q", got, want)
+	}
+}
+
+func TestEnsureEndpointsSet_OneMissing(t *testing.T) {
+	provisionInfo := &fnpb.ProvisionInfo{
+		LoggingEndpoint:  &pipepb.ApiServiceDescriptor{Url: "testLoggingEndpointUrl"},
+		ArtifactEndpoint: &pipepb.ApiServiceDescriptor{Url: "testArtifactEndpointUrl"},
+		ControlEndpoint:  &pipepb.ApiServiceDescriptor{Url: ""},
+	}
+	*loggingEndpoint = ""
+	*artifactEndpoint = ""
+	*controlEndpoint = ""
+	err := ensureEndpointsSet(provisionInfo)
+	if err == nil {
+		t.Fatalf("ensureEndpointsSet() = nil, want non-nil error")
+	}
+	if got, want := *loggingEndpoint, "testLoggingEndpointUrl"; got != want {
+		t.Fatalf("After ensureEndpointsSet(), *loggingEndpoint = %q, want %q", got, want)
+	}
+	if got, want := *artifactEndpoint, "testArtifactEndpointUrl"; got != want {
+		t.Fatalf("After ensureEndpointsSet(), *artifactEndpoint = %q, want %q", got, want)
+	}
+	if got, want := *controlEndpoint, ""; got != want {
+		t.Fatalf("After ensureEndpointsSet(), *controlEndpoint = %q, want %q", got, want)
+	}
+}
+
+func TestGetGoWorkerArtifactName_NoArtifacts(t *testing.T) {
+	_, err := getGoWorkerArtifactName([]*pipepb.ArtifactInformation{})
+	if err == nil {
+		t.Fatalf("getGoWorkerArtifactName() = nil, want non-nil error")
+	}
+}
+
+func TestGetGoWorkerArtifactName_OneArtifact(t *testing.T) {
+	artifact := constructArtifactInformation(t, artifact.URNGoWorkerBinaryRole, "test/path", "sha")
+	artifacts := []*pipepb.ArtifactInformation{&artifact}
+
+	val, err := getGoWorkerArtifactName(artifacts)
+	if err != nil {
+		t.Fatalf("getGoWorkerArtifactName() = %v, want nil", err)
+	}
+	if got, want := val, "test/path"; got != want {
+		t.Fatalf("getGoWorkerArtifactName() = %v, want %v", got, want)
+	}
+}
+
+func TestGetGoWorkerArtifactName_MultipleArtifactsFirstIsWorker(t *testing.T) {
+	artifact1 := constructArtifactInformation(t, artifact.URNGoWorkerBinaryRole, "test/path", "sha")
+	artifact2 := constructArtifactInformation(t, "other role", "test/path2", "sha")
+	artifacts := []*pipepb.ArtifactInformation{&artifact1, &artifact2}
+
+	val, err := getGoWorkerArtifactName(artifacts)
+	if err != nil {
+		t.Fatalf("getGoWorkerArtifactName() = %v, want nil", err)
+	}
+	if got, want := val, "test/path"; got != want {
+		t.Fatalf("getGoWorkerArtifactName() = %v, want %v", got, want)
+	}
+}
+
+func TestGetGoWorkerArtifactName_MultipleArtifactsSecondIsWorker(t *testing.T) {
+	artifact1 := constructArtifactInformation(t, "other role", "test/path", "sha")
+	artifact2 := constructArtifactInformation(t, artifact.URNGoWorkerBinaryRole, "test/path2", "sha")
+	artifacts := []*pipepb.ArtifactInformation{&artifact1, &artifact2}
+
+	val, err := getGoWorkerArtifactName(artifacts)
+	if err != nil {
+		t.Fatalf("getGoWorkerArtifactName() = %v, want nil", err)
+	}
+	if got, want := val, "test/path2"; got != want {
+		t.Fatalf("getGoWorkerArtifactName() = %v, want %v", got, want)
+	}
+}
+
+func TestGetGoWorkerArtifactName_MultipleArtifactsLegacyWay(t *testing.T) {
+	artifact1 := constructArtifactInformation(t, "other role", "test/path", "sha")
+	artifact2 := constructArtifactInformation(t, "other role", "worker", "sha")
+	artifacts := []*pipepb.ArtifactInformation{&artifact1, &artifact2}
+
+	val, err := getGoWorkerArtifactName(artifacts)
+	if err != nil {
+		t.Fatalf("getGoWorkerArtifactName() = %v, want nil", err)
+	}
+	if got, want := val, "worker"; got != want {
+		t.Fatalf("getGoWorkerArtifactName() = %v, want %v", got, want)
+	}
+}
+
+func TestGetGoWorkerArtifactName_MultipleArtifactsNoneMatch(t *testing.T) {
+	artifact1 := constructArtifactInformation(t, "other role", "test/path", "sha")
+	artifact2 := constructArtifactInformation(t, "other role", "test/path2", "sha")
+	artifacts := []*pipepb.ArtifactInformation{&artifact1, &artifact2}
+
+	_, err := getGoWorkerArtifactName(artifacts)
+	if err == nil {
+		t.Fatalf("getGoWorkerArtifactName() = nil, want non-nil error")
+	}
+}
+
+func TestCopyExe(t *testing.T) {
+	testExeContent := []byte("testContent")
+
+	// Make temp directory to cleanup at the end
+	d, err := ioutil.TempDir(os.Getenv("TEST_TMPDIR"), "copyExe-*")
+	if err != nil {
+		t.Fatalf("failed to make temp directory, got %v", err)
+	}
+	t.Cleanup(func() { os.RemoveAll(d) })
+
+	// Make our source file and write to it
+	src, err := ioutil.TempFile(d, "src.exe")
+	if err != nil {
+		t.Fatalf("failed to make temp file, got %v", err)
+	}
+	if _, err := src.Write(testExeContent); err != nil {
+		t.Fatalf("failed to write to temp file, got %v", err)
+	}
+	if err := src.Close(); err != nil {
+		t.Fatalf("failed to close temp file, got %v", err)
+	}
+
+	// Make sure our destination path doesn't exist already
+	srcPath, destPath := src.Name(), filepath.Join(d, "dest.exe")
+	if _, err := os.Stat(destPath); err == nil {
+		t.Fatalf("dest file %v already exists", destPath)
+	}
+
+	err = copyExe(srcPath, destPath)
+	if err != nil {
+		t.Fatalf("copyExe() = %v, want nil", err)
+	}
+	if _, err := os.Stat(destPath); err != nil {
+		t.Fatalf("After running copyExe, os.Stat() = %v, want nil", err)
+	}
+	destContents, err := ioutil.ReadFile(destPath)
+	if err != nil {
+		t.Fatalf("After running copyExe, ioutil.ReadFile() = %v, want nil", err)
+	}
+	if got, want := string(destContents), string(testExeContent); got != want {
+		t.Fatalf("After running copyExe, ioutil.ReadFile() = %v, want %v", got, want)
+	}
+}
+
+func constructArtifactInformation(t *testing.T, roleUrn string, path string, sha string) pipepb.ArtifactInformation {
+	t.Helper()
+
+	typePayload, _ := proto.Marshal(&pipepb.ArtifactFilePayload{Path: path, Sha256: sha})
+
+	return pipepb.ArtifactInformation{
+		RoleUrn:     roleUrn,
+		TypeUrn:     artifact.URNFileArtifact,
+		TypePayload: typePayload,
+	}
+}