You are viewing a plain-text version of this content. (The canonical link to the original message was not preserved in this copy.)
Posted to commits@beam.apache.org by lo...@apache.org on 2022/06/02 17:00:11 UTC

[beam] branch master updated: [BEAM-10976] Fix bug with bundle finalization on SDFs (and a small doc bug) (#17811)

This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new c77971053d2 [BEAM-10976] Fix bug with bundle finalization on SDFs (and a small doc bug) (#17811)
c77971053d2 is described below

commit c77971053d2eac2e2bcd1c36c3d312157fc4e450
Author: Danny McCormick <da...@google.com>
AuthorDate: Thu Jun 2 13:00:04 2022 -0400

    [BEAM-10976] Fix bug with bundle finalization on SDFs (and a small doc bug) (#17811)
---
 sdks/go/examples/snippets/04transforms.go   | 3 ++-
 sdks/go/pkg/beam/core/runtime/exec/fn.go    | 4 ++++
 sdks/go/pkg/beam/core/runtime/exec/pardo.go | 4 ++++
 sdks/go/pkg/beam/core/runtime/exec/plan.go  | 4 ++--
 sdks/go/pkg/beam/core/runtime/exec/sdf.go   | 8 ++++++++
 5 files changed, 20 insertions(+), 3 deletions(-)

diff --git a/sdks/go/examples/snippets/04transforms.go b/sdks/go/examples/snippets/04transforms.go
index 80ff7ed66aa..3cb83b643c9 100644
--- a/sdks/go/examples/snippets/04transforms.go
+++ b/sdks/go/examples/snippets/04transforms.go
@@ -24,6 +24,7 @@ import (
 	"time"
 
 	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"
@@ -96,7 +97,7 @@ type weDoFn struct{}
 
 // [START bundlefinalization_simplecallback]
 
-func (fn *splittableDoFn) ProcessElement(element string, bf beam.BundleFinalization) {
+func (fn *splittableDoFn) ProcessElement(bf beam.BundleFinalization, rt *sdf.LockRTracker, element string) {
 	// ... produce output ...
 
 	bf.RegisterCallback(5*time.Minute, func() error {
diff --git a/sdks/go/pkg/beam/core/runtime/exec/fn.go b/sdks/go/pkg/beam/core/runtime/exec/fn.go
index 8724b716d70..48387f78c34 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/fn.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/fn.go
@@ -52,6 +52,10 @@ type bundleFinalizer struct {
 	lastValidCallback time.Time // Used to track when we can safely gc the bundleFinalizer
 }
 
+type needsBundleFinalization interface {
+	AttachFinalizer(*bundleFinalizer)
+}
+
 // RegisterCallback is used to register callbacks during DoFn execution.
 func (bf *bundleFinalizer) RegisterCallback(t time.Duration, cb func() error) {
 	callback := bundleFinalizationCallback{
diff --git a/sdks/go/pkg/beam/core/runtime/exec/pardo.go b/sdks/go/pkg/beam/core/runtime/exec/pardo.go
index d8c552df9af..307b9874faf 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/pardo.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/pardo.go
@@ -99,6 +99,10 @@ func (n *ParDo) Up(ctx context.Context) error {
 	return nil
 }
 
+func (n *ParDo) AttachFinalizer(bf *bundleFinalizer) {
+	n.bf = bf
+}
+
 // StartBundle does pre-bundle processing operation for the DoFn.
 func (n *ParDo) StartBundle(ctx context.Context, id string, data DataContext) error {
 	if n.status != Up {
diff --git a/sdks/go/pkg/beam/core/runtime/exec/plan.go b/sdks/go/pkg/beam/core/runtime/exec/plan.go
index a688ced565a..f1a6f998e5b 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/plan.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/plan.go
@@ -66,8 +66,8 @@ func NewPlan(id string, units []Unit) (*Plan, error) {
 		if p, ok := u.(*PCollection); ok {
 			pcols = append(pcols, p)
 		}
-		if p, ok := u.(*ParDo); ok {
-			p.bf = &bf
+		if p, ok := u.(needsBundleFinalization); ok {
+			p.AttachFinalizer(&bf)
 		}
 	}
 	if len(roots) == 0 {
diff --git a/sdks/go/pkg/beam/core/runtime/exec/sdf.go b/sdks/go/pkg/beam/core/runtime/exec/sdf.go
index c71b809dea3..a9062d2ae28 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/sdf.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/sdf.go
@@ -437,6 +437,10 @@ func (n *ProcessSizedElementsAndRestrictions) Up(ctx context.Context) error {
 	return n.PDo.Up(ctx)
 }
 
+func (n *ProcessSizedElementsAndRestrictions) AttachFinalizer(bf *bundleFinalizer) {
+	n.PDo.bf = bf
+}
+
 // StartBundle calls the ParDo's StartBundle method.
 func (n *ProcessSizedElementsAndRestrictions) StartBundle(ctx context.Context, id string, data DataContext) error {
 	return n.PDo.StartBundle(ctx, id, data)
@@ -950,6 +954,10 @@ func (n *SdfFallback) Up(ctx context.Context) error {
 	return n.PDo.Up(ctx)
 }
 
+func (n *SdfFallback) AttachFinalizer(bf *bundleFinalizer) {
+	n.PDo.bf = bf
+}
+
 // StartBundle calls the ParDo's StartBundle method.
 func (n *SdfFallback) StartBundle(ctx context.Context, id string, data DataContext) error {
 	return n.PDo.StartBundle(ctx, id, data)