You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2022/10/12 02:01:45 UTC

[beam] 01/01: [Go SDK]: SingleFlight bundle descriptor requests

This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch singleflight23335
in repository https://gitbox.apache.org/repos/asf/beam.git

commit c42efb69dd07c1ce205d4189466e98b1d4ca72b3
Author: lostluck <13...@users.noreply.github.com>
AuthorDate: Tue Oct 11 19:01:26 2022 -0700

    [Go SDK]: SingleFlight bundle descriptor requests
---
 sdks/go.mod                                      |  2 +-
 sdks/go.sum                                      |  3 ++-
 sdks/go/pkg/beam/core/runtime/harness/harness.go | 12 +++++++++---
 3 files changed, 12 insertions(+), 5 deletions(-)

diff --git a/sdks/go.mod b/sdks/go.mod
index 7abbfef113c..6c14f06198a 100644
--- a/sdks/go.mod
+++ b/sdks/go.mod
@@ -43,6 +43,7 @@ require (
 	github.com/xitongsys/parquet-go-source v0.0.0-20220315005136-aec0fe3e777c
 	golang.org/x/net v0.0.0-20220909164309-bea034e7d591
 	golang.org/x/oauth2 v0.0.0-20220909003341-f21342109be1
+	golang.org/x/sync v0.0.0-20220929204114-8fcdb60fdcc0
 	golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10
 	golang.org/x/text v0.3.7
 	google.golang.org/api v0.98.0
@@ -89,7 +90,6 @@ require (
 	github.com/sirupsen/logrus v1.8.1 // indirect
 	github.com/spf13/pflag v1.0.5 // indirect
 	go.opencensus.io v0.23.0 // indirect
-	golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f // indirect
 	golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f // indirect
 	google.golang.org/appengine v1.6.7 // indirect
 	gopkg.in/linkedin/goavro.v1 v1.0.5 // indirect
diff --git a/sdks/go.sum b/sdks/go.sum
index 4fa9bcafc52..53168bd246d 100644
--- a/sdks/go.sum
+++ b/sdks/go.sum
@@ -1213,8 +1213,9 @@ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJ
 golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
-golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f h1:Ax0t5p6N38Ga0dThY21weqDEyz2oklo4IvDkpigvkD8=
 golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20220929204114-8fcdb60fdcc0 h1:cu5kTvlzcw1Q5S9f5ip1/cpiB4nXvw1XYzFPGgzLUOY=
+golang.org/x/sync v0.0.0-20220929204114-8fcdb60fdcc0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
diff --git a/sdks/go/pkg/beam/core/runtime/harness/harness.go b/sdks/go/pkg/beam/core/runtime/harness/harness.go
index edc16578a7f..3442bd8da12 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/harness.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/harness.go
@@ -38,6 +38,7 @@ import (
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/util/diagnostics"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/util/grpcx"
 	"github.com/golang/protobuf/proto"
+	"golang.org/x/sync/singleflight"
 	"google.golang.org/grpc"
 	"google.golang.org/protobuf/types/known/durationpb"
 )
@@ -101,10 +102,15 @@ func Main(ctx context.Context, loggingEndpoint, controlEndpoint string) error {
 
 	client := fnpb.NewBeamFnControlClient(conn)
 
+	var bundleGetGroup singleflight.Group
 	lookupDesc := func(id bundleDescriptorID) (*fnpb.ProcessBundleDescriptor, error) {
-		pbd, err := client.GetProcessBundleDescriptor(ctx, &fnpb.GetProcessBundleDescriptorRequest{ProcessBundleDescriptorId: string(id)})
-		log.Debugf(ctx, "GPBD RESP [%v]: %v, err %v", id, pbd, err)
-		return pbd, err
+		pbd, err, _ := bundleGetGroup.Do(string(id), func() (any, error) {
+			return client.GetProcessBundleDescriptor(ctx, &fnpb.GetProcessBundleDescriptorRequest{ProcessBundleDescriptorId: string(id)})
+		})
+		if err != nil {
+			return nil, err
+		}
+		return pbd.(*fnpb.ProcessBundleDescriptor), nil
 	}
 
 	stub, err := client.Control(ctx)