You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2022/05/25 19:23:21 UTC

[beam] branch master updated: [BEAM-14509] Add several flags to dataflow runner (#17752)

This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 2039b39f96f [BEAM-14509] Add several flags to dataflow runner (#17752)
2039b39f96f is described below

commit 2039b39f96ff96e0899a830e560c02c6c8e17acb
Author: Danny McCormick <da...@google.com>
AuthorDate: Wed May 25 15:23:08 2022 -0400

    [BEAM-14509] Add several flags to dataflow runner (#17752)
---
 sdks/go/pkg/beam/runners/dataflow/dataflow.go      | 112 +++++++++++++--------
 sdks/go/pkg/beam/runners/dataflow/dataflow_test.go |  39 ++++++-
 .../pkg/beam/runners/dataflow/dataflowlib/job.go   |  21 +++-
 3 files changed, 123 insertions(+), 49 deletions(-)

diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflow.go b/sdks/go/pkg/beam/runners/dataflow/dataflow.go
index 33b4b0fc66d..eff46bca3f3 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflow.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflow.go
@@ -51,26 +51,32 @@ import (
 // TODO(herohde) 5/16/2017: the Dataflow flags should match the other SDKs.
 
 var (
-	endpoint             = flag.String("dataflow_endpoint", "", "Dataflow endpoint (optional).")
-	stagingLocation      = flag.String("staging_location", "", "GCS staging location (required).")
-	image                = flag.String("worker_harness_container_image", "", "Worker harness container image (required).")
-	labels               = flag.String("labels", "", "JSON-formatted map[string]string of job labels (optional).")
-	serviceAccountEmail  = flag.String("service_account_email", "", "Service account email (optional).")
-	numWorkers           = flag.Int64("num_workers", 0, "Number of workers (optional).")
-	maxNumWorkers        = flag.Int64("max_num_workers", 0, "Maximum number of workers during scaling (optional).")
-	diskSizeGb           = flag.Int64("disk_size_gb", 0, "Size of root disk for VMs, in GB (optional).")
-	diskType             = flag.String("disk_type", "", "Type of root disk for VMs (optional).")
-	autoscalingAlgorithm = flag.String("autoscaling_algorithm", "", "Autoscaling mode to use (optional).")
-	zone                 = flag.String("zone", "", "GCP zone (optional)")
-	network              = flag.String("network", "", "GCP network (optional)")
-	subnetwork           = flag.String("subnetwork", "", "GCP subnetwork (optional)")
-	noUsePublicIPs       = flag.Bool("no_use_public_ips", false, "Workers must not use public IP addresses (optional)")
-	tempLocation         = flag.String("temp_location", "", "Temp location (optional)")
-	machineType          = flag.String("worker_machine_type", "", "GCE machine type (optional)")
-	minCPUPlatform       = flag.String("min_cpu_platform", "", "GCE minimum cpu platform (optional)")
-	workerJar            = flag.String("dataflow_worker_jar", "", "Dataflow worker jar (optional)")
-	workerRegion         = flag.String("worker_region", "", "Dataflow worker region (optional)")
-	workerZone           = flag.String("worker_zone", "", "Dataflow worker zone (optional)")
+	endpoint               = flag.String("dataflow_endpoint", "", "Dataflow endpoint (optional).")
+	stagingLocation        = flag.String("staging_location", "", "GCS staging location (required).")
+	image                  = flag.String("worker_harness_container_image", "", "Worker harness container image (optional).")
+	labels                 = flag.String("labels", "", "JSON-formatted map[string]string of job labels (optional).")
+	serviceAccountEmail    = flag.String("service_account_email", "", "Service account email (optional).")
+	numWorkers             = flag.Int64("num_workers", 0, "Number of workers (optional).")
+	workerHarnessThreads   = flag.Int64("number_of_worker_harness_threads", 0, "The number of threads per each worker harness process (optional).")
+	maxNumWorkers          = flag.Int64("max_num_workers", 0, "Maximum number of workers during scaling (optional).")
+	diskSizeGb             = flag.Int64("disk_size_gb", 0, "Size of root disk for VMs, in GB (optional).")
+	diskType               = flag.String("disk_type", "", "Type of root disk for VMs (optional).")
+	autoscalingAlgorithm   = flag.String("autoscaling_algorithm", "", "Autoscaling mode to use (optional).")
+	zone                   = flag.String("zone", "", "GCP zone (optional)")
+	kmsKey                 = flag.String("dataflow_kms_key", "", "The Cloud KMS key identifier used to encrypt data at rest (optional).")
+	network                = flag.String("network", "", "GCP network (optional)")
+	subnetwork             = flag.String("subnetwork", "", "GCP subnetwork (optional)")
+	noUsePublicIPs         = flag.Bool("no_use_public_ips", false, "Workers must not use public IP addresses (optional)")
+	tempLocation           = flag.String("temp_location", "", "Temp location (optional)")
+	machineType            = flag.String("worker_machine_type", "", "GCE machine type (optional)")
+	minCPUPlatform         = flag.String("min_cpu_platform", "", "GCE minimum cpu platform (optional)")
+	workerJar              = flag.String("dataflow_worker_jar", "", "Dataflow worker jar (optional)")
+	workerRegion           = flag.String("worker_region", "", "Dataflow worker region (optional)")
+	workerZone             = flag.String("worker_zone", "", "Dataflow worker zone (optional)")
+	dataflowServiceOptions = flag.String("dataflow_service_options", "", "Comma separated list of additional job modes and configurations (optional)")
+	flexRSGoal             = flag.String("flexrs_goal", "", "Which Flexible Resource Scheduling mode to run in (optional)")
+	// TODO(BEAM-14512) Turn this on once TO_STRING is implemented
+	// enableHotKeyLogging    = flag.Bool("enable_hot_key_logging", false, "Specifies that when a hot key is detected in the pipeline, the literal, human-readable key is printed in the user's Cloud Logging project (optional).")
 
 	dryRun         = flag.Bool("dry_run", false, "Dry run. Just print the job, but don't submit it.")
 	teardownPolicy = flag.String("teardown_policy", "", "Job teardown policy (internal only).")
@@ -242,6 +248,15 @@ func getJobOptions(ctx context.Context) (*dataflowlib.JobOptions, error) {
 		}
 	}
 
+	if *flexRSGoal != "" {
+		switch *flexRSGoal {
+		case "FLEXRS_UNSPECIFIED", "FLEXRS_SPEED_OPTIMIZED", "FLEXRS_COST_OPTIMIZED":
+			// valid values
+		default:
+			return nil, errors.Errorf("invalid flex resource scheduling goal. Got %q; Use --flexrs_goal=(FLEXRS_UNSPECIFIED|FLEXRS_SPEED_OPTIMIZED|FLEXRS_COST_OPTIMIZED)", *flexRSGoal)
+		}
+	}
+
 	hooks.SerializeHooksToOptions()
 
 	experiments := jobopts.GetExperiments()
@@ -267,32 +282,41 @@ func getJobOptions(ctx context.Context) (*dataflowlib.JobOptions, error) {
 		experiments = append(experiments, fmt.Sprintf("min_cpu_platform=%v", *minCPUPlatform))
 	}
 
+	var dfServiceOptions []string
+	if *dataflowServiceOptions != "" {
+		dfServiceOptions = strings.Split(*dataflowServiceOptions, ",")
+	}
+
 	beam.PipelineOptions.LoadOptionsFromFlags(flagFilter)
 	opts := &dataflowlib.JobOptions{
-		Name:                jobopts.GetJobName(),
-		Experiments:         experiments,
-		Options:             beam.PipelineOptions.Export(),
-		Project:             project,
-		Region:              region,
-		Zone:                *zone,
-		Network:             *network,
-		Subnetwork:          *subnetwork,
-		NoUsePublicIPs:      *noUsePublicIPs,
-		NumWorkers:          *numWorkers,
-		MaxNumWorkers:       *maxNumWorkers,
-		DiskSizeGb:          *diskSizeGb,
-		DiskType:            *diskType,
-		Algorithm:           *autoscalingAlgorithm,
-		MachineType:         *machineType,
-		Labels:              jobLabels,
-		ServiceAccountEmail: *serviceAccountEmail,
-		TempLocation:        *tempLocation,
-		Worker:              *jobopts.WorkerBinary,
-		WorkerJar:           *workerJar,
-		WorkerRegion:        *workerRegion,
-		WorkerZone:          *workerZone,
-		TeardownPolicy:      *teardownPolicy,
-		ContainerImage:      getContainerImage(ctx),
+		Name:                   jobopts.GetJobName(),
+		Experiments:            experiments,
+		DataflowServiceOptions: dfServiceOptions,
+		Options:                beam.PipelineOptions.Export(),
+		Project:                project,
+		Region:                 region,
+		Zone:                   *zone,
+		KmsKey:                 *kmsKey,
+		Network:                *network,
+		Subnetwork:             *subnetwork,
+		NoUsePublicIPs:         *noUsePublicIPs,
+		NumWorkers:             *numWorkers,
+		MaxNumWorkers:          *maxNumWorkers,
+		WorkerHarnessThreads:   *workerHarnessThreads,
+		DiskSizeGb:             *diskSizeGb,
+		DiskType:               *diskType,
+		Algorithm:              *autoscalingAlgorithm,
+		FlexRSGoal:             *flexRSGoal,
+		MachineType:            *machineType,
+		Labels:                 jobLabels,
+		ServiceAccountEmail:    *serviceAccountEmail,
+		TempLocation:           *tempLocation,
+		Worker:                 *jobopts.WorkerBinary,
+		WorkerJar:              *workerJar,
+		WorkerRegion:           *workerRegion,
+		WorkerZone:             *workerZone,
+		TeardownPolicy:         *teardownPolicy,
+		ContainerImage:         getContainerImage(ctx),
 	}
 	if opts.TempLocation == "" {
 		opts.TempLocation = gcsx.Join(*stagingLocation, "tmp")
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflow_test.go b/sdks/go/pkg/beam/runners/dataflow/dataflow_test.go
index 29118de855e..f536fc917ca 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflow_test.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflow_test.go
@@ -17,10 +17,11 @@ package dataflow
 
 import (
 	"context"
-	"github.com/apache/beam/sdks/v2/go/pkg/beam/options/gcpopts"
-	"github.com/apache/beam/sdks/v2/go/pkg/beam/options/jobopts"
 	"sort"
 	"testing"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/options/gcpopts"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/options/jobopts"
 )
 
 func TestDontUseFlagAsPipelineOption(t *testing.T) {
@@ -39,6 +40,8 @@ func TestGetJobOptions(t *testing.T) {
 	*stagingLocation = "gs://testStagingLocation"
 	*autoscalingAlgorithm = "NONE"
 	*minCPUPlatform = "testPlatform"
+	*flexRSGoal = "FLEXRS_SPEED_OPTIMIZED"
+	*dataflowServiceOptions = "opt1,opt2"
 
 	*gcpopts.Project = "testProject"
 	*gcpopts.Region = "testRegion"
@@ -64,6 +67,17 @@ func TestGetJobOptions(t *testing.T) {
 			}
 		}
 	}
+	if got, want := len(opts.DataflowServiceOptions), 2; got != want {
+		t.Errorf("len(getJobOptions().DataflowServiceOptions) = %q, want %q", got, want)
+	} else {
+		sort.Strings(opts.DataflowServiceOptions)
+		expectedOptions := []string{"opt1", "opt2"}
+		for i := 0; i < 2; i++ {
+			if got, want := opts.DataflowServiceOptions[i], expectedOptions[i]; got != want {
+				t.Errorf("getJobOptions().DataflowServiceOptions = %q, want %q", got, want)
+			}
+		}
+	}
 	if got, want := opts.Project, "testProject"; got != want {
 		t.Errorf("getJobOptions().Project = %q, want %q", got, want)
 	}
@@ -83,6 +97,9 @@ func TestGetJobOptions(t *testing.T) {
 	if got, want := opts.TempLocation, "gs://testStagingLocation/tmp"; got != want {
 		t.Errorf("getJobOptions().TempLocation = %q, want %q", got, want)
 	}
+	if got, want := opts.FlexRSGoal, "FLEXRS_SPEED_OPTIMIZED"; got != want {
+		t.Errorf("getJobOptions().FlexRSGoal = %q, want %q", got, want)
+	}
 }
 
 func TestGetJobOptions_NoExperimentsSet(t *testing.T) {
@@ -143,6 +160,24 @@ func TestGetJobOptions_InvalidAutoscaling(t *testing.T) {
 	}
 }
 
+func TestGetJobOptions_InvalidRsGoal(t *testing.T) {
+	*labels = `{"label1": "val1", "label2": "val2"}`
+	*stagingLocation = "gs://testStagingLocation"
+	*flexRSGoal = "INVALID"
+	*minCPUPlatform = "testPlatform"
+
+	*gcpopts.Project = "testProject"
+	*gcpopts.Region = "testRegion"
+
+	*jobopts.Experiments = "use_runner_v2,use_portable_job_submission"
+	*jobopts.JobName = "testJob"
+
+	_, err := getJobOptions(context.Background())
+	if err == nil {
+		t.Fatalf("getJobOptions() returned error nil, want an error")
+	}
+}
+
 func TestGetJobOptions_DockerWithImage(t *testing.T) {
 	*jobopts.EnvironmentType = "docker"
 	*jobopts.EnvironmentConfig = "testContainerImage"
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go
index 459c3f52a4b..fd24729ff9e 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go
@@ -41,12 +41,15 @@ type JobOptions struct {
 	Name string
 	// Experiments are additional experiments.
 	Experiments []string
+	// DataflowServiceOptions are additional job modes and configurations for Dataflow.
+	DataflowServiceOptions []string
 	// Pipeline options
 	Options runtime.RawOptions
 
 	Project             string
 	Region              string
 	Zone                string
+	KmsKey              string
 	Network             string
 	Subnetwork          string
 	NoUsePublicIPs      bool
@@ -60,10 +63,13 @@ type JobOptions struct {
 	WorkerZone          string
 	ContainerImage      string
 	ArtifactURLs        []string // Additional packages for workers.
+	FlexRSGoal          string
+	EnableHotKeyLogging bool
 
 	// Autoscaling settings
-	Algorithm     string
-	MaxNumWorkers int64
+	Algorithm            string
+	MaxNumWorkers        int64
+	WorkerHarnessThreads int64
 
 	TempLocation string
 
@@ -155,7 +161,11 @@ func Translate(ctx context.Context, p *pipepb.Pipeline, opts *JobOptions, worker
 		Name:      opts.Name,
 		Type:      jobType,
 		Environment: &df.Environment{
-			ServiceAccountEmail: opts.ServiceAccountEmail,
+			DebugOptions: &df.DebugOptions{
+				EnableHotKeyLogging: opts.EnableHotKeyLogging,
+			},
+			FlexResourceSchedulingGoal: opts.FlexRSGoal,
+			ServiceAccountEmail:        opts.ServiceAccountEmail,
 			UserAgent: newMsg(userAgent{
 				Name:    core.SdkName,
 				Version: core.SdkVersion,
@@ -174,6 +184,8 @@ func Translate(ctx context.Context, p *pipepb.Pipeline, opts *JobOptions, worker
 				},
 				GoOptions: opts.Options,
 			}),
+			ServiceOptions:    opts.DataflowServiceOptions,
+			ServiceKmsKeyName: opts.KmsKey,
 			WorkerPools: []*df.WorkerPool{{
 				AutoscalingSettings: &df.AutoscalingSettings{
 					MaxNumWorkers: opts.MaxNumWorkers,
@@ -205,6 +217,9 @@ func Translate(ctx context.Context, p *pipepb.Pipeline, opts *JobOptions, worker
 	if opts.NumWorkers > 0 {
 		workerPool.NumWorkers = opts.NumWorkers
 	}
+	if opts.WorkerHarnessThreads > 0 {
+		workerPool.NumThreadsPerWorker = opts.WorkerHarnessThreads
+	}
 	if opts.Algorithm != "" {
 		workerPool.AutoscalingSettings.Algorithm = map[string]string{
 			"NONE":             "AUTOSCALING_ALGORITHM_NONE",