You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/05/16 01:28:29 UTC

[GitHub] [beam] youngoli commented on a change in pull request #11728: [BEAM-9951] Creating a synthetic step for the Go SDK.

youngoli commented on a change in pull request #11728:
URL: https://github.com/apache/beam/pull/11728#discussion_r426103916



##########
File path: sdks/go/pkg/beam/io/synthetic/step.go
##########
@@ -0,0 +1,191 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package synthetic
+
+import (
+	"github.com/apache/beam/sdks/go/pkg/beam"
+	"github.com/apache/beam/sdks/go/pkg/beam/io/rtrackers/offsetrange"
+	"math/rand"
+	"time"
+)
+
+// Step creates a synthetic step transform that receives KV<[]byte, []byte>
+// elements from other synthetic transforms, and outputs KV<[]byte, []byte>
+// elements based on its inputs.
+//
+// This function accepts a StepConfig to configure the behavior of the synthetic
+// step, including whether that step is implemented as a splittable or
+// non-splittable DoFn. The transform is added under a sub-scope named
+// "synthetic.Step".
+//
+// StepConfigs are recommended to be created via DefaultStepConfig and
+// modified before being passed to this function. Example:
+//
+//    cfg := synthetic.DefaultStepConfig()
+//    cfg.OutputPerInput = 1000
+//    cfg.Splittable = true
+//    cfg.InitialSplits = 2
+//    step := synthetic.Step(s, cfg, input)
+func Step(s beam.Scope, cfg StepConfig, col beam.PCollection) beam.PCollection {
+	s = s.Scope("synthetic.Step")
+	if cfg.Splittable {
+		return beam.ParDo(s, &sdfStepFn{cfg: cfg}, col)
+	}
+	return beam.ParDo(s, &stepFn{cfg: cfg}, col)
+}
+
+// stepFn is a DoFn implementing behavior for synthetic steps. For usage
+// information, see synthetic.Step.
+//
+// The stepFn is expected to be initialized with a cfg and will follow that
+// config to determine its behavior when emitting elements. Step constructs it
+// with only cfg set; rng is assigned later by Setup.
+type stepFn struct {
+	// cfg supplies FilterRatio (probability-style drop threshold) and
+	// OutputPerInput (number of copies emitted per kept input).
+	// NOTE(review): cfg is unexported. If the runner serializes this DoFn by
+	// encoding only exported struct fields, cfg would arrive zero-valued on
+	// workers — confirm it survives serialization on non-direct runners.
+	cfg StepConfig
+	// rng drives the filtering decision in ProcessElement; set in Setup.
+	rng randWrapper
+}
+
+// Setup gives the DoFn its own random number generator, seeded from the
+// current time in nanoseconds.
+func (fn *stepFn) Setup() {
+	seed := time.Now().UnixNano()
+	fn.rng = rand.New(rand.NewSource(seed))
+}
+
+// ProcessElement drops the input when filtering is enabled (FilterRatio > 0)
+// and a random draw from rng falls below FilterRatio. Otherwise it re-emits
+// the unchanged key/value pair OutputPerInput times; a non-positive
+// OutputPerInput emits nothing.
+func (fn *stepFn) ProcessElement(key, val []byte, emit func([]byte, []byte)) {
+	// The ratio is tested first so no random number is drawn when filtering
+	// is disabled.
+	if ratio := fn.cfg.FilterRatio; ratio > 0 && fn.rng.Float64() < ratio {
+		return
+	}
+	for remaining := fn.cfg.OutputPerInput; remaining > 0; remaining-- {
+		emit(key, val)
+	}
+}
+
+// sdfStepFn is a splittable DoFn implementing behavior for synthetic steps.
+// For usage information, see synthetic.Step.
+//
+// The sdfStepFn is expected to be initialized with a cfg and will follow
+// that config to determine its behavior when splitting and emitting elements.
+// Step constructs it with only cfg set.
+type sdfStepFn struct {
+	// cfg supplies OutputPerInput (size of each element's initial restriction)
+	// and InitialSplits (how many pieces SplitRestriction aims for).
+	// NOTE(review): cfg is unexported. If the runner serializes this DoFn by
+	// encoding only exported struct fields, cfg would arrive zero-valued on
+	// workers — confirm it survives serialization on non-direct runners.
+	cfg StepConfig
+	// rng is a random source for this DoFn.
+	// NOTE(review): Step does not set rng and no initializer for sdfStepFn is
+	// visible in this excerpt — confirm one (e.g. a Setup method) exists.
+	rng randWrapper
+}
+
+// CreateInitialRestriction returns an offset range that starts at 0 and ends
+// at OutputPerInput from StepConfig, one offset per output to emit for the
+// received element. The key and value themselves are not inspected.
+func (fn *sdfStepFn) CreateInitialRestriction(key, val []byte) offsetrange.Restriction {
+	end := int64(fn.cfg.OutputPerInput)
+	return offsetrange.Restriction{Start: 0, End: end}
+}
+
+// SplitRestriction splits restrictions equally according to the number of
+// initial splits specified in StepConfig. Each restriction output by this
+// method will contain at least one element, so the number of splits will not
+// exceed the number of elements.
+func (fn *sdfStepFn) SplitRestriction(key, val []byte, rest offsetrange.Restriction) (splits []offsetrange.Restriction) {
+	if fn.cfg.InitialSplits <= 1 {
+		// Don't split, just return original restriction.
+		splits = append(splits, rest)
+		return splits

Review comment:
       Done.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org