You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2022/05/26 23:14:54 UTC
[beam] branch master updated: [BEAM-14505] Add Dataflow streaming pipeline update support to the Go SDK (#17747)
This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 92b8dc75286 [BEAM-14505] Add Dataflow streaming pipeline update support to the Go SDK (#17747)
92b8dc75286 is described below
commit 92b8dc75286268f85da59d436ef50e5913dbe9e5
Author: Jack McCluskey <34...@users.noreply.github.com>
AuthorDate: Thu May 26 19:14:47 2022 -0400
[BEAM-14505] Add Dataflow streaming pipeline update support to the Go SDK (#17747)
---
sdks/go/pkg/beam/runners/dataflow/dataflow.go | 17 +++++
sdks/go/pkg/beam/runners/dataflow/dataflow_test.go | 78 ++++++++++++++++++++++
.../beam/runners/dataflow/dataflowlib/execute.go | 2 +-
.../pkg/beam/runners/dataflow/dataflowlib/job.go | 18 ++++-
4 files changed, 111 insertions(+), 4 deletions(-)
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflow.go b/sdks/go/pkg/beam/runners/dataflow/dataflow.go
index eff46bca3f3..82d869ac5d7 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflow.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflow.go
@@ -78,6 +78,10 @@ var (
// TODO(BEAM-14512) Turn this on once TO_STRING is implemented
// enableHotKeyLogging = flag.Bool("enable_hot_key_logging", false, "Specifies that when a hot key is detected in the pipeline, the literal, human-readable key is printed in the user's Cloud Logging project (optional).")
+ // Streaming update flags
+ update = flag.Bool("update", false, "Submit this job as an update to an existing Dataflow job (optional); the job name must match the existing job to update")
+ transformMapping = flag.String("transform_name_mapping", "", "JSON-formatted mapping of old transform names to new transform names for pipeline updates (optional)")
+
dryRun = flag.Bool("dry_run", false, "Dry run. Just print the job, but don't submit it.")
teardownPolicy = flag.String("teardown_policy", "", "Job teardown policy (internal only).")
@@ -119,6 +123,8 @@ var flagFilter = map[string]bool{
"teardown_policy": true,
"cpu_profiling": true,
"session_recording": true,
+ "update": true,
+ "transform_name_mapping": true,
// Job Options flags
"endpoint": true,
@@ -256,6 +262,15 @@ func getJobOptions(ctx context.Context) (*dataflowlib.JobOptions, error) {
return nil, errors.Errorf("invalid flex resource scheduling goal. Got %q; Use --flexrs_goal=(FLEXRS_UNSPECIFIED|FLEXRS_SPEED_OPTIMIZED|FLEXRS_COST_OPTIMIZED)", *flexRSGoal)
}
}
+ if !*update && *transformMapping != "" {
+ return nil, errors.New("provided transform_name_mapping without setting the --update flag, so the pipeline would not be updated")
+ }
+ var updateTransformMapping map[string]string
+ if *transformMapping != "" {
+ if err := json.Unmarshal([]byte(*transformMapping), &updateTransformMapping); err != nil {
+ return nil, errors.Wrapf(err, "error reading --transform_name_mapping flag as JSON")
+ }
+ }
hooks.SerializeHooksToOptions()
@@ -317,6 +332,8 @@ func getJobOptions(ctx context.Context) (*dataflowlib.JobOptions, error) {
WorkerZone: *workerZone,
TeardownPolicy: *teardownPolicy,
ContainerImage: getContainerImage(ctx),
+ Update: *update,
+ TransformNameMapping: updateTransformMapping,
}
if opts.TempLocation == "" {
opts.TempLocation = gcsx.Join(*stagingLocation, "tmp")
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflow_test.go b/sdks/go/pkg/beam/runners/dataflow/dataflow_test.go
index f536fc917ca..620426587e8 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflow_test.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflow_test.go
@@ -197,3 +197,81 @@ func TestGetJobOptions_DockerNoImage(t *testing.T) {
t.Fatalf("getContainerImage() = %q, want %q", got, want)
}
}
+
+// TestGetJobOptions_TransformMapping checks that a JSON --transform_name_mapping is decoded into JobOptions when --update is set.
+func TestGetJobOptions_TransformMapping(t *testing.T) {
+	*labels = `{"label1": "val1", "label2": "val2"}`
+	*stagingLocation = "gs://testStagingLocation"
+	*autoscalingAlgorithm = "NONE"
+	*minCPUPlatform = "testPlatform"
+	*flexRSGoal = "FLEXRS_SPEED_OPTIMIZED"
+	*gcpopts.Project = "testProject"
+	*gcpopts.Region = "testRegion"
+	*jobopts.Experiments = "use_runner_v2,use_portable_job_submission"
+	*jobopts.JobName = "testJob"
+
+	// The update flags are package-level state; restore their prior values so later tests are unaffected.
+	defer func(u bool, m string) { *update, *transformMapping = u, m }(*update, *transformMapping)
+	*update = true
+	*transformMapping = `{"transformOne": "transformTwo"}`
+	opts, err := getJobOptions(context.Background())
+	if err != nil {
+		t.Fatalf("getJobOptions() returned error %v, want nil", err)
+	}
+	if opts == nil {
+		t.Fatal("getJobOptions() got nil, want struct")
+	}
+	if got, ok := opts.TransformNameMapping["transformOne"]; !ok || got != "transformTwo" {
+		t.Errorf("mismatch in transform mapping got %v, want %v", got, "transformTwo")
+	}
+}
+
+// TestGetJobOptions_TransformMappingNoUpdate checks that --transform_name_mapping without --update is rejected.
+func TestGetJobOptions_TransformMappingNoUpdate(t *testing.T) {
+	*labels = `{"label1": "val1", "label2": "val2"}`
+	*stagingLocation = "gs://testStagingLocation"
+	*autoscalingAlgorithm = "NONE"
+	*minCPUPlatform = "testPlatform"
+	*flexRSGoal = "FLEXRS_SPEED_OPTIMIZED"
+	*gcpopts.Project = "testProject"
+	*gcpopts.Region = "testRegion"
+	*jobopts.Experiments = "use_runner_v2,use_portable_job_submission"
+	*jobopts.JobName = "testJob"
+
+	// Restore the package-level update flags; leaving a mapping set without --update would fail every later getJobOptions call.
+	defer func(u bool, m string) { *update, *transformMapping = u, m }(*update, *transformMapping)
+	*update = false
+	*transformMapping = `{"transformOne": "transformTwo"}`
+	opts, err := getJobOptions(context.Background())
+	if err == nil {
+		t.Error("getJobOptions() returned error nil, want an error")
+	}
+	if opts != nil {
+		t.Errorf("getJobOptions() returned JobOptions when it should not have, got %#v, want nil", opts)
+	}
+}
+
+// TestGetJobOptions_InvalidMapping checks that a --transform_name_mapping value that is not valid JSON is rejected.
+func TestGetJobOptions_InvalidMapping(t *testing.T) {
+	*labels = `{"label1": "val1", "label2": "val2"}`
+	*stagingLocation = "gs://testStagingLocation"
+	*autoscalingAlgorithm = "NONE"
+	*minCPUPlatform = "testPlatform"
+	*flexRSGoal = "FLEXRS_SPEED_OPTIMIZED"
+	*gcpopts.Project = "testProject"
+	*gcpopts.Region = "testRegion"
+	*jobopts.Experiments = "use_runner_v2,use_portable_job_submission"
+	*jobopts.JobName = "testJob"
+
+	// Restore the package-level update flags; leaving invalid JSON set would fail every later getJobOptions call.
+	defer func(u bool, m string) { *update, *transformMapping = u, m }(*update, *transformMapping)
+	*update = true
+	*transformMapping = "not a JSON-encoded string"
+	opts, err := getJobOptions(context.Background())
+	if err == nil {
+		t.Error("getJobOptions() returned error nil, want an error")
+	}
+	if opts != nil {
+		t.Errorf("getJobOptions() returned JobOptions when it should not have, got %#v, want nil", opts)
+	}
+}
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/execute.go b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/execute.go
index abc4db75145..ba81147a183 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/execute.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/execute.go
@@ -109,7 +109,7 @@ func Execute(ctx context.Context, raw *pipepb.Pipeline, opts *JobOptions, worker
if err != nil {
return presult, err
}
- upd, err := Submit(ctx, client, opts.Project, opts.Region, job)
+ upd, err := Submit(ctx, client, opts.Project, opts.Region, job, opts.Update)
// When in async mode, if we get a 409 because we've already submitted an actively running job with the same name
// just return the existing job as a convenience
if gErr, ok := err.(*googleapi.Error); async && ok && gErr.Code == 409 {
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go
index fd24729ff9e..e1fa51f7425 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go
@@ -66,6 +66,10 @@ type JobOptions struct {
FlexRSGoal string
EnableHotKeyLogging bool
+ // Streaming update settings
+ Update bool
+ TransformNameMapping map[string]string
+
// Autoscaling settings
Algorithm string
MaxNumWorkers int64
@@ -208,8 +212,9 @@ func Translate(ctx context.Context, p *pipepb.Pipeline, opts *JobOptions, worker
TempStoragePrefix: opts.TempLocation,
Experiments: experiments,
},
- Labels: opts.Labels,
- Steps: steps,
+ Labels: opts.Labels,
+ TransformNameMapping: opts.TransformNameMapping,
+ Steps: steps,
}
workerPool := job.Environment.WorkerPools[0]
@@ -238,7 +243,14 @@ func Translate(ctx context.Context, p *pipepb.Pipeline, opts *JobOptions, worker
}
// Submit submits a prepared job to Cloud Dataflow.
-func Submit(ctx context.Context, client *df.Service, project, region string, job *df.Job) (*df.Job, error) {
+func Submit(ctx context.Context, client *df.Service, project, region string, job *df.Job, updateJob bool) (*df.Job, error) {
+ if updateJob {
+ runningJob, err := GetRunningJobByName(client, project, region, job.Name)
+ if err != nil {
+ return nil, err
+ }
+ job.ReplaceJobId = runningJob.Id
+ }
upd, err := client.Projects.Locations.Jobs.Create(project, region, job).Do()
if err == nil {
log.Infof(ctx, "Submitted job: %v", upd.Id)