You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2020/04/02 01:03:02 UTC

[beam] branch lostluck-patch-1 created (now b2b4f66)

This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a change to branch lostluck-patch-1
in repository https://gitbox.apache.org/repos/asf/beam.git.


      at b2b4f66  [BEAM-9667] Allow metrics in DoFn Setup

This branch includes the following new commits:

     new b2b4f66  [BEAM-9667] Allow metrics in DoFn Setup

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[beam] 01/01: [BEAM-9667] Allow metrics in DoFn Setup

Posted by lo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch lostluck-patch-1
in repository https://gitbox.apache.org/repos/asf/beam.git

commit b2b4f66e1c910fff579f9469c3cb8486d8ddf473
Author: Robert Burke <lo...@users.noreply.github.com>
AuthorDate: Wed Apr 1 18:02:42 2020 -0700

    [BEAM-9667] Allow metrics in DoFn Setup
---
 sdks/go/pkg/beam/core/runtime/exec/pardo.go | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/sdks/go/pkg/beam/core/runtime/exec/pardo.go b/sdks/go/pkg/beam/core/runtime/exec/pardo.go
index aba5d43..14f870f 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/pardo.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/pardo.go
@@ -75,7 +75,11 @@ func (n *ParDo) Up(ctx context.Context) error {
 	n.status = Up
 	n.inv = newInvoker(n.Fn.ProcessElementFn())
 
-	if _, err := InvokeWithoutEventTime(ctx, n.Fn.SetupFn(), nil); err != nil {
+	// We can't cache the context during Setup since it runs only once per bundle.
+	// Subsequent bundles might run this same node, and the context here would be 
+	// incorrectly refering to the older bundleId.
+	setupCtx :=  metrics.SetPTransformID(ctx, n.PID)
+	if _, err := InvokeWithoutEventTime(setupCtx, n.Fn.SetupFn(), nil); err != nil {
 		return n.fail(err)
 	}