You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ri...@apache.org on 2023/02/21 20:28:09 UTC

[beam] branch master updated: Use context param and error return value in mongodbio.Read SDF methods (#25536)

This is an automated email from the ASF dual-hosted git repository.

riteshghorse pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new e80596f5d41 Use context param and error return value in mongodbio.Read SDF methods (#25536)
e80596f5d41 is described below

commit e80596f5d418d5c7608e05f32e0b309c912749bf
Author: Johanna Öjeling <51...@users.noreply.github.com>
AuthorDate: Tue Feb 21 21:28:01 2023 +0100

    Use context param and error return value in mongodbio.Read SDF methods (#25536)
---
 sdks/go/pkg/beam/io/mongodbio/read.go | 29 +++++++++++++++++------------
 1 file changed, 17 insertions(+), 12 deletions(-)

diff --git a/sdks/go/pkg/beam/io/mongodbio/read.go b/sdks/go/pkg/beam/io/mongodbio/read.go
index 59d8cf6aef9..101d1f4af89 100644
--- a/sdks/go/pkg/beam/io/mongodbio/read.go
+++ b/sdks/go/pkg/beam/io/mongodbio/read.go
@@ -159,10 +159,12 @@ func inferProjection(t reflect.Type, tagKey string) bson.D {
 	return projection
 }
 
-func (fn *readFn) CreateInitialRestriction(_ []byte) idRangeRestriction {
-	ctx := context.Background()
+func (fn *readFn) CreateInitialRestriction(
+	ctx context.Context,
+	_ []byte,
+) (idRangeRestriction, error) {
 	if err := fn.Setup(ctx); err != nil {
-		panic(err)
+		return idRangeRestriction{}, err
 	}
 
 	outerRange, err := findOuterIDRange(ctx, fn.collection, fn.filter)
@@ -174,10 +176,10 @@ func (fn *readFn) CreateInitialRestriction(_ []byte) idRangeRestriction {
 				fn.Database,
 				fn.Collection,
 			)
-			return idRangeRestriction{}
+			return idRangeRestriction{}, nil
 		}
 
-		panic(err)
+		return idRangeRestriction{}, err
 	}
 
 	return newIDRangeRestriction(
@@ -185,7 +187,7 @@ func (fn *readFn) CreateInitialRestriction(_ []byte) idRangeRestriction {
 		fn.collection,
 		outerRange,
 		fn.filter,
-	)
+	), nil
 }
 
 func findOuterIDRange(
@@ -213,22 +215,25 @@ func findOuterIDRange(
 	return outerRange, nil
 }
 
-func (fn *readFn) SplitRestriction(_ []byte, rest idRangeRestriction) []idRangeRestriction {
+func (fn *readFn) SplitRestriction(
+	ctx context.Context,
+	_ []byte,
+	rest idRangeRestriction,
+) ([]idRangeRestriction, error) {
 	if rest.Count == 0 {
-		return []idRangeRestriction{rest}
+		return []idRangeRestriction{rest}, nil
 	}
 
-	ctx := context.Background()
 	if err := fn.Setup(ctx); err != nil {
-		panic(err)
+		return nil, err
 	}
 
 	splits, err := rest.SizedSplits(ctx, fn.collection, fn.BundleSize, fn.BucketAuto)
 	if err != nil {
-		panic(err)
+		return nil, err
 	}
 
-	return splits
+	return splits, nil
 }
 
 func (fn *readFn) CreateTracker(rest idRangeRestriction) *sdf.LockRTracker {