You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2023/08/22 17:22:44 UTC

[beam] branch master updated: [prism] support single external env pipelines. (#28083)

This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a421c4356a [prism] support single external env pipelines. (#28083)
7a421c4356a is described below

commit 7a421c4356a4326461f78231630575666598a8ff
Author: Robert Burke <lo...@users.noreply.github.com>
AuthorDate: Tue Aug 22 10:22:36 2023 -0700

    [prism] support single external env pipelines. (#28083)
    
    * [prism] support single external env pipelines.
    
    * provide clear error message
    
    ---------
    
    Co-authored-by: lostluck <13...@users.noreply.github.com>
---
 sdks/go/pkg/beam/runners/prism/internal/execute.go | 18 ++++++++++++++----
 1 file changed, 14 insertions(+), 4 deletions(-)

diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute.go b/sdks/go/pkg/beam/runners/prism/internal/execute.go
index c94a98c1315..f8b6b6f33ab 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/execute.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/execute.go
@@ -47,7 +47,12 @@ func RunPipeline(j *jobservices.Job) {
 	// environments, and start up docker containers, but
 	// here, we only want and need the go one, operating
 	// in loopback mode.
-	env := "go"
+	envs := j.Pipeline.GetComponents().GetEnvironments()
+	if len(envs) != 1 {
+		j.Failed(fmt.Errorf("unable to execute multi-environment pipelines;\npipeline has environments: %+v", envs))
+		return
+	}
+	env, _ := getOnlyPair(envs)
 	wk := worker.New(env) // Cheating by having the worker id match the environment id.
 	go wk.Serve()
 
@@ -302,12 +307,17 @@ func getWindowValueCoders(comps *pipepb.Components, col *pipepb.PCollection, cod
 	return makeWindowCoders(coders[wcID])
 }
 
-func getOnlyValue[K comparable, V any](in map[K]V) V {
+func getOnlyPair[K comparable, V any](in map[K]V) (K, V) {
 	if len(in) != 1 {
 		panic(fmt.Sprintf("expected single value map, had %v - %v", len(in), in))
 	}
-	for _, v := range in {
-		return v
+	for k, v := range in {
+		return k, v
 	}
 	panic("unreachable")
 }
+
+func getOnlyValue[K comparable, V any](in map[K]V) V {
+	_, v := getOnlyPair(in)
+	return v
+}