You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@beam.apache.org by ri...@apache.org on 2023/02/21 20:28:09 UTC
[beam] branch master updated: Use context param and error return value in mongodbio.Read SDF methods (#25536)
This is an automated email from the ASF dual-hosted git repository.
riteshghorse pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new e80596f5d41 Use context param and error return value in mongodbio.Read SDF methods (#25536)
e80596f5d41 is described below
commit e80596f5d418d5c7608e05f32e0b309c912749bf
Author: Johanna Öjeling <51...@users.noreply.github.com>
AuthorDate: Tue Feb 21 21:28:01 2023 +0100
Use context param and error return value in mongodbio.Read SDF methods (#25536)
---
sdks/go/pkg/beam/io/mongodbio/read.go | 29 +++++++++++++++++------------
1 file changed, 17 insertions(+), 12 deletions(-)
diff --git a/sdks/go/pkg/beam/io/mongodbio/read.go b/sdks/go/pkg/beam/io/mongodbio/read.go
index 59d8cf6aef9..101d1f4af89 100644
--- a/sdks/go/pkg/beam/io/mongodbio/read.go
+++ b/sdks/go/pkg/beam/io/mongodbio/read.go
@@ -159,10 +159,12 @@ func inferProjection(t reflect.Type, tagKey string) bson.D {
return projection
}
-func (fn *readFn) CreateInitialRestriction(_ []byte) idRangeRestriction {
- ctx := context.Background()
+func (fn *readFn) CreateInitialRestriction(
+ ctx context.Context,
+ _ []byte,
+) (idRangeRestriction, error) {
if err := fn.Setup(ctx); err != nil {
- panic(err)
+ return idRangeRestriction{}, err
}
outerRange, err := findOuterIDRange(ctx, fn.collection, fn.filter)
@@ -174,10 +176,10 @@ func (fn *readFn) CreateInitialRestriction(_ []byte) idRangeRestriction {
fn.Database,
fn.Collection,
)
- return idRangeRestriction{}
+ return idRangeRestriction{}, nil
}
- panic(err)
+ return idRangeRestriction{}, err
}
return newIDRangeRestriction(
@@ -185,7 +187,7 @@ func (fn *readFn) CreateInitialRestriction(_ []byte) idRangeRestriction {
fn.collection,
outerRange,
fn.filter,
- )
+ ), nil
}
func findOuterIDRange(
@@ -213,22 +215,25 @@ func findOuterIDRange(
return outerRange, nil
}
-func (fn *readFn) SplitRestriction(_ []byte, rest idRangeRestriction) []idRangeRestriction {
+func (fn *readFn) SplitRestriction(
+ ctx context.Context,
+ _ []byte,
+ rest idRangeRestriction,
+) ([]idRangeRestriction, error) {
if rest.Count == 0 {
- return []idRangeRestriction{rest}
+ return []idRangeRestriction{rest}, nil
}
- ctx := context.Background()
if err := fn.Setup(ctx); err != nil {
- panic(err)
+ return nil, err
}
splits, err := rest.SizedSplits(ctx, fn.collection, fn.BundleSize, fn.BucketAuto)
if err != nil {
- panic(err)
+ return nil, err
}
- return splits
+ return splits, nil
}
func (fn *readFn) CreateTracker(rest idRangeRestriction) *sdf.LockRTracker {