You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/02/18 21:26:19 UTC

[GitHub] [beam] damccorm commented on a change in pull request #16903: [BEAM-13912] Add more coverage for dataflow.go

damccorm commented on a change in pull request #16903:
URL: https://github.com/apache/beam/pull/16903#discussion_r810349831



##########
File path: sdks/go/pkg/beam/runners/dataflow/dataflow.go
##########
@@ -159,8 +159,59 @@ func Execute(ctx context.Context, p *beam.Pipeline) (beam.PipelineResult, error)
 		panic("Beam has not been initialized. Call beam.Init() before pipeline construction.")
 	}
 
-	// (1) Gather job options
+	beam.PipelineOptions.LoadOptionsFromFlags(flagFilter)
+	opts, err := getJobOptions(ctx)
+	if err != nil {
+		return nil, err
+	}
+
+	// (1) Build and submit
+	// NOTE(herohde) 10/8/2018: the last segment of the names must be "worker" and "dataflow-worker.jar".
+	id := fmt.Sprintf("go-%v-%v", atomic.AddInt32(&unique, 1), time.Now().UnixNano())
+
+	modelURL := gcsx.Join(*stagingLocation, id, "model")
+	workerURL := gcsx.Join(*stagingLocation, id, "worker")
+	jarURL := gcsx.Join(*stagingLocation, id, "dataflow-worker.jar")
+	xlangURL := gcsx.Join(*stagingLocation, id, "xlang")
+
+	edges, _, err := p.Build()
+	if err != nil {
+		return nil, err
+	}
+	artifactURLs, err := dataflowlib.ResolveXLangArtifacts(ctx, edges, opts.Project, xlangURL)
+	if err != nil {
+		return nil, errors.WithContext(err, "resolving cross-language artifacts")
+	}
+	opts.ArtifactURLs = artifactURLs
+	environment, err := graphx.CreateEnvironment(ctx, jobopts.GetEnvironmentUrn(ctx), getContainerImage)
+	if err != nil {
+		return nil, errors.WithContext(err, "creating environment for model pipeline")
+	}
+	model, err := graphx.Marshal(edges, &graphx.Options{Environment: environment})
+	if err != nil {
+		return nil, errors.WithContext(err, "generating model pipeline")
+	}
+	err = pipelinex.ApplySdkImageOverrides(model, jobopts.GetSdkImageOverrides())
+	if err != nil {
+		return nil, errors.WithContext(err, "applying container image overrides")
+	}
+
+	if *dryRun {
+		log.Info(ctx, "Dry-run: not submitting job!")
+
+		log.Info(ctx, proto.MarshalTextString(model))
+		job, err := dataflowlib.Translate(ctx, model, opts, workerURL, jarURL, modelURL)
+		if err != nil {
+			return nil, err
+		}
+		dataflowlib.PrintJob(ctx, job)
+		return nil, nil
+	}
 
+	return dataflowlib.Execute(ctx, model, opts, workerURL, jarURL, modelURL, *endpoint, *executeAsync)
+}
+
+func getJobOptions(ctx context.Context) (*dataflowlib.JobOptions, error) {

Review comment:
       This is definitely another loss for GitHub's diffs 😢, but basically I just moved a block of code from the top of this green block into this helper function.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org