You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2022/05/26 17:04:21 UTC

[beam] branch master updated: [BEAM-11106] small nits to truncate sdf exec unit (#17755)

This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new ff39fcb5229 [BEAM-11106] small nits to truncate sdf exec unit (#17755)
ff39fcb5229 is described below

commit ff39fcb5229b15140e41a61bd09f7d590730e93a
Author: Ritesh Ghorse <ri...@gmail.com>
AuthorDate: Thu May 26 13:04:14 2022 -0400

    [BEAM-11106] small nits to truncate sdf exec unit (#17755)
---
 sdks/go/pkg/beam/core/runtime/exec/sdf.go         | 15 +++++++++++++--
 sdks/go/pkg/beam/core/runtime/graphx/translate.go |  3 ++-
 2 files changed, 15 insertions(+), 3 deletions(-)

diff --git a/sdks/go/pkg/beam/core/runtime/exec/sdf.go b/sdks/go/pkg/beam/core/runtime/exec/sdf.go
index 86040a9f89f..c71b809dea3 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/sdf.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/sdf.go
@@ -308,7 +308,17 @@ func (n *TruncateSizedRestriction) StartBundle(ctx context.Context, id string, d
 //     Timestamps
 //    }
 func (n *TruncateSizedRestriction) ProcessElement(ctx context.Context, elm *FullValue, values ...ReStream) error {
-	mainElm := elm.Elm.(*FullValue).Elm.(*FullValue)
+	mainElm := elm.Elm.(*FullValue)
+	inp := mainElm.Elm
+	// For the main element, the way we fill it out depends on whether the input element
+	// is a KV or single-element. Single-elements might have been lifted out of
+	// their FullValue if they were decoded, so we need to have a case for that.
+	// TODO(BEAM-9798): Optimize this so it's decided in exec/translate.go
+	// instead of checking per-element.
+	if e, ok := mainElm.Elm.(*FullValue); ok {
+		mainElm = e
+		inp = e
+	}
 	rest := elm.Elm.(*FullValue).Elm2.(*FullValue).Elm
 	rt := n.ctInv.Invoke(rest)
 	newRest := n.truncateInv.Invoke(rt, mainElm)
@@ -317,10 +327,11 @@ func (n *TruncateSizedRestriction) ProcessElement(ctx context.Context, elm *Full
 		return nil
 	}
 	size := n.sizeInv.Invoke(mainElm, newRest)
+
 	output := &FullValue{}
 	output.Timestamp = elm.Timestamp
 	output.Windows = elm.Windows
-	output.Elm = &FullValue{Elm: mainElm, Elm2: &FullValue{Elm: newRest, Elm2: elm.Elm.(*FullValue).Elm2.(*FullValue).Elm2}}
+	output.Elm = &FullValue{Elm: inp, Elm2: &FullValue{Elm: newRest, Elm2: elm.Elm.(*FullValue).Elm2.(*FullValue).Elm2}}
 	output.Elm2 = size
 
 	if err := n.Out.ProcessElement(ctx, output, values...); err != nil {
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate.go b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
index 7b777771a8c..c1416dfba5f 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/translate.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
@@ -68,7 +68,7 @@ const (
 
 	URNRequiresSplittableDoFn     = "beam:requirement:pardo:splittable_dofn:v1"
 	URNRequiresBundleFinalization = "beam:requirement:pardo:finalization:v1"
-	URNTruncate                   = "beam:transform:sdf_truncate_sized_restrictions_v1"
+	URNTruncate                   = "beam:transform:sdf_truncate_sized_restrictions:v1"
 
 	// Deprecated: Determine worker binary based on GoWorkerBinary Role instead.
 	URNArtifactGoWorker = "beam:artifact:type:go_worker_binary:v1"
@@ -87,6 +87,7 @@ func goCapabilities() []string {
 	capabilities := []string{
 		URNLegacyProgressReporting,
 		URNMultiCore,
+		URNTruncate,
 		// TODO(BEAM-9614): Make this versioned.
 		"beam:version:sdk_base:go",
 	}