Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/05/18 22:08:34 UTC

[GitHub] [beam] youngoli opened a new pull request #11747: [BEAM-9951] Using the builder pattern for Go synthetic config frontend

youngoli opened a new pull request #11747:
URL: https://github.com/apache/beam/pull/11747


   Instead of creating SourceConfigs and StepConfigs directly, provide a
   builder pattern that allows more user-friendly creation of those configs,
   starting from intended defaults.
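A minimal, self-contained sketch of the builder flow described above follows: start from defaults, chain setters, then call a validating Build. It is a simplified re-creation for illustration, not the actual code in sdks/go/pkg/beam/io/synthetic. Only NumElements and InitialSplits are modeled, and the panic-on-invalid behavior mirrors the Build methods shown in this PR's diff.

```go
package main

import "fmt"

// SourceConfig is a simplified stand-in for the PR's config type: its fields
// are unexported, so callers can only populate it through the builder.
type SourceConfig struct {
	numElements   int
	initialSplits int
}

// SourceConfigBuilder wraps a SourceConfig that starts out with defaults.
type SourceConfigBuilder struct {
	cfg SourceConfig
}

// DefaultSourceConfig returns a builder pre-populated with defaults that
// never drop elements (1 element, 1 initial split).
func DefaultSourceConfig() *SourceConfigBuilder {
	return &SourceConfigBuilder{cfg: SourceConfig{numElements: 1, initialSplits: 1}}
}

// NumElements sets the element count and returns the builder for chaining.
func (b *SourceConfigBuilder) NumElements(n int) *SourceConfigBuilder {
	b.cfg.numElements = n
	return b
}

// InitialSplits sets the initial split count and returns the builder.
func (b *SourceConfigBuilder) InitialSplits(n int) *SourceConfigBuilder {
	b.cfg.initialSplits = n
	return b
}

// Build validates the fields and panics on invalid values.
func (b *SourceConfigBuilder) Build() SourceConfig {
	if b.cfg.initialSplits <= 0 {
		panic(fmt.Sprintf("SourceConfig.InitialSplits must be >= 1. Got: %v", b.cfg.initialSplits))
	}
	if b.cfg.numElements <= 0 {
		panic(fmt.Sprintf("SourceConfig.NumElements must be >= 1. Got: %v", b.cfg.numElements))
	}
	return b.cfg
}

func main() {
	cfg := DefaultSourceConfig().NumElements(5000).InitialSplits(2).Build()
	fmt.Println(cfg.numElements, cfg.initialSplits) // 5000 2

	def := DefaultSourceConfig().Build()
	fmt.Println(def.numElements, def.initialSplits) // 1 1
}
```

Because Build returns a value copy, later setter calls on the same builder do not mutate configs that were already built.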
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [x] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [x] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [x] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] youngoli merged pull request #11747: [BEAM-9951] Using the builder pattern for Go synthetic config frontend

Posted by GitBox <gi...@apache.org>.
youngoli merged pull request #11747:
URL: https://github.com/apache/beam/pull/11747


   





[GitHub] [beam] lostluck commented on a change in pull request #11747: [BEAM-9951] Using the builder pattern for Go synthetic config frontend

lostluck commented on a change in pull request #11747:
URL: https://github.com/apache/beam/pull/11747#discussion_r426935903



##########
File path: sdks/go/pkg/beam/io/synthetic/source.go
##########
@@ -135,27 +155,79 @@ func (fn *sourceFn) ProcessElement(rt *offsetrange.Tracker, config SourceConfig,
 	return nil
 }
 
-// DefaultSourceConfig creates a SourceConfig with intended defaults for its
-// fields. SourceConfigs should be initialized with this method.
-func DefaultSourceConfig() SourceConfig {
-	return SourceConfig{
-		NumElements:   1, // Defaults shouldn't drop elements, so at least 1.
-		InitialSplits: 1, // Defaults to 1, i.e. no initial splitting.
+// SourceConfigBuilder is used to initialize SourceConfigs. See
+// SourceConfigBuilder's methods for descriptions of the fields in a
+// SourceConfig and how they can be set. The intended approach for using this
+// builder is to begin by calling the DefaultSourceConfig function, followed by
+// calling setters, followed by calling Build.
+//
+// Usage example:
+//
+//    cfg := synthetic.DefaultSourceConfig().NumElements(5000).InitialSplits(2).Build()
+type SourceConfigBuilder struct {
+	cfg SourceConfig
+}
+
+// DefaultSourceConfig creates a SourceConfigBuilder set with intended defaults
+// for the SourceConfig fields. This function is the intended starting point for
+// initializing a SourceConfig and should always be used to create
+// SourceConfigBuilders.
+//
+// To see descriptions of the various SourceConfig fields and their defaults,
+// see the methods to SourceConfigBuilder.
+func DefaultSourceConfig() *SourceConfigBuilder {
+	return &SourceConfigBuilder{
+		cfg: SourceConfig{
+			numElements:   1, // 0 is invalid (drops elements).
+			initialSplits: 1, // 0 is invalid (drops elements).
+		},
+	}
+}
+
+// NumElements is the number of elements for the source to generate and emit.
+//
+// Valid values are in the range of [1, ...] and the default value is 1. Values
+// of 0 (and below) are invalid as they result in sources that emit no elements.
+func (b *SourceConfigBuilder) NumElements(val int) *SourceConfigBuilder {
+	b.cfg.numElements = val
+	return b
+}
+
+// InitialSplits determines the number of initial splits to perform in the
+// source's SplitRestriction method. Restrictions in synthetic sources represent
+// the number of elements being emitted, and this split is performed evenly
+// across that number of elements.
+//
+// Each resulting restriction will have at least 1 element in it, and each
+// element being emitted will be contained in exactly one restriction. That
+// means that if the desired number of splits is greater than the number of
+// elements N, then N initial restrictions will be created, each containing 1
+// element.
+//
+// Valid values are in the range of [1, ...] and the default value is 1. Values
+// of 0 (and below) are invalid as they would result in dropping elements that
+// are expected to be emitted.
+func (b *SourceConfigBuilder) InitialSplits(val int) *SourceConfigBuilder {
+	b.cfg.initialSplits = val
+	return b
+}
+
+// Build constructs the SourceConfig initialized by this builder. It also
+// performs error checking on the fields, and panics if any have been set to
+// invalid values.
+func (b *SourceConfigBuilder) Build() SourceConfig {
+	if b.cfg.initialSplits <= 0 {
+		panic(fmt.Sprintf("SourceConfig.InitialSplits must be >= 1. Got: %v", b.cfg.initialSplits))
+	}
+	if b.cfg.numElements <= 0 {
+		panic(fmt.Sprintf("SourceConfig.NumElements must be >= 1. Got: %v", b.cfg.numElements))
 	}
+	return b.cfg
 }
 
 // SourceConfig is a struct containing all the configuration options for a
-// synthetic source.
+// synthetic source. It should be created via a SourceConfigBuilder.
 type SourceConfig struct {
-	// NumElements is the number of elements for the source to generate and
-	// emit.
-	NumElements int
-
-	// InitialSplits determines the number of initial splits to perform in the
-	// source's SplitRestriction method. Note that in some edge cases, the
-	// number of splits performed might differ from this config value. Each
-	// restriction will always have one element in it, and at least one
-	// restriction will always be output, so the number of splits will be in
-	// the range of [1, N] where N is the size of the original restriction.
-	InitialSplits int
+	numElements   int

Review comment:
       Note: Default encoding with JSON (or, in the future, with beam.Schemas) will not encode unexported fields. You'll need to register a coder with beam.RegisterCoder if you want to ensure these get encoded properly.
   
   Alternatively, keeping the fields exported while still providing and recommending a builder is not unreasonable, e.g. documenting on the type (as you have it) that the builder is recommended.

##########
File path: sdks/go/pkg/beam/io/synthetic/step.go
##########
@@ -144,49 +143,130 @@ func (fn *sdfStepFn) Setup() {
 // ProcessElement takes an input and either filters it or produces a number of
 // outputs identical to that input based on the restriction size.
 func (fn *sdfStepFn) ProcessElement(rt *offsetrange.Tracker, key, val []byte, emit func([]byte, []byte)) {
-	if fn.cfg.FilterRatio > 0 && fn.rng.Float64() < fn.cfg.FilterRatio {
-		return
-	}
+	filtered := fn.cfg.filterRatio > 0 && fn.rng.Float64() < fn.cfg.filterRatio
+
 	for i := rt.Rest.Start; rt.TryClaim(i) == true; i++ {
-		emit(key, val)
+		if !filtered {
+			emit(key, val)
+		}
+	}
+}
+
+// StepConfigBuilder is used to initialize StepConfigs. See StepConfigBuilder's
+// methods for descriptions of the fields in a StepConfig and how they can be
+// set. The intended approach for using this builder is to begin by calling the
+// DefaultStepConfig function, followed by calling setters, followed by calling
+// Build.
+//
+// Usage example:
+//
+//    cfg := synthetic.DefaultStepConfig().OutputPerInput(10).FilterRatio(0.5).Build()
+type StepConfigBuilder struct {
+	cfg StepConfig
+}
+
+// DefaultStepConfig creates a StepConfigBuilder set with intended defaults for
+// the StepConfig fields. This function is the intended starting point for
+// initializing a StepConfig and should always be used to create
+// StepConfigBuilders.
+//
+// To see descriptions of the various StepConfig fields and their defaults, see
+// the methods to StepConfigBuilder.
+func DefaultStepConfig() *StepConfigBuilder {
+	return &StepConfigBuilder{
+		cfg: StepConfig{
+			outputPerInput: 1,     // Defaults shouldn't drop elements, so at least 1.
+			filterRatio:    0.0,   // Defaults shouldn't drop elements, so don't filter.
+			splittable:     false, // Default to non-splittable, SDFs are situational.
+			initialSplits:  1,     // Defaults to 1, i.e. no initial splitting.
+		},
 	}
 }
 
-// DefaultSourceConfig creates a SourceConfig with intended defaults for its
-// fields. SourceConfigs should be initialized with this method.
-func DefaultStepConfig() StepConfig {
-	return StepConfig{
-		OutputPerInput: 1,     // Defaults shouldn't drop elements, so at least 1.
-		FilterRatio:    0.0,   // Defaults shouldn't drop elements, so don't filter.
-		Splittable:     false, // Default to non-splittable, SDFs are situational.
-		InitialSplits:  1,     // Defaults to 1, i.e. no initial splitting.
+// OutputPerInput is the number of outputs to emit per input received. Each
+// output is identical to the original input. A value of 0 drops all inputs and
+// produces no output.
+//
+// Valid values are in the range of [0, ...] and the default value is 1. Values
+// below 0 are invalid as they have no logical meaning for this field.
+func (b *StepConfigBuilder) OutputPerInput(val int) *StepConfigBuilder {
+	b.cfg.outputPerInput = val
+	return b
+}
+
+// FilterRatio indicates the random chance that an input will be filtered
+// out, meaning that no outputs will get emitted for it. For example, a
+// FilterRatio of 0.25 means that 25% of inputs will be filtered out, a
+// FilterRatio of 0 means no elements are filtered, and a FilterRatio of 1.0
+// means every element is filtered.
+//
+// In a non-splittable step, this is performed on each input element, meaning
+// all outputs for that element would be filtered. In a splittable step, this is
+// performed on each input restriction instead of the entire element, meaning
+// that some outputs for an element may be filtered and others kept.
+//
+// Note that even when elements are filtered out, the work associated with
+// processing those elements is still performed, which differs from setting an
+// OutputPerInput of 0.
+//
+// Valid values are in the range of [0.0, 1.0], and the default value is 0. In
+// order to avoid precision errors, invalid values do not cause errors. Instead,
+// values below 0 are functionally equivalent to 0, and values above 1 are
+// functionally equivalent to 1.
+func (b *StepConfigBuilder) FilterRatio(val float64) *StepConfigBuilder {
+	b.cfg.filterRatio = val
+	return b
+}
+
+// Splittable indicates whether the step should use the splittable DoFn or
+// non-splittable DoFn implementation.
+//
+// Splittable steps will split along restrictions representing the number of
+// OutputPerInput for each element, so it is most useful for steps with a high
+// OutputPerInput. Conversely, if OutputPerInput is 1, then there is no way to
+// split restrictions further, so making the step splittable will do nothing.
+func (b *StepConfigBuilder) Splittable(val bool) *StepConfigBuilder {
+	b.cfg.splittable = val
+	return b
+}
+
+// InitialSplits is only applicable if Splittable is set to true, and determines
+// the number of initial splits to perform in the step's SplitRestriction
+// method. Restrictions in synthetic steps represent the number of elements to
+// emit for each input element, as defined by the OutputPerInput config field,
+// and this split is performed evenly across that number of elements.
+//
+// Each resulting restriction will have at least 1 element in it, and each
+// element being emitted will be contained in exactly one restriction. That
+// means that if the desired number of splits is greater than the OutputPerInput
+// N, then N initial restrictions will be created, each containing 1 element.
+//
+// Valid values are in the range of [1, ...] and the default value is 1. Values
+// of 0 (and below) are invalid as they would result in dropping elements that
+// are expected to be emitted.
+func (b *StepConfigBuilder) InitialSplits(val int) *StepConfigBuilder {
+	b.cfg.initialSplits = val
+	return b
+}
+
+// Build constructs the StepConfig initialized by this builder. It also performs
+// error checking on the fields, and panics if any have been set to invalid
+// values.
+func (b *StepConfigBuilder) Build() StepConfig {
+	if b.cfg.initialSplits <= 0 {
+		panic(fmt.Sprintf("StepConfig.InitialSplits must be >= 1. Got: %v", b.cfg.initialSplits))
+	}
+	if b.cfg.outputPerInput < 0 {
+		panic(fmt.Sprintf("StepConfig.OutputPerInput cannot be negative. Got: %v", b.cfg.outputPerInput))
 	}
+	return b.cfg
 }
 
 // StepConfig is a struct containing all the configuration options for a
-// synthetic step.
+// synthetic step. It should be created via a StepConfigBuilder.
 type StepConfig struct {
-	// OutputPerInput is the number of outputs to emit per input received. Each
-	// output is identical to the original input. A value of 0 drops each input.
-	OutputPerInput int
-
-	// FilterRatio indicates the random chance that an input will be filtered
-	// out, meaning that no outputs will get emitted for it. For example, a
-	// FilterRatio of 0.25 means that 25% of inputs will get filtered out.
-	FilterRatio float64
-
-	// Splittable indicates whether the step should use the splittable DoFn or
-	// non-splittable DoFn implementation. Splittable steps will split the
-	// number of OutputPerInput into restrictions, so it is most useful for
-	// steps with a high OutputPerInput.
-	Splittable bool
-
-	// InitialSplits is only applicable if Splittable is set to true, and
-	// determines the number of initial splits to perform in the step's
-	// SplitRestriction method. Note that in some edge cases, the number of
-	// splits performed might differ from this config value. Each restriction
-	// will always have one element in it, and at least one restriction will
-	// always be output, so the number of splits will be in the range of [1, N]
-	// where N is the size of the original restriction.
-	InitialSplits int
+	outputPerInput int

Review comment:
       Same comment here WRT the unexported fields being unserializable.
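For the other option raised in the first comment (keeping StepConfig's fields unexported and supplying a custom coder), one possible shape is an encoder/decoder pair that copies the private fields through an exported mirror struct. This is a sketch. stepConfigWire, encodeStepConfig and decodeStepConfig are hypothetical names. Wiring the pair into beam.RegisterCoder is not shown, because the exact function shapes it accepts are an assumption here.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// StepConfig keeps its fields unexported, matching the PR.
type StepConfig struct {
	outputPerInput int
	filterRatio    float64
	splittable     bool
	initialSplits  int
}

// stepConfigWire is a hypothetical exported mirror used only for encoding.
type stepConfigWire struct {
	OutputPerInput int
	FilterRatio    float64
	Splittable     bool
	InitialSplits  int
}

// encodeStepConfig (hypothetical) copies the private fields into the wire
// struct and marshals it to JSON.
func encodeStepConfig(c StepConfig) ([]byte, error) {
	return json.Marshal(stepConfigWire{
		OutputPerInput: c.outputPerInput,
		FilterRatio:    c.filterRatio,
		Splittable:     c.splittable,
		InitialSplits:  c.initialSplits,
	})
}

// decodeStepConfig (hypothetical) reverses encodeStepConfig.
func decodeStepConfig(data []byte) (StepConfig, error) {
	var w stepConfigWire
	if err := json.Unmarshal(data, &w); err != nil {
		return StepConfig{}, err
	}
	return StepConfig{
		outputPerInput: w.OutputPerInput,
		filterRatio:    w.FilterRatio,
		splittable:     w.Splittable,
		initialSplits:  w.InitialSplits,
	}, nil
}

func main() {
	in := StepConfig{outputPerInput: 10, filterRatio: 0.5, splittable: true, initialSplits: 4}
	data, err := encodeStepConfig(in)
	if err != nil {
		panic(err)
	}
	out, err := decodeStepConfig(data)
	if err != nil {
		panic(err)
	}
	fmt.Println(out == in) // true
}
```

This keeps the public API builder-only while still producing an encodable form. The cost is that every new field must be added to the config, the mirror struct, and both copy functions.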







[GitHub] [beam] youngoli commented on pull request #11747: [BEAM-9951] Using the builder pattern for Go synthetic config frontend

youngoli commented on pull request #11747:
URL: https://github.com/apache/beam/pull/11747#issuecomment-630460354


   R: @lostluck 





[GitHub] [beam] youngoli commented on a change in pull request #11747: [BEAM-9951] Using the builder pattern for Go synthetic config frontend

youngoli commented on a change in pull request #11747:
URL: https://github.com/apache/beam/pull/11747#discussion_r427004634



##########
File path: sdks/go/pkg/beam/io/synthetic/step.go
##########
+	outputPerInput int

Review comment:
       Done.







[GitHub] [beam] youngoli commented on a change in pull request #11747: [BEAM-9951] Using the builder pattern for Go synthetic config frontend

Posted by GitBox <gi...@apache.org>.
youngoli commented on a change in pull request #11747:
URL: https://github.com/apache/beam/pull/11747#discussion_r427004608



##########
File path: sdks/go/pkg/beam/io/synthetic/source.go
##########
@@ -135,27 +155,79 @@ func (fn *sourceFn) ProcessElement(rt *offsetrange.Tracker, config SourceConfig,
 	return nil
 }
 
-// DefaultSourceConfig creates a SourceConfig with intended defaults for its
-// fields. SourceConfigs should be initialized with this method.
-func DefaultSourceConfig() SourceConfig {
-	return SourceConfig{
-		NumElements:   1, // Defaults shouldn't drop elements, so at least 1.
-		InitialSplits: 1, // Defaults to 1, i.e. no initial splitting.
+// SourceConfigBuilder is used to initialize SourceConfigs. See
+// SourceConfigBuilder's methods for descriptions of the fields in a
+// SourceConfig and how they can be set. The intended approach for using this
+// builder is to begin by calling the DefaultSourceConfig function, followed by
+// calling setters, followed by calling Build.
+//
+// Usage example:
+//
+//    cfg := synthetic.DefaultSourceConfig().NumElements(5000).InitialSplits(2).Build()
+type SourceConfigBuilder struct {
+	cfg SourceConfig
+}
+
+// DefaultSourceConfig creates a SourceConfigBuilder set with intended defaults
+// for the SourceConfig fields. This function is the intended starting point for
+// initializing a SourceConfig and should always be used to create
+// SourceConfigBuilders.
+//
+// For descriptions of the various SourceConfig fields and their defaults,
+// see the methods of SourceConfigBuilder.
+func DefaultSourceConfig() *SourceConfigBuilder {
+	return &SourceConfigBuilder{
+		cfg: SourceConfig{
+			numElements:   1, // 0 is invalid (drops elements).
+			initialSplits: 1, // 0 is invalid (drops elements).
+		},
+	}
+}
+
+// NumElements is the number of elements for the source to generate and emit.
+//
+// Valid values are in the range of [1, ...] and the default value is 1. Values
+// of 0 (and below) are invalid as they result in sources that emit no elements.
+func (b *SourceConfigBuilder) NumElements(val int) *SourceConfigBuilder {
+	b.cfg.numElements = val
+	return b
+}
+
+// InitialSplits determines the number of initial splits to perform in the
+// source's SplitRestriction method. Restrictions in synthetic sources represent
+// the number of elements being emitted, and this split is performed evenly
+// across that number of elements.
+//
+// Each resulting restriction will have at least 1 element in it, and each
+// element being emitted will be contained in exactly one restriction. That
+// means that if the desired number of splits is greater than the number of
+// elements N, then N initial restrictions will be created, each containing 1
+// element.
+//
+// Valid values are in the range of [1, ...] and the default value is 1. Values
+// of 0 (and below) are invalid as they would result in dropping elements that
+// are expected to be emitted.
+func (b *SourceConfigBuilder) InitialSplits(val int) *SourceConfigBuilder {
+	b.cfg.initialSplits = val
+	return b
+}
+
+// Build constructs the SourceConfig initialized by this builder. It also
+// performs error checking on the fields, and panics if any have been set to
+// invalid values.
+func (b *SourceConfigBuilder) Build() SourceConfig {
+	if b.cfg.initialSplits <= 0 {
+		panic(fmt.Sprintf("SourceConfig.InitialSplits must be >= 1. Got: %v", b.cfg.initialSplits))
+	}
+	if b.cfg.numElements <= 0 {
+		panic(fmt.Sprintf("SourceConfig.NumElements must be >= 1. Got: %v", b.cfg.numElements))
 	}
+	return b.cfg
 }
 
 // SourceConfig is a struct containing all the configuration options for a
-// synthetic source.
+// synthetic source. It should be created via a SourceConfigBuilder.
 type SourceConfig struct {
-	// NumElements is the number of elements for the source to generate and
-	// emit.
-	NumElements int
-
-	// InitialSplits determines the number of initial splits to perform in the
-	// source's SplitRestriction method. Note that in some edge cases, the
-	// number of splits performed might differ from this config value. Each
-	// restriction will always have one element in it, and at least one
-	// restriction will always be output, so the number of splits will be in
-	// the range of [1, N] where N is the size of the original restriction.
-	InitialSplits int
+	numElements   int

Review comment:
       Oh yeah, forgot about that. I'll go with having them exported and just recommend a builder. Done.
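The `InitialSplits` documentation in this hunk describes splitting the element range evenly, so that every restriction holds at least one element and the number of splits is capped at N. A minimal sketch of that arithmetic follows. It assumes restrictions are half-open offset ranges `[Start, End)`, like the `offsetrange.Tracker` restrictions this source processes. `offsetRange` and `splitEvenly` are hypothetical names, not the synthetic package's actual `SplitRestriction` code.

```go
package main

import "fmt"

// offsetRange is a hypothetical half-open range [Start, End) of element
// offsets, standing in for an offset-range restriction.
type offsetRange struct {
	Start, End int64
}

// splitEvenly divides [0, numElements) into at most numSplits contiguous
// restrictions whose sizes differ by at most one. If numSplits exceeds
// numElements, it is capped so every restriction holds at least one element.
// Every offset lands in exactly one restriction.
func splitEvenly(numElements, numSplits int64) []offsetRange {
	if numElements <= 0 || numSplits <= 0 {
		return nil // Invalid inputs; the builder's Build rejects these values.
	}
	if numSplits > numElements {
		numSplits = numElements
	}
	splits := make([]offsetRange, 0, numSplits)
	for i := int64(0); i < numSplits; i++ {
		// Integer division keeps the boundaries exact and contiguous.
		start := i * numElements / numSplits
		end := (i + 1) * numElements / numSplits
		splits = append(splits, offsetRange{Start: start, End: end})
	}
	return splits
}

func main() {
	fmt.Println(splitEvenly(10, 3)) // [{0 3} {3 6} {6 10}]
	fmt.Println(splitEvenly(2, 5))  // [{0 1} {1 2}]
}
```

Because each boundary is computed as `i * N / k`, the end of one restriction is the start of the next. No offset is dropped or duplicated, and sizes differ by at most one element.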



