You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain text copy.
Posted to commits@beam.apache.org by lo...@apache.org on 2022/06/02 17:00:11 UTC
[beam] branch master updated: [BEAM-10976] Fix bug with bundle finalization on SDFs (and a small doc bug) (#17811)
This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new c77971053d2 [BEAM-10976] Fix bug with bundle finalization on SDFs (and a small doc bug) (#17811)
c77971053d2 is described below
commit c77971053d2eac2e2bcd1c36c3d312157fc4e450
Author: Danny McCormick <da...@google.com>
AuthorDate: Thu Jun 2 13:00:04 2022 -0400
[BEAM-10976] Fix bug with bundle finalization on SDFs (and a small doc bug) (#17811)
---
sdks/go/examples/snippets/04transforms.go | 3 ++-
sdks/go/pkg/beam/core/runtime/exec/fn.go | 4 ++++
sdks/go/pkg/beam/core/runtime/exec/pardo.go | 4 ++++
sdks/go/pkg/beam/core/runtime/exec/plan.go | 4 ++--
sdks/go/pkg/beam/core/runtime/exec/sdf.go | 8 ++++++++
5 files changed, 20 insertions(+), 3 deletions(-)
diff --git a/sdks/go/examples/snippets/04transforms.go b/sdks/go/examples/snippets/04transforms.go
index 80ff7ed66aa..3cb83b643c9 100644
--- a/sdks/go/examples/snippets/04transforms.go
+++ b/sdks/go/examples/snippets/04transforms.go
@@ -24,6 +24,7 @@ import (
"time"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"
@@ -96,7 +97,7 @@ type weDoFn struct{}
// [START bundlefinalization_simplecallback]
-func (fn *splittableDoFn) ProcessElement(element string, bf beam.BundleFinalization) {
+func (fn *splittableDoFn) ProcessElement(bf beam.BundleFinalization, rt *sdf.LockRTracker, element string) {
// ... produce output ...
bf.RegisterCallback(5*time.Minute, func() error {
diff --git a/sdks/go/pkg/beam/core/runtime/exec/fn.go b/sdks/go/pkg/beam/core/runtime/exec/fn.go
index 8724b716d70..48387f78c34 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/fn.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/fn.go
@@ -52,6 +52,10 @@ type bundleFinalizer struct {
lastValidCallback time.Time // Used to track when we can safely gc the bundleFinalizer
}
+type needsBundleFinalization interface {
+ AttachFinalizer(*bundleFinalizer)
+}
+
// RegisterCallback is used to register callbacks during DoFn execution.
func (bf *bundleFinalizer) RegisterCallback(t time.Duration, cb func() error) {
callback := bundleFinalizationCallback{
diff --git a/sdks/go/pkg/beam/core/runtime/exec/pardo.go b/sdks/go/pkg/beam/core/runtime/exec/pardo.go
index d8c552df9af..307b9874faf 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/pardo.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/pardo.go
@@ -99,6 +99,10 @@ func (n *ParDo) Up(ctx context.Context) error {
return nil
}
+func (n *ParDo) AttachFinalizer(bf *bundleFinalizer) {
+ n.bf = bf
+}
+
// StartBundle does pre-bundle processing operation for the DoFn.
func (n *ParDo) StartBundle(ctx context.Context, id string, data DataContext) error {
if n.status != Up {
diff --git a/sdks/go/pkg/beam/core/runtime/exec/plan.go b/sdks/go/pkg/beam/core/runtime/exec/plan.go
index a688ced565a..f1a6f998e5b 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/plan.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/plan.go
@@ -66,8 +66,8 @@ func NewPlan(id string, units []Unit) (*Plan, error) {
if p, ok := u.(*PCollection); ok {
pcols = append(pcols, p)
}
- if p, ok := u.(*ParDo); ok {
- p.bf = &bf
+ if p, ok := u.(needsBundleFinalization); ok {
+ p.AttachFinalizer(&bf)
}
}
if len(roots) == 0 {
diff --git a/sdks/go/pkg/beam/core/runtime/exec/sdf.go b/sdks/go/pkg/beam/core/runtime/exec/sdf.go
index c71b809dea3..a9062d2ae28 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/sdf.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/sdf.go
@@ -437,6 +437,10 @@ func (n *ProcessSizedElementsAndRestrictions) Up(ctx context.Context) error {
return n.PDo.Up(ctx)
}
+func (n *ProcessSizedElementsAndRestrictions) AttachFinalizer(bf *bundleFinalizer) {
+ n.PDo.bf = bf
+}
+
// StartBundle calls the ParDo's StartBundle method.
func (n *ProcessSizedElementsAndRestrictions) StartBundle(ctx context.Context, id string, data DataContext) error {
return n.PDo.StartBundle(ctx, id, data)
@@ -950,6 +954,10 @@ func (n *SdfFallback) Up(ctx context.Context) error {
return n.PDo.Up(ctx)
}
+func (n *SdfFallback) AttachFinalizer(bf *bundleFinalizer) {
+ n.PDo.bf = bf
+}
+
// StartBundle calls the ParDo's StartBundle method.
func (n *SdfFallback) StartBundle(ctx context.Context, id string, data DataContext) error {
return n.PDo.StartBundle(ctx, id, data)