You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2023/08/26 00:01:11 UTC

[beam] branch master updated: connect err (#28170)

This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new e26735d69f0 connect err (#28170)
e26735d69f0 is described below

commit e26735d69f0935c6f7ac14dd0fb0e58ff390392a
Author: Robert Burke <lo...@users.noreply.github.com>
AuthorDate: Fri Aug 25 17:01:03 2023 -0700

    connect err (#28170)
    
    Co-authored-by: lostluck <13...@users.noreply.github.com>
---
 sdks/go/pkg/beam/runners/prism/internal/execute.go | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute.go b/sdks/go/pkg/beam/runners/prism/internal/execute.go
index e9c898699c7..49676710343 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/execute.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/execute.go
@@ -263,7 +263,9 @@ func executePipeline(ctx context.Context, wk *worker.W, j *jobservices.Job) erro
 			wk.Descriptors[stage.ID] = stage.desc
 		case wk.ID:
 			// Great! this is for this environment. // Broken abstraction.
-			buildDescriptor(stage, comps, wk)
+			if err := buildDescriptor(stage, comps, wk); err != nil {
+				return fmt.Errorf("prism error building stage %v: \n%w", stage.ID, err)
+			}
 			stages[stage.ID] = stage
 			slog.Debug("pipelineBuild", slog.Group("stage", slog.String("ID", stage.ID), slog.String("transformName", t.GetUniqueName())))
 			outputs := maps.Keys(stage.OutputsToCoders)