You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ri...@apache.org on 2023/02/15 14:50:26 UTC
[beam] branch master updated: [Go SDK] add retries to connect with expansion service (#25237)
This is an automated email from the ASF dual-hosted git repository.
riteshghorse pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 52b604076b8 [Go SDK] add retries to connect with expansion service (#25237)
52b604076b8 is described below
commit 52b604076b8e82e8453df5b3cfbda1ab78090e3c
Author: Ritesh Ghorse <ri...@gmail.com>
AuthorDate: Wed Feb 15 09:50:14 2023 -0500
[Go SDK] add retries to connect with expansion service (#25237)
* add retries to connect with expansion service
* prevent override of res
* change retries
* remove unnecessary module
* Revert "remove unnecessary module"
This reverts commit 15081850699f29a895e0f27899431c60409a617f.
* remove unnecessary pkg
---
sdks/go/pkg/beam/core/runtime/xlangx/expand.go | 33 ++++++++++++++++++++++----
1 file changed, 28 insertions(+), 5 deletions(-)
diff --git a/sdks/go/pkg/beam/core/runtime/xlangx/expand.go b/sdks/go/pkg/beam/core/runtime/xlangx/expand.go
index 9076b93e1f8..1a3040575c1 100644
--- a/sdks/go/pkg/beam/core/runtime/xlangx/expand.go
+++ b/sdks/go/pkg/beam/core/runtime/xlangx/expand.go
@@ -21,6 +21,7 @@ import (
"context"
"fmt"
"strings"
+ "time"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph"
@@ -32,8 +33,13 @@ import (
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/xlang"
"google.golang.org/grpc"
+ "gopkg.in/retry.v1"
)
+// maxRetries is the maximum number of attempts (the first call included)
+// made to reach an expansion service endpoint before giving up.
+const maxRetries = 5
+
// Expand expands an unexpanded graph.ExternalTransform as a
// graph.ExpandedTransform and assigns it to the ExternalTransform's Expanded
// field. This requires querying an expansion service based on the configuration
@@ -163,16 +169,33 @@ func QueryExpansionService(ctx context.Context, p *HandlerParams) (*jobpb.Expans
client := jobpb.NewExpansionServiceClient(conn)
// Handling ExpansionResponse
- res, err := client.Expand(ctx, req)
- if err != nil {
- err = errors.Wrapf(err, "expansion failed")
- return nil, errors.WithContextf(err, "expanding transform with ExpansionRequest: %v", req)
+ strategy := retry.LimitCount(
+ maxRetries,
+ retry.Exponential{
+ Initial: time.Second,
+ Factor: 2,
+ },
+ )
+ var res *jobpb.ExpansionResponse
+ for attempt := retry.Start(strategy, nil); attempt.Next(); {
+ res, err = client.Expand(ctx, req)
+ if err == nil {
+ break
+ }
+
+ if attempt.Count() == maxRetries {
+ if err != nil {
+ err = errors.Wrap(err, "expansion failed")
+ return nil, errors.WithContextf(err, "expanding transform with ExpansionRequest: %v", req)
+ }
+ }
}
if len(res.GetError()) != 0 { // ExpansionResponse includes an error.
err := errors.New(res.GetError())
- err = errors.Wrapf(err, "expansion failed")
+ err = errors.Wrap(err, "expansion response error")
return nil, errors.WithContextf(err, "expanding transform with ExpansionRequest: %v", req)
}
+
return res, nil
}