You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@beam.apache.org by lo...@apache.org on 2022/02/14 21:58:32 UTC
[beam] branch master updated: [BEAM-13922] [Coverage] Make boot.go more testable and add tests (#16833)
This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 934b141 [BEAM-13922] [Coverage] Make boot.go more testable and add tests (#16833)
934b141 is described below
commit 934b1416c633be6385ff85c8228d3fc85de04a33
Author: Danny McCormick <da...@google.com>
AuthorDate: Mon Feb 14 16:56:29 2022 -0500
[BEAM-13922] [Coverage] Make boot.go more testable and add tests (#16833)
---
sdks/go/container/boot.go | 115 +++++++++++++----------
sdks/go/container/boot_test.go | 205 +++++++++++++++++++++++++++++++++++++++++
2 files changed, 270 insertions(+), 50 deletions(-)
diff --git a/sdks/go/container/boot.go b/sdks/go/container/boot.go
index 49d9165..93311e5 100644
--- a/sdks/go/container/boot.go
+++ b/sdks/go/container/boot.go
@@ -17,7 +17,9 @@ package main
import (
"context"
+ "errors"
"flag"
+ "fmt"
"io"
"log"
"os"
@@ -25,6 +27,8 @@ import (
"strings"
"github.com/apache/beam/sdks/v2/go/pkg/beam/artifact"
+ fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+ pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
"github.com/apache/beam/sdks/v2/go/pkg/beam/provision"
"github.com/apache/beam/sdks/v2/go/pkg/beam/util/execx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx"
@@ -58,25 +62,9 @@ func main() {
}
log.Printf("Provision info:\n%v", info)
- // TODO(BEAM-8201): Simplify once flags are no longer used.
- if info.GetLoggingEndpoint().GetUrl() != "" {
- *loggingEndpoint = info.GetLoggingEndpoint().GetUrl()
- }
- if info.GetArtifactEndpoint().GetUrl() != "" {
- *artifactEndpoint = info.GetArtifactEndpoint().GetUrl()
- }
- if info.GetControlEndpoint().GetUrl() != "" {
- *controlEndpoint = info.GetControlEndpoint().GetUrl()
- }
-
- if *loggingEndpoint == "" {
- log.Fatal("No logging endpoint provided.")
- }
- if *artifactEndpoint == "" {
- log.Fatal("No artifact endpoint provided.")
- }
- if *controlEndpoint == "" {
- log.Fatal("No control endpoint provided.")
+ err = ensureEndpointsSet(info)
+ if err != nil {
+ log.Fatalf("Endpoint not set: %v", err)
}
log.Printf("Initializing Go harness: %v", strings.Join(os.Args, " "))
@@ -99,60 +87,87 @@ func main() {
log.Fatalf("Failed to retrieve staged files: %v", err)
}
- // TODO(BEAM-13647): Remove legacy hack once aged out.
+ name, err := getGoWorkerArtifactName(artifacts)
+ if err != nil {
+ log.Fatalf("Failed to get Go Worker Artifact Name: %v", err)
+ }
+
+ // (3) The persist dir may be on a noexec volume, so we must
+ // copy the binary to a different location to execute.
+ const prog = "/bin/worker"
+ if err := copyExe(filepath.Join(dir, name), prog); err != nil {
+ log.Fatalf("Failed to copy worker binary: %v", err)
+ }
+
+ args := []string{
+ "--worker=true",
+ "--id=" + *id,
+ "--logging_endpoint=" + *loggingEndpoint,
+ "--control_endpoint=" + *controlEndpoint,
+ "--semi_persist_dir=" + *semiPersistDir,
+ "--options=" + options,
+ }
+ if info.GetStatusEndpoint() != nil {
+ args = append(args, "--status_endpoint="+info.GetStatusEndpoint().GetUrl())
+ }
+
+ log.Fatalf("User program exited: %v", execx.Execute(prog, args...))
+}
+
+func getGoWorkerArtifactName(artifacts []*pipepb.ArtifactInformation) (string, error) {
const worker = "worker"
name := worker
switch len(artifacts) {
case 0:
- log.Fatal("No artifacts staged")
+ return "", errors.New(fmt.Sprintf("No artifacts staged"))
case 1:
name, _ = artifact.MustExtractFilePayload(artifacts[0])
+ return name, nil
default:
- found := false
for _, a := range artifacts {
if a.GetRoleUrn() == artifact.URNGoWorkerBinaryRole {
name, _ = artifact.MustExtractFilePayload(a)
- found = true
- break
+ return name, nil
}
}
// TODO(BEAM-13647): Remove legacy hack once aged out.
- if !found {
- for _, a := range artifacts {
- n, _ := artifact.MustExtractFilePayload(a)
- if n == worker {
- found = true
- log.Printf("Go worker binary found with legacy name '%v' found", worker)
- break
- }
+ for _, a := range artifacts {
+ n, _ := artifact.MustExtractFilePayload(a)
+ if n == worker {
+ log.Printf("Go worker binary found with legacy name '%v'", worker)
+ return n, nil
}
}
- if !found {
- log.Fatalf("No artifact named '%v' found", worker)
- }
+ return "", errors.New(fmt.Sprintf("No artifact named '%v' found", worker))
}
- // (3) The persist dir may be on a noexec volume, so we must
- // copy the binary to a different location to execute.
- const prog = "/bin/worker"
- if err := copyExe(filepath.Join(dir, name), prog); err != nil {
- log.Fatalf("Failed to copy worker binary: %v", err)
+ return name, nil
+}
+
+func ensureEndpointsSet(info *fnpb.ProvisionInfo) error {
+ // TODO(BEAM-8201): Simplify once flags are no longer used.
+ if info.GetLoggingEndpoint().GetUrl() != "" {
+ *loggingEndpoint = info.GetLoggingEndpoint().GetUrl()
+ }
+ if info.GetArtifactEndpoint().GetUrl() != "" {
+ *artifactEndpoint = info.GetArtifactEndpoint().GetUrl()
+ }
+ if info.GetControlEndpoint().GetUrl() != "" {
+ *controlEndpoint = info.GetControlEndpoint().GetUrl()
}
- args := []string{
- "--worker=true",
- "--id=" + *id,
- "--logging_endpoint=" + *loggingEndpoint,
- "--control_endpoint=" + *controlEndpoint,
- "--semi_persist_dir=" + *semiPersistDir,
- "--options=" + options,
+ if *loggingEndpoint == "" {
+ return errors.New("No logging endpoint provided.")
}
- if info.GetStatusEndpoint() != nil {
- args = append(args, "--status_endpoint="+info.GetStatusEndpoint().GetUrl())
+ if *artifactEndpoint == "" {
+ return errors.New("No artifact endpoint provided.")
+ }
+ if *controlEndpoint == "" {
+ return errors.New("No control endpoint provided.")
}
- log.Fatalf("User program exited: %v", execx.Execute(prog, args...))
+ return nil
}
func copyExe(from, to string) error {
diff --git a/sdks/go/container/boot_test.go b/sdks/go/container/boot_test.go
new file mode 100644
index 0000000..7eda752
--- /dev/null
+++ b/sdks/go/container/boot_test.go
@@ -0,0 +1,205 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+ "io/ioutil"
+ "os"
+ "path/filepath"
+ "testing"
+
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/artifact"
+ fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
+ pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
+ "github.com/golang/protobuf/proto"
+)
+
+func TestEnsureEndpointsSet_AllSet(t *testing.T) {
+ provisionInfo := &fnpb.ProvisionInfo{
+ LoggingEndpoint: &pipepb.ApiServiceDescriptor{Url: "testLoggingEndpointUrl"},
+ ArtifactEndpoint: &pipepb.ApiServiceDescriptor{Url: "testArtifactEndpointUrl"},
+ ControlEndpoint: &pipepb.ApiServiceDescriptor{Url: "testControlEndpointUrl"},
+ }
+ *loggingEndpoint = ""
+ *artifactEndpoint = ""
+ *controlEndpoint = ""
+ err := ensureEndpointsSet(provisionInfo)
+ if err != nil {
+ t.Fatalf("ensureEndpointsSet() = %q, want nil", err)
+ }
+ if got, want := *loggingEndpoint, "testLoggingEndpointUrl"; got != want {
+ t.Fatalf("After ensureEndpointsSet(), *loggingEndpoint = %q, want %q", got, want)
+ }
+ if got, want := *artifactEndpoint, "testArtifactEndpointUrl"; got != want {
+ t.Fatalf("After ensureEndpointsSet(), *artifactEndpoint = %q, want %q", got, want)
+ }
+ if got, want := *controlEndpoint, "testControlEndpointUrl"; got != want {
+ t.Fatalf("After ensureEndpointsSet(), *controlEndpoint = %q, want %q", got, want)
+ }
+}
+
+func TestEnsureEndpointsSet_OneMissing(t *testing.T) {
+ provisionInfo := &fnpb.ProvisionInfo{
+ LoggingEndpoint: &pipepb.ApiServiceDescriptor{Url: "testLoggingEndpointUrl"},
+ ArtifactEndpoint: &pipepb.ApiServiceDescriptor{Url: "testArtifactEndpointUrl"},
+ ControlEndpoint: &pipepb.ApiServiceDescriptor{Url: ""},
+ }
+ *loggingEndpoint = ""
+ *artifactEndpoint = ""
+ *controlEndpoint = ""
+ err := ensureEndpointsSet(provisionInfo)
+ if err == nil {
+ t.Fatalf("ensureEndpointsSet() = nil, want non-nil error")
+ }
+ if got, want := *loggingEndpoint, "testLoggingEndpointUrl"; got != want {
+ t.Fatalf("After ensureEndpointsSet(), *loggingEndpoint = %q, want %q", got, want)
+ }
+ if got, want := *artifactEndpoint, "testArtifactEndpointUrl"; got != want {
+ t.Fatalf("After ensureEndpointsSet(), *artifactEndpoint = %q, want %q", got, want)
+ }
+ if got, want := *controlEndpoint, ""; got != want {
+ t.Fatalf("After ensureEndpointsSet(), *controlEndpoint = %q, want %q", got, want)
+ }
+}
+
+func TestGetGoWorkerArtifactName_NoArtifacts(t *testing.T) {
+ _, err := getGoWorkerArtifactName([]*pipepb.ArtifactInformation{})
+ if err == nil {
+ t.Fatalf("getGoWorkerArtifactName() = nil, want non-nil error")
+ }
+}
+
+func TestGetGoWorkerArtifactName_OneArtifact(t *testing.T) {
+ artifact := constructArtifactInformation(t, artifact.URNGoWorkerBinaryRole, "test/path", "sha")
+ artifacts := []*pipepb.ArtifactInformation{&artifact}
+
+ val, err := getGoWorkerArtifactName(artifacts)
+ if err != nil {
+ t.Fatalf("getGoWorkerArtifactName() = %v, want nil", err)
+ }
+ if got, want := val, "test/path"; got != want {
+ t.Fatalf("getGoWorkerArtifactName() = %v, want %v", got, want)
+ }
+}
+
+func TestGetGoWorkerArtifactName_MultipleArtifactsFirstIsWorker(t *testing.T) {
+ artifact1 := constructArtifactInformation(t, artifact.URNGoWorkerBinaryRole, "test/path", "sha")
+ artifact2 := constructArtifactInformation(t, "other role", "test/path2", "sha")
+ artifacts := []*pipepb.ArtifactInformation{&artifact1, &artifact2}
+
+ val, err := getGoWorkerArtifactName(artifacts)
+ if err != nil {
+ t.Fatalf("getGoWorkerArtifactName() = %v, want nil", err)
+ }
+ if got, want := val, "test/path"; got != want {
+ t.Fatalf("getGoWorkerArtifactName() = %v, want %v", got, want)
+ }
+}
+
+func TestGetGoWorkerArtifactName_MultipleArtifactsSecondIsWorker(t *testing.T) {
+ artifact1 := constructArtifactInformation(t, "other role", "test/path", "sha")
+ artifact2 := constructArtifactInformation(t, artifact.URNGoWorkerBinaryRole, "test/path2", "sha")
+ artifacts := []*pipepb.ArtifactInformation{&artifact1, &artifact2}
+
+ val, err := getGoWorkerArtifactName(artifacts)
+ if err != nil {
+ t.Fatalf("getGoWorkerArtifactName() = %v, want nil", err)
+ }
+ if got, want := val, "test/path2"; got != want {
+ t.Fatalf("getGoWorkerArtifactName() = %v, want %v", got, want)
+ }
+}
+
+func TestGetGoWorkerArtifactName_MultipleArtifactsLegacyWay(t *testing.T) {
+ artifact1 := constructArtifactInformation(t, "other role", "test/path", "sha")
+ artifact2 := constructArtifactInformation(t, "other role", "worker", "sha")
+ artifacts := []*pipepb.ArtifactInformation{&artifact1, &artifact2}
+
+ val, err := getGoWorkerArtifactName(artifacts)
+ if err != nil {
+ t.Fatalf("getGoWorkerArtifactName() = %v, want nil", err)
+ }
+ if got, want := val, "worker"; got != want {
+ t.Fatalf("getGoWorkerArtifactName() = %v, want %v", got, want)
+ }
+}
+
+func TestGetGoWorkerArtifactName_MultipleArtifactsNoneMatch(t *testing.T) {
+ artifact1 := constructArtifactInformation(t, "other role", "test/path", "sha")
+ artifact2 := constructArtifactInformation(t, "other role", "test/path2", "sha")
+ artifacts := []*pipepb.ArtifactInformation{&artifact1, &artifact2}
+
+ _, err := getGoWorkerArtifactName(artifacts)
+ if err == nil {
+ t.Fatalf("getGoWorkerArtifactName() = nil, want non-nil error")
+ }
+}
+
+func TestCopyExe(t *testing.T) {
+ testExeContent := []byte("testContent")
+
+ // Make temp directory to cleanup at the end
+ d, err := ioutil.TempDir(os.Getenv("TEST_TMPDIR"), "copyExe-*")
+ if err != nil {
+ t.Fatalf("failed to make temp directory, got %v", err)
+ }
+ t.Cleanup(func() { os.RemoveAll(d) })
+
+ // Make our source file and write to it
+ src, err := ioutil.TempFile(d, "src.exe")
+ if err != nil {
+ t.Fatalf("failed to make temp file, got %v", err)
+ }
+ if _, err := src.Write(testExeContent); err != nil {
+ t.Fatalf("failed to write to temp file, got %v", err)
+ }
+ if err := src.Close(); err != nil {
+ t.Fatalf("failed to close temp file, got %v", err)
+ }
+
+ // Make sure our destination path doesn't exist already
+ srcPath, destPath := src.Name(), filepath.Join(d, "dest.exe")
+ if _, err := os.Stat(destPath); err == nil {
+ t.Fatalf("dest file %v already exists", destPath)
+ }
+
+ err = copyExe(srcPath, destPath)
+ if err != nil {
+ t.Fatalf("copyExe() = %v, want nil", err)
+ }
+ if _, err := os.Stat(destPath); err != nil {
+ t.Fatalf("After running copyExe, os.Stat() = %v, want nil", err)
+ }
+ destContents, err := ioutil.ReadFile(destPath)
+ if err != nil {
+ t.Fatalf("After running copyExe, ioutil.ReadFile() = %v, want nil", err)
+ }
+ if got, want := string(destContents), string(testExeContent); got != want {
+ t.Fatalf("After running copyExe, ioutil.ReadFile() = %v, want %v", got, want)
+ }
+}
+
+func constructArtifactInformation(t *testing.T, roleUrn string, path string, sha string) pipepb.ArtifactInformation {
+ t.Helper()
+
+ typePayload, _ := proto.Marshal(&pipepb.ArtifactFilePayload{Path: path, Sha256: sha})
+
+ return pipepb.ArtifactInformation{
+ RoleUrn: roleUrn,
+ TypeUrn: artifact.URNFileArtifact,
+ TypePayload: typePayload,
+ }
+}