You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2022/05/26 17:04:21 UTC
[beam] branch master updated: [BEAM-11106] small nits to truncate sdf exec unit (#17755)
This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new ff39fcb5229 [BEAM-11106] small nits to truncate sdf exec unit (#17755)
ff39fcb5229 is described below
commit ff39fcb5229b15140e41a61bd09f7d590730e93a
Author: Ritesh Ghorse <ri...@gmail.com>
AuthorDate: Thu May 26 13:04:14 2022 -0400
[BEAM-11106] small nits to truncate sdf exec unit (#17755)
---
sdks/go/pkg/beam/core/runtime/exec/sdf.go | 15 +++++++++++++--
sdks/go/pkg/beam/core/runtime/graphx/translate.go | 3 ++-
2 files changed, 15 insertions(+), 3 deletions(-)
diff --git a/sdks/go/pkg/beam/core/runtime/exec/sdf.go b/sdks/go/pkg/beam/core/runtime/exec/sdf.go
index 86040a9f89f..c71b809dea3 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/sdf.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/sdf.go
@@ -308,7 +308,17 @@ func (n *TruncateSizedRestriction) StartBundle(ctx context.Context, id string, d
// Timestamps
// }
func (n *TruncateSizedRestriction) ProcessElement(ctx context.Context, elm *FullValue, values ...ReStream) error {
- mainElm := elm.Elm.(*FullValue).Elm.(*FullValue)
+ mainElm := elm.Elm.(*FullValue)
+ inp := mainElm.Elm
+ // For the main element, the way we fill it out depends on whether the input element
+ // is a KV or single-element. Single-elements might have been lifted out of
+ // their FullValue if they were decoded, so we need to have a case for that.
+ // TODO(BEAM-9798): Optimize this so it's decided in exec/translate.go
+ // instead of checking per-element.
+ if e, ok := mainElm.Elm.(*FullValue); ok {
+ mainElm = e
+ inp = e
+ }
rest := elm.Elm.(*FullValue).Elm2.(*FullValue).Elm
rt := n.ctInv.Invoke(rest)
newRest := n.truncateInv.Invoke(rt, mainElm)
@@ -317,10 +327,11 @@ func (n *TruncateSizedRestriction) ProcessElement(ctx context.Context, elm *Full
return nil
}
size := n.sizeInv.Invoke(mainElm, newRest)
+
output := &FullValue{}
output.Timestamp = elm.Timestamp
output.Windows = elm.Windows
- output.Elm = &FullValue{Elm: mainElm, Elm2: &FullValue{Elm: newRest, Elm2: elm.Elm.(*FullValue).Elm2.(*FullValue).Elm2}}
+ output.Elm = &FullValue{Elm: inp, Elm2: &FullValue{Elm: newRest, Elm2: elm.Elm.(*FullValue).Elm2.(*FullValue).Elm2}}
output.Elm2 = size
if err := n.Out.ProcessElement(ctx, output, values...); err != nil {
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate.go b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
index 7b777771a8c..c1416dfba5f 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/translate.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
@@ -68,7 +68,7 @@ const (
URNRequiresSplittableDoFn = "beam:requirement:pardo:splittable_dofn:v1"
URNRequiresBundleFinalization = "beam:requirement:pardo:finalization:v1"
- URNTruncate = "beam:transform:sdf_truncate_sized_restrictions_v1"
+ URNTruncate = "beam:transform:sdf_truncate_sized_restrictions:v1"
// Deprecated: Determine worker binary based on GoWorkerBinary Role instead.
URNArtifactGoWorker = "beam:artifact:type:go_worker_binary:v1"
@@ -87,6 +87,7 @@ func goCapabilities() []string {
capabilities := []string{
URNLegacyProgressReporting,
URNMultiCore,
+ URNTruncate,
// TODO(BEAM-9614): Make this versioned.
"beam:version:sdk_base:go",
}