You are viewing a plain text version of this content. The canonical link was a hyperlink in the original page and is not preserved in this plain-text copy.
Posted to commits@beam.apache.org by lo...@apache.org on 2022/05/26 23:14:54 UTC

[beam] branch master updated: [BEAM-14505] Add Dataflow streaming pipeline update support to the Go SDK (#17747)

This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 92b8dc75286 [BEAM-14505] Add Dataflow streaming pipeline update support to the Go SDK (#17747)
92b8dc75286 is described below

commit 92b8dc75286268f85da59d436ef50e5913dbe9e5
Author: Jack McCluskey <34...@users.noreply.github.com>
AuthorDate: Thu May 26 19:14:47 2022 -0400

    [BEAM-14505] Add Dataflow streaming pipeline update support to the Go SDK (#17747)
---
 sdks/go/pkg/beam/runners/dataflow/dataflow.go      | 17 +++++
 sdks/go/pkg/beam/runners/dataflow/dataflow_test.go | 78 ++++++++++++++++++++++
 .../beam/runners/dataflow/dataflowlib/execute.go   |  2 +-
 .../pkg/beam/runners/dataflow/dataflowlib/job.go   | 18 ++++-
 4 files changed, 111 insertions(+), 4 deletions(-)

diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflow.go b/sdks/go/pkg/beam/runners/dataflow/dataflow.go
index eff46bca3f3..82d869ac5d7 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflow.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflow.go
@@ -78,6 +78,10 @@ var (
 	// TODO(BEAM-14512) Turn this on once TO_STRING is implemented
 	// enableHotKeyLogging    = flag.Bool("enable_hot_key_logging", false, "Specifies that when a hot key is detected in the pipeline, the literal, human-readable key is printed in the user's Cloud Logging project (optional).")
 
+	// Streaming update flags
+	update           = flag.Bool("update", false, "Submit this job as an update to an existing Dataflow job (optional); the job name must match the existing job to update")
+	transformMapping = flag.String("transform_name_mapping", "", "JSON-formatted mapping of old transform names to new transform names for pipeline updates (optional)")
+
 	dryRun         = flag.Bool("dry_run", false, "Dry run. Just print the job, but don't submit it.")
 	teardownPolicy = flag.String("teardown_policy", "", "Job teardown policy (internal only).")
 
@@ -119,6 +123,8 @@ var flagFilter = map[string]bool{
 	"teardown_policy":                true,
 	"cpu_profiling":                  true,
 	"session_recording":              true,
+	"update":                         true,
+	"transform_name_mapping":         true,
 
 	// Job Options flags
 	"endpoint":                 true,
@@ -256,6 +262,15 @@ func getJobOptions(ctx context.Context) (*dataflowlib.JobOptions, error) {
 			return nil, errors.Errorf("invalid flex resource scheduling goal. Got %q; Use --flexrs_goal=(FLEXRS_UNSPECIFIED|FLEXRS_SPEED_OPTIMIZED|FLEXRS_COST_OPTIMIZED)", *flexRSGoal)
 		}
 	}
+	if !*update && *transformMapping != "" {
+		return nil, errors.New("provided transform_name_mapping without setting the --update flag, so the pipeline would not be updated")
+	}
+	var updateTransformMapping map[string]string
+	if *transformMapping != "" {
+		if err := json.Unmarshal([]byte(*transformMapping), &updateTransformMapping); err != nil {
+			return nil, errors.Wrapf(err, "error reading --transform_name_mapping flag as JSON")
+		}
+	}
 
 	hooks.SerializeHooksToOptions()
 
@@ -317,6 +332,8 @@ func getJobOptions(ctx context.Context) (*dataflowlib.JobOptions, error) {
 		WorkerZone:             *workerZone,
 		TeardownPolicy:         *teardownPolicy,
 		ContainerImage:         getContainerImage(ctx),
+		Update:                 *update,
+		TransformNameMapping:   updateTransformMapping,
 	}
 	if opts.TempLocation == "" {
 		opts.TempLocation = gcsx.Join(*stagingLocation, "tmp")
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflow_test.go b/sdks/go/pkg/beam/runners/dataflow/dataflow_test.go
index f536fc917ca..620426587e8 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflow_test.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflow_test.go
@@ -197,3 +197,81 @@ func TestGetJobOptions_DockerNoImage(t *testing.T) {
 		t.Fatalf("getContainerImage() = %q, want %q", got, want)
 	}
 }
+
+func TestGetJobOptions_TransformMapping(t *testing.T) {
+	*labels = `{"label1": "val1", "label2": "val2"}`
+	*stagingLocation = "gs://testStagingLocation"
+	*autoscalingAlgorithm = "NONE"
+	*minCPUPlatform = "testPlatform"
+	*flexRSGoal = "FLEXRS_SPEED_OPTIMIZED"
+
+	*gcpopts.Project = "testProject"
+	*gcpopts.Region = "testRegion"
+
+	*jobopts.Experiments = "use_runner_v2,use_portable_job_submission"
+	*jobopts.JobName = "testJob"
+
+	*update = true
+	*transformMapping = `{"transformOne": "transformTwo"}`
+	opts, err := getJobOptions(context.Background())
+	if err != nil {
+		t.Errorf("getJobOptions() returned error, got %v", err)
+	}
+	if opts == nil {
+		t.Fatal("getJobOptions() got nil, want struct")
+	}
+	if got, ok := opts.TransformNameMapping["transformOne"]; !ok || got != "transformTwo" {
+		t.Errorf("mismatch in transform mapping got %v, want %v", got, "transformTwo")
+	}
+
+}
+
+func TestGetJobOptions_TransformMappingNoUpdate(t *testing.T) {
+	*labels = `{"label1": "val1", "label2": "val2"}`
+	*stagingLocation = "gs://testStagingLocation"
+	*autoscalingAlgorithm = "NONE"
+	*minCPUPlatform = "testPlatform"
+	*flexRSGoal = "FLEXRS_SPEED_OPTIMIZED"
+
+	*gcpopts.Project = "testProject"
+	*gcpopts.Region = "testRegion"
+
+	*jobopts.Experiments = "use_runner_v2,use_portable_job_submission"
+	*jobopts.JobName = "testJob"
+
+	*update = false
+	*transformMapping = `{"transformOne": "transformTwo"}`
+
+	opts, err := getJobOptions(context.Background())
+	if err == nil {
+		t.Error("getJobOptions() returned error nil, want an error")
+	}
+	if opts != nil {
+		t.Errorf("getJobOptions() returned JobOptions when it should not have, got %#v, want nil", opts)
+	}
+}
+
+func TestGetJobOptions_InvalidMapping(t *testing.T) {
+	*labels = `{"label1": "val1", "label2": "val2"}`
+	*stagingLocation = "gs://testStagingLocation"
+	*autoscalingAlgorithm = "NONE"
+	*minCPUPlatform = "testPlatform"
+	*flexRSGoal = "FLEXRS_SPEED_OPTIMIZED"
+
+	*gcpopts.Project = "testProject"
+	*gcpopts.Region = "testRegion"
+
+	*jobopts.Experiments = "use_runner_v2,use_portable_job_submission"
+	*jobopts.JobName = "testJob"
+
+	*update = true
+	*transformMapping = "not a JSON-encoded string"
+
+	opts, err := getJobOptions(context.Background())
+	if err == nil {
+		t.Error("getJobOptions() returned error nil, want an error")
+	}
+	if opts != nil {
+		t.Errorf("getJobOptions() returned JobOptions when it should not have, got %#v, want nil", opts)
+	}
+}
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/execute.go b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/execute.go
index abc4db75145..ba81147a183 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/execute.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/execute.go
@@ -109,7 +109,7 @@ func Execute(ctx context.Context, raw *pipepb.Pipeline, opts *JobOptions, worker
 	if err != nil {
 		return presult, err
 	}
-	upd, err := Submit(ctx, client, opts.Project, opts.Region, job)
+	upd, err := Submit(ctx, client, opts.Project, opts.Region, job, opts.Update)
 	// When in async mode, if we get a 409 because we've already submitted an actively running job with the same name
 	// just return the existing job as a convenience
 	if gErr, ok := err.(*googleapi.Error); async && ok && gErr.Code == 409 {
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go
index fd24729ff9e..e1fa51f7425 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go
@@ -66,6 +66,10 @@ type JobOptions struct {
 	FlexRSGoal          string
 	EnableHotKeyLogging bool
 
+	// Streaming update settings
+	Update               bool
+	TransformNameMapping map[string]string
+
 	// Autoscaling settings
 	Algorithm            string
 	MaxNumWorkers        int64
@@ -208,8 +212,9 @@ func Translate(ctx context.Context, p *pipepb.Pipeline, opts *JobOptions, worker
 			TempStoragePrefix: opts.TempLocation,
 			Experiments:       experiments,
 		},
-		Labels: opts.Labels,
-		Steps:  steps,
+		Labels:               opts.Labels,
+		TransformNameMapping: opts.TransformNameMapping,
+		Steps:                steps,
 	}
 
 	workerPool := job.Environment.WorkerPools[0]
@@ -238,7 +243,14 @@ func Translate(ctx context.Context, p *pipepb.Pipeline, opts *JobOptions, worker
 }
 
 // Submit submits a prepared job to Cloud Dataflow.
-func Submit(ctx context.Context, client *df.Service, project, region string, job *df.Job) (*df.Job, error) {
+func Submit(ctx context.Context, client *df.Service, project, region string, job *df.Job, updateJob bool) (*df.Job, error) {
+	if updateJob {
+		runningJob, err := GetRunningJobByName(client, project, region, job.Name)
+		if err != nil {
+			return nil, err
+		}
+		job.ReplaceJobId = runningJob.Id
+	}
 	upd, err := client.Projects.Locations.Jobs.Create(project, region, job).Do()
 	if err == nil {
 		log.Infof(ctx, "Submitted job: %v", upd.Id)