You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2023/08/22 17:22:44 UTC
[beam] branch master updated: [prism] support single external env pipelines. (#28083)
This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 7a421c4356a [prism] support single external env pipelines. (#28083)
7a421c4356a is described below
commit 7a421c4356a4326461f78231630575666598a8ff
Author: Robert Burke <lo...@users.noreply.github.com>
AuthorDate: Tue Aug 22 10:22:36 2023 -0700
[prism] support single external env pipelines. (#28083)
* [prism] support single external env pipelines.
* provide clear error message
---------
Co-authored-by: lostluck <13...@users.noreply.github.com>
---
sdks/go/pkg/beam/runners/prism/internal/execute.go | 18 ++++++++++++++----
1 file changed, 14 insertions(+), 4 deletions(-)
diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute.go b/sdks/go/pkg/beam/runners/prism/internal/execute.go
index c94a98c1315..f8b6b6f33ab 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/execute.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/execute.go
@@ -47,7 +47,12 @@ func RunPipeline(j *jobservices.Job) {
// environments, and start up docker containers, but
// here, we only want and need the go one, operating
// in loopback mode.
- env := "go"
+ envs := j.Pipeline.GetComponents().GetEnvironments()
+ if len(envs) != 1 {
+ j.Failed(fmt.Errorf("unable to execute multi-environment pipelines;\npipeline has environments: %+v", envs))
+ return
+ }
+ env, _ := getOnlyPair(envs)
wk := worker.New(env) // Cheating by having the worker id match the environment id.
go wk.Serve()
@@ -302,12 +307,17 @@ func getWindowValueCoders(comps *pipepb.Components, col *pipepb.PCollection, cod
return makeWindowCoders(coders[wcID])
}
-func getOnlyValue[K comparable, V any](in map[K]V) V {
+func getOnlyPair[K comparable, V any](in map[K]V) (K, V) {
if len(in) != 1 {
panic(fmt.Sprintf("expected single value map, had %v - %v", len(in), in))
}
- for _, v := range in {
- return v
+ for k, v := range in {
+ return k, v
}
panic("unreachable")
}
+
+func getOnlyValue[K comparable, V any](in map[K]V) V {
+ _, v := getOnlyPair(in)
+ return v
+}