You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2022/10/12 02:01:44 UTC

[beam] branch singleflight23335 created (now c42efb69dd0)

This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a change to branch singleflight23335
in repository https://gitbox.apache.org/repos/asf/beam.git


      at c42efb69dd0 [Go SDK]: SingleFlight bundle descriptor requests

This branch includes the following new commits:

     new c42efb69dd0 [Go SDK]: SingleFlight bundle descriptor requests

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[beam] 01/01: [Go SDK]: SingleFlight bundle descriptor requests

Posted by lo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch singleflight23335
in repository https://gitbox.apache.org/repos/asf/beam.git

commit c42efb69dd07c1ce205d4189466e98b1d4ca72b3
Author: lostluck <13...@users.noreply.github.com>
AuthorDate: Tue Oct 11 19:01:26 2022 -0700

    [Go SDK]: SingleFlight bundle descriptor requests
---
 sdks/go.mod                                      |  2 +-
 sdks/go.sum                                      |  3 ++-
 sdks/go/pkg/beam/core/runtime/harness/harness.go | 12 +++++++++---
 3 files changed, 12 insertions(+), 5 deletions(-)

diff --git a/sdks/go.mod b/sdks/go.mod
index 7abbfef113c..6c14f06198a 100644
--- a/sdks/go.mod
+++ b/sdks/go.mod
@@ -43,6 +43,7 @@ require (
 	github.com/xitongsys/parquet-go-source v0.0.0-20220315005136-aec0fe3e777c
 	golang.org/x/net v0.0.0-20220909164309-bea034e7d591
 	golang.org/x/oauth2 v0.0.0-20220909003341-f21342109be1
+	golang.org/x/sync v0.0.0-20220929204114-8fcdb60fdcc0
 	golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10
 	golang.org/x/text v0.3.7
 	google.golang.org/api v0.98.0
@@ -89,7 +90,6 @@ require (
 	github.com/sirupsen/logrus v1.8.1 // indirect
 	github.com/spf13/pflag v1.0.5 // indirect
 	go.opencensus.io v0.23.0 // indirect
-	golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f // indirect
 	golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f // indirect
 	google.golang.org/appengine v1.6.7 // indirect
 	gopkg.in/linkedin/goavro.v1 v1.0.5 // indirect
diff --git a/sdks/go.sum b/sdks/go.sum
index 4fa9bcafc52..53168bd246d 100644
--- a/sdks/go.sum
+++ b/sdks/go.sum
@@ -1213,8 +1213,9 @@ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJ
 golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
-golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f h1:Ax0t5p6N38Ga0dThY21weqDEyz2oklo4IvDkpigvkD8=
 golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20220929204114-8fcdb60fdcc0 h1:cu5kTvlzcw1Q5S9f5ip1/cpiB4nXvw1XYzFPGgzLUOY=
+golang.org/x/sync v0.0.0-20220929204114-8fcdb60fdcc0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
diff --git a/sdks/go/pkg/beam/core/runtime/harness/harness.go b/sdks/go/pkg/beam/core/runtime/harness/harness.go
index edc16578a7f..3442bd8da12 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/harness.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/harness.go
@@ -38,6 +38,7 @@ import (
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/util/diagnostics"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx"
 	"github.com/golang/protobuf/proto"
+	"golang.org/x/sync/singleflight"
 	"google.golang.org/grpc"
 	"google.golang.org/protobuf/types/known/durationpb"
 )
@@ -101,10 +102,15 @@ func Main(ctx context.Context, loggingEndpoint, controlEndpoint string) error {
 
 	client := fnpb.NewBeamFnControlClient(conn)
 
+	var bundleGetGroup singleflight.Group
 	lookupDesc := func(id bundleDescriptorID) (*fnpb.ProcessBundleDescriptor, error) {
-		pbd, err := client.GetProcessBundleDescriptor(ctx, &fnpb.GetProcessBundleDescriptorRequest{ProcessBundleDescriptorId: string(id)})
-		log.Debugf(ctx, "GPBD RESP [%v]: %v, err %v", id, pbd, err)
-		return pbd, err
+		pbd, err, _ := bundleGetGroup.Do(string(id), func() (any, error) {
+			return client.GetProcessBundleDescriptor(ctx, &fnpb.GetProcessBundleDescriptorRequest{ProcessBundleDescriptorId: string(id)})
+		})
+		if err != nil {
+			return nil, err
+		}
+		return pbd.(*fnpb.ProcessBundleDescriptor), nil
 	}
 
 	stub, err := client.Control(ctx)