You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2022/10/12 02:01:45 UTC
[beam] 01/01: [Go SDK]: SingleFlight bundle descriptor requests
This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch singleflight23335
in repository https://gitbox.apache.org/repos/asf/beam.git
commit c42efb69dd07c1ce205d4189466e98b1d4ca72b3
Author: lostluck <13...@users.noreply.github.com>
AuthorDate: Tue Oct 11 19:01:26 2022 -0700
[Go SDK]: SingleFlight bundle descriptor requests
---
sdks/go.mod | 2 +-
sdks/go.sum | 3 ++-
sdks/go/pkg/beam/core/runtime/harness/harness.go | 12 +++++++++---
3 files changed, 12 insertions(+), 5 deletions(-)
diff --git a/sdks/go.mod b/sdks/go.mod
index 7abbfef113c..6c14f06198a 100644
--- a/sdks/go.mod
+++ b/sdks/go.mod
@@ -43,6 +43,7 @@ require (
github.com/xitongsys/parquet-go-source v0.0.0-20220315005136-aec0fe3e777c
golang.org/x/net v0.0.0-20220909164309-bea034e7d591
golang.org/x/oauth2 v0.0.0-20220909003341-f21342109be1
+ golang.org/x/sync v0.0.0-20220929204114-8fcdb60fdcc0
golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10
golang.org/x/text v0.3.7
google.golang.org/api v0.98.0
@@ -89,7 +90,6 @@ require (
github.com/sirupsen/logrus v1.8.1 // indirect
github.com/spf13/pflag v1.0.5 // indirect
go.opencensus.io v0.23.0 // indirect
- golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f // indirect
golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f // indirect
google.golang.org/appengine v1.6.7 // indirect
gopkg.in/linkedin/goavro.v1 v1.0.5 // indirect
diff --git a/sdks/go.sum b/sdks/go.sum
index 4fa9bcafc52..53168bd246d 100644
--- a/sdks/go.sum
+++ b/sdks/go.sum
@@ -1213,8 +1213,9 @@ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
-golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f h1:Ax0t5p6N38Ga0dThY21weqDEyz2oklo4IvDkpigvkD8=
golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20220929204114-8fcdb60fdcc0 h1:cu5kTvlzcw1Q5S9f5ip1/cpiB4nXvw1XYzFPGgzLUOY=
+golang.org/x/sync v0.0.0-20220929204114-8fcdb60fdcc0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
diff --git a/sdks/go/pkg/beam/core/runtime/harness/harness.go b/sdks/go/pkg/beam/core/runtime/harness/harness.go
index edc16578a7f..3442bd8da12 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/harness.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/harness.go
@@ -38,6 +38,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/util/diagnostics"
"github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx"
"github.com/golang/protobuf/proto"
+ "golang.org/x/sync/singleflight"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/durationpb"
)
@@ -101,10 +102,15 @@ func Main(ctx context.Context, loggingEndpoint, controlEndpoint string) error {
client := fnpb.NewBeamFnControlClient(conn)
+ var bundleGetGroup singleflight.Group
lookupDesc := func(id bundleDescriptorID) (*fnpb.ProcessBundleDescriptor, error) {
- pbd, err := client.GetProcessBundleDescriptor(ctx, &fnpb.GetProcessBundleDescriptorRequest{ProcessBundleDescriptorId: string(id)})
- log.Debugf(ctx, "GPBD RESP [%v]: %v, err %v", id, pbd, err)
- return pbd, err
+ pbd, err, _ := bundleGetGroup.Do(string(id), func() (any, error) {
+ return client.GetProcessBundleDescriptor(ctx, &fnpb.GetProcessBundleDescriptorRequest{ProcessBundleDescriptorId: string(id)})
+ })
+ if err != nil {
+ return nil, err
+ }
+ return pbd.(*fnpb.ProcessBundleDescriptor), nil
}
stub, err := client.Control(ctx)