You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ri...@apache.org on 2023/02/15 14:50:26 UTC

[beam] branch master updated: [Go SDK] add retries to connect with expansion service (#25237)

This is an automated email from the ASF dual-hosted git repository.

riteshghorse pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 52b604076b8 [Go SDK] add retries to connect with expansion service (#25237)
52b604076b8 is described below

commit 52b604076b8e82e8453df5b3cfbda1ab78090e3c
Author: Ritesh Ghorse <ri...@gmail.com>
AuthorDate: Wed Feb 15 09:50:14 2023 -0500

    [Go SDK] add retries to connect with expansion service (#25237)
    
    * add retries to connect with expansion service
    
    * prevent override of res
    
    * change retries
    
    * remove unnecessary module
    
    * Revert "remove unnecessary module"
    
    This reverts commit 15081850699f29a895e0f27899431c60409a617f.
    
    * remove unnecessary pkg
---
 sdks/go/pkg/beam/core/runtime/xlangx/expand.go | 33 ++++++++++++++++++++++----
 1 file changed, 28 insertions(+), 5 deletions(-)

diff --git a/sdks/go/pkg/beam/core/runtime/xlangx/expand.go b/sdks/go/pkg/beam/core/runtime/xlangx/expand.go
index 9076b93e1f8..1a3040575c1 100644
--- a/sdks/go/pkg/beam/core/runtime/xlangx/expand.go
+++ b/sdks/go/pkg/beam/core/runtime/xlangx/expand.go
@@ -21,6 +21,7 @@ import (
 	"context"
 	"fmt"
 	"strings"
+	"time"
 
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph"
@@ -32,8 +33,13 @@ import (
 	pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/xlang"
 	"google.golang.org/grpc"
+	"gopkg.in/retry.v1"
 )
 
+// maxRetries is the maximum number of retries to attempt connecting to
+// an expansion service endpoint.
+const maxRetries = 5
+
 // Expand expands an unexpanded graph.ExternalTransform as a
 // graph.ExpandedTransform and assigns it to the ExternalTransform's Expanded
 // field. This requires querying an expansion service based on the configuration
@@ -163,16 +169,33 @@ func QueryExpansionService(ctx context.Context, p *HandlerParams) (*jobpb.Expans
 	client := jobpb.NewExpansionServiceClient(conn)
 
 	// Handling ExpansionResponse
-	res, err := client.Expand(ctx, req)
-	if err != nil {
-		err = errors.Wrapf(err, "expansion failed")
-		return nil, errors.WithContextf(err, "expanding transform with ExpansionRequest: %v", req)
+	strategy := retry.LimitCount(
+		maxRetries,
+		retry.Exponential{
+			Initial: time.Second,
+			Factor:  2,
+		},
+	)
+	var res *jobpb.ExpansionResponse
+	for attempt := retry.Start(strategy, nil); attempt.Next(); {
+		res, err = client.Expand(ctx, req)
+		if err == nil {
+			break
+		}
+
+		if attempt.Count() == maxRetries {
+			if err != nil {
+				err = errors.Wrap(err, "expansion failed")
+				return nil, errors.WithContextf(err, "expanding transform with ExpansionRequest: %v", req)
+			}
+		}
 	}
 	if len(res.GetError()) != 0 { // ExpansionResponse includes an error.
 		err := errors.New(res.GetError())
-		err = errors.Wrapf(err, "expansion failed")
+		err = errors.Wrap(err, "expansion response error")
 		return nil, errors.WithContextf(err, "expanding transform with ExpansionRequest: %v", req)
 	}
+
 	return res, nil
 }