You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2022/05/25 19:23:21 UTC
[beam] branch master updated: [BEAM-14509] Add several flags to dataflow runner (#17752)
This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 2039b39f96f [BEAM-14509] Add several flags to dataflow runner (#17752)
2039b39f96f is described below
commit 2039b39f96ff96e0899a830e560c02c6c8e17acb
Author: Danny McCormick <da...@google.com>
AuthorDate: Wed May 25 15:23:08 2022 -0400
[BEAM-14509] Add several flags to dataflow runner (#17752)
---
sdks/go/pkg/beam/runners/dataflow/dataflow.go | 112 +++++++++++++--------
sdks/go/pkg/beam/runners/dataflow/dataflow_test.go | 39 ++++++-
.../pkg/beam/runners/dataflow/dataflowlib/job.go | 21 +++-
3 files changed, 123 insertions(+), 49 deletions(-)
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflow.go b/sdks/go/pkg/beam/runners/dataflow/dataflow.go
index 33b4b0fc66d..eff46bca3f3 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflow.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflow.go
@@ -51,26 +51,32 @@ import (
// TODO(herohde) 5/16/2017: the Dataflow flags should match the other SDKs.
var (
- endpoint = flag.String("dataflow_endpoint", "", "Dataflow endpoint (optional).")
- stagingLocation = flag.String("staging_location", "", "GCS staging location (required).")
- image = flag.String("worker_harness_container_image", "", "Worker harness container image (required).")
- labels = flag.String("labels", "", "JSON-formatted map[string]string of job labels (optional).")
- serviceAccountEmail = flag.String("service_account_email", "", "Service account email (optional).")
- numWorkers = flag.Int64("num_workers", 0, "Number of workers (optional).")
- maxNumWorkers = flag.Int64("max_num_workers", 0, "Maximum number of workers during scaling (optional).")
- diskSizeGb = flag.Int64("disk_size_gb", 0, "Size of root disk for VMs, in GB (optional).")
- diskType = flag.String("disk_type", "", "Type of root disk for VMs (optional).")
- autoscalingAlgorithm = flag.String("autoscaling_algorithm", "", "Autoscaling mode to use (optional).")
- zone = flag.String("zone", "", "GCP zone (optional)")
- network = flag.String("network", "", "GCP network (optional)")
- subnetwork = flag.String("subnetwork", "", "GCP subnetwork (optional)")
- noUsePublicIPs = flag.Bool("no_use_public_ips", false, "Workers must not use public IP addresses (optional)")
- tempLocation = flag.String("temp_location", "", "Temp location (optional)")
- machineType = flag.String("worker_machine_type", "", "GCE machine type (optional)")
- minCPUPlatform = flag.String("min_cpu_platform", "", "GCE minimum cpu platform (optional)")
- workerJar = flag.String("dataflow_worker_jar", "", "Dataflow worker jar (optional)")
- workerRegion = flag.String("worker_region", "", "Dataflow worker region (optional)")
- workerZone = flag.String("worker_zone", "", "Dataflow worker zone (optional)")
+ endpoint = flag.String("dataflow_endpoint", "", "Dataflow endpoint (optional).")
+ stagingLocation = flag.String("staging_location", "", "GCS staging location (required).")
+ image = flag.String("worker_harness_container_image", "", "Worker harness container image (optional).")
+ labels = flag.String("labels", "", "JSON-formatted map[string]string of job labels (optional).")
+ serviceAccountEmail = flag.String("service_account_email", "", "Service account email (optional).")
+ numWorkers = flag.Int64("num_workers", 0, "Number of workers (optional).")
+ workerHarnessThreads = flag.Int64("number_of_worker_harness_threads", 0, "The number of threads per each worker harness process (optional).")
+ maxNumWorkers = flag.Int64("max_num_workers", 0, "Maximum number of workers during scaling (optional).")
+ diskSizeGb = flag.Int64("disk_size_gb", 0, "Size of root disk for VMs, in GB (optional).")
+ diskType = flag.String("disk_type", "", "Type of root disk for VMs (optional).")
+ autoscalingAlgorithm = flag.String("autoscaling_algorithm", "", "Autoscaling mode to use (optional).")
+ zone = flag.String("zone", "", "GCP zone (optional)")
+ kmsKey = flag.String("dataflow_kms_key", "", "The Cloud KMS key identifier used to encrypt data at rest (optional).")
+ network = flag.String("network", "", "GCP network (optional)")
+ subnetwork = flag.String("subnetwork", "", "GCP subnetwork (optional)")
+ noUsePublicIPs = flag.Bool("no_use_public_ips", false, "Workers must not use public IP addresses (optional)")
+ tempLocation = flag.String("temp_location", "", "Temp location (optional)")
+ machineType = flag.String("worker_machine_type", "", "GCE machine type (optional)")
+ minCPUPlatform = flag.String("min_cpu_platform", "", "GCE minimum cpu platform (optional)")
+ workerJar = flag.String("dataflow_worker_jar", "", "Dataflow worker jar (optional)")
+ workerRegion = flag.String("worker_region", "", "Dataflow worker region (optional)")
+ workerZone = flag.String("worker_zone", "", "Dataflow worker zone (optional)")
+ dataflowServiceOptions = flag.String("dataflow_service_options", "", "Comma separated list of additional job modes and configurations (optional)")
+ flexRSGoal = flag.String("flexrs_goal", "", "Which Flexible Resource Scheduling mode to run in (optional)")
+ // TODO(BEAM-14512) Turn this on once TO_STRING is implemented
+ // enableHotKeyLogging = flag.Bool("enable_hot_key_logging", false, "Specifies that when a hot key is detected in the pipeline, the literal, human-readable key is printed in the user's Cloud Logging project (optional).")
dryRun = flag.Bool("dry_run", false, "Dry run. Just print the job, but don't submit it.")
teardownPolicy = flag.String("teardown_policy", "", "Job teardown policy (internal only).")
@@ -242,6 +248,15 @@ func getJobOptions(ctx context.Context) (*dataflowlib.JobOptions, error) {
}
}
+ if *flexRSGoal != "" {
+ switch *flexRSGoal {
+ case "FLEXRS_UNSPECIFIED", "FLEXRS_SPEED_OPTIMIZED", "FLEXRS_COST_OPTIMIZED":
+ // valid values
+ default:
+ return nil, errors.Errorf("invalid flex resource scheduling goal. Got %q; Use --flexrs_goal=(FLEXRS_UNSPECIFIED|FLEXRS_SPEED_OPTIMIZED|FLEXRS_COST_OPTIMIZED)", *flexRSGoal)
+ }
+ }
+
hooks.SerializeHooksToOptions()
experiments := jobopts.GetExperiments()
@@ -267,32 +282,41 @@ func getJobOptions(ctx context.Context) (*dataflowlib.JobOptions, error) {
experiments = append(experiments, fmt.Sprintf("min_cpu_platform=%v", *minCPUPlatform))
}
+ var dfServiceOptions []string
+ if *dataflowServiceOptions != "" {
+ dfServiceOptions = strings.Split(*dataflowServiceOptions, ",")
+ }
+
beam.PipelineOptions.LoadOptionsFromFlags(flagFilter)
opts := &dataflowlib.JobOptions{
- Name: jobopts.GetJobName(),
- Experiments: experiments,
- Options: beam.PipelineOptions.Export(),
- Project: project,
- Region: region,
- Zone: *zone,
- Network: *network,
- Subnetwork: *subnetwork,
- NoUsePublicIPs: *noUsePublicIPs,
- NumWorkers: *numWorkers,
- MaxNumWorkers: *maxNumWorkers,
- DiskSizeGb: *diskSizeGb,
- DiskType: *diskType,
- Algorithm: *autoscalingAlgorithm,
- MachineType: *machineType,
- Labels: jobLabels,
- ServiceAccountEmail: *serviceAccountEmail,
- TempLocation: *tempLocation,
- Worker: *jobopts.WorkerBinary,
- WorkerJar: *workerJar,
- WorkerRegion: *workerRegion,
- WorkerZone: *workerZone,
- TeardownPolicy: *teardownPolicy,
- ContainerImage: getContainerImage(ctx),
+ Name: jobopts.GetJobName(),
+ Experiments: experiments,
+ DataflowServiceOptions: dfServiceOptions,
+ Options: beam.PipelineOptions.Export(),
+ Project: project,
+ Region: region,
+ Zone: *zone,
+ KmsKey: *kmsKey,
+ Network: *network,
+ Subnetwork: *subnetwork,
+ NoUsePublicIPs: *noUsePublicIPs,
+ NumWorkers: *numWorkers,
+ MaxNumWorkers: *maxNumWorkers,
+ WorkerHarnessThreads: *workerHarnessThreads,
+ DiskSizeGb: *diskSizeGb,
+ DiskType: *diskType,
+ Algorithm: *autoscalingAlgorithm,
+ FlexRSGoal: *flexRSGoal,
+ MachineType: *machineType,
+ Labels: jobLabels,
+ ServiceAccountEmail: *serviceAccountEmail,
+ TempLocation: *tempLocation,
+ Worker: *jobopts.WorkerBinary,
+ WorkerJar: *workerJar,
+ WorkerRegion: *workerRegion,
+ WorkerZone: *workerZone,
+ TeardownPolicy: *teardownPolicy,
+ ContainerImage: getContainerImage(ctx),
}
if opts.TempLocation == "" {
opts.TempLocation = gcsx.Join(*stagingLocation, "tmp")
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflow_test.go b/sdks/go/pkg/beam/runners/dataflow/dataflow_test.go
index 29118de855e..f536fc917ca 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflow_test.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflow_test.go
@@ -17,10 +17,11 @@ package dataflow
import (
"context"
- "github.com/apache/beam/sdks/v2/go/pkg/beam/options/gcpopts"
- "github.com/apache/beam/sdks/v2/go/pkg/beam/options/jobopts"
"sort"
"testing"
+
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/options/gcpopts"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/options/jobopts"
)
func TestDontUseFlagAsPipelineOption(t *testing.T) {
@@ -39,6 +40,8 @@ func TestGetJobOptions(t *testing.T) {
*stagingLocation = "gs://testStagingLocation"
*autoscalingAlgorithm = "NONE"
*minCPUPlatform = "testPlatform"
+ *flexRSGoal = "FLEXRS_SPEED_OPTIMIZED"
+ *dataflowServiceOptions = "opt1,opt2"
*gcpopts.Project = "testProject"
*gcpopts.Region = "testRegion"
@@ -64,6 +67,17 @@ func TestGetJobOptions(t *testing.T) {
}
}
}
+ if got, want := len(opts.DataflowServiceOptions), 2; got != want {
+ t.Errorf("len(getJobOptions().DataflowServiceOptions) = %q, want %q", got, want)
+ } else {
+ sort.Strings(opts.DataflowServiceOptions)
+ expectedOptions := []string{"opt1", "opt2"}
+ for i := 0; i < 2; i++ {
+ if got, want := opts.DataflowServiceOptions[i], expectedOptions[i]; got != want {
+ t.Errorf("getJobOptions().DataflowServiceOptions = %q, want %q", got, want)
+ }
+ }
+ }
if got, want := opts.Project, "testProject"; got != want {
t.Errorf("getJobOptions().Project = %q, want %q", got, want)
}
@@ -83,6 +97,9 @@ func TestGetJobOptions(t *testing.T) {
if got, want := opts.TempLocation, "gs://testStagingLocation/tmp"; got != want {
t.Errorf("getJobOptions().TempLocation = %q, want %q", got, want)
}
+ if got, want := opts.FlexRSGoal, "FLEXRS_SPEED_OPTIMIZED"; got != want {
+ t.Errorf("getJobOptions().FlexRSGoal = %q, want %q", got, want)
+ }
}
func TestGetJobOptions_NoExperimentsSet(t *testing.T) {
@@ -143,6 +160,24 @@ func TestGetJobOptions_InvalidAutoscaling(t *testing.T) {
}
}
+func TestGetJobOptions_InvalidRsGoal(t *testing.T) {
+ *labels = `{"label1": "val1", "label2": "val2"}`
+ *stagingLocation = "gs://testStagingLocation"
+ *flexRSGoal = "INVALID"
+ *minCPUPlatform = "testPlatform"
+
+ *gcpopts.Project = "testProject"
+ *gcpopts.Region = "testRegion"
+
+ *jobopts.Experiments = "use_runner_v2,use_portable_job_submission"
+ *jobopts.JobName = "testJob"
+
+ _, err := getJobOptions(context.Background())
+ if err == nil {
+ t.Fatalf("getJobOptions() returned error nil, want an error")
+ }
+}
+
func TestGetJobOptions_DockerWithImage(t *testing.T) {
*jobopts.EnvironmentType = "docker"
*jobopts.EnvironmentConfig = "testContainerImage"
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go
index 459c3f52a4b..fd24729ff9e 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go
@@ -41,12 +41,15 @@ type JobOptions struct {
Name string
// Experiments are additional experiments.
Experiments []string
+ // DataflowServiceOptions are additional job modes and configurations for Dataflow
+ DataflowServiceOptions []string
// Pipeline options
Options runtime.RawOptions
Project string
Region string
Zone string
+ KmsKey string
Network string
Subnetwork string
NoUsePublicIPs bool
@@ -60,10 +63,13 @@ type JobOptions struct {
WorkerZone string
ContainerImage string
ArtifactURLs []string // Additional packages for workers.
+ FlexRSGoal string
+ EnableHotKeyLogging bool
// Autoscaling settings
- Algorithm string
- MaxNumWorkers int64
+ Algorithm string
+ MaxNumWorkers int64
+ WorkerHarnessThreads int64
TempLocation string
@@ -155,7 +161,11 @@ func Translate(ctx context.Context, p *pipepb.Pipeline, opts *JobOptions, worker
Name: opts.Name,
Type: jobType,
Environment: &df.Environment{
- ServiceAccountEmail: opts.ServiceAccountEmail,
+ DebugOptions: &df.DebugOptions{
+ EnableHotKeyLogging: opts.EnableHotKeyLogging,
+ },
+ FlexResourceSchedulingGoal: opts.FlexRSGoal,
+ ServiceAccountEmail: opts.ServiceAccountEmail,
UserAgent: newMsg(userAgent{
Name: core.SdkName,
Version: core.SdkVersion,
@@ -174,6 +184,8 @@ func Translate(ctx context.Context, p *pipepb.Pipeline, opts *JobOptions, worker
},
GoOptions: opts.Options,
}),
+ ServiceOptions: opts.DataflowServiceOptions,
+ ServiceKmsKeyName: opts.KmsKey,
WorkerPools: []*df.WorkerPool{{
AutoscalingSettings: &df.AutoscalingSettings{
MaxNumWorkers: opts.MaxNumWorkers,
@@ -205,6 +217,9 @@ func Translate(ctx context.Context, p *pipepb.Pipeline, opts *JobOptions, worker
if opts.NumWorkers > 0 {
workerPool.NumWorkers = opts.NumWorkers
}
+ if opts.WorkerHarnessThreads > 0 {
+ workerPool.NumThreadsPerWorker = opts.WorkerHarnessThreads
+ }
if opts.Algorithm != "" {
workerPool.AutoscalingSettings.Algorithm = map[string]string{
"NONE": "AUTOSCALING_ALGORITHM_NONE",