You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by "camphillips22 (via GitHub)" <gi...@apache.org> on 2023/04/13 13:57:48 UTC
[GitHub] [beam] camphillips22 commented on issue #26245: [Bug][Go SDK]: Custom Splittable DoFn causes errors on runner split
camphillips22 commented on issue #26245:
URL: https://github.com/apache/beam/issues/26245#issuecomment-1507013684
Runner: Google Dataflow
Go SDK version: 2.46.0
Example code:
```golang
// GenerateData applies the dataGeneratorFn splittable DoFn to col inside a
// "GenerateData" sub-scope and returns the resulting PCollection.
//
// start and end are handed to the DoFn as RFC 3339 strings; that layout has
// no fractional seconds, so any sub-second component is dropped here.
func GenerateData(s beam.Scope, start, end time.Time, col beam.PCollection) beam.PCollection {
	s = s.Scope("GenerateData")
	// Use the DoFn type that is actually defined alongside this function
	// (dataGeneratorFn); the previously referenced pageIdGeneratorFn is not
	// declared anywhere in this example.
	return beam.ParDo(s, &dataGeneratorFn{
		Start: start.Format(time.RFC3339),
		End:   end.Format(time.RFC3339),
	}, col)
}
// dataGeneratorFn is the DoFn whose methods below create, split, and process
// an offset-range restriction derived from a time window.
type dataGeneratorFn struct {
	// Start and End bound the time window. CreateInitialRestriction parses
	// both with the time.RFC3339Nano layout.
	Start, End string
}
// CreateInitialRestriction builds the restriction covering the configured
// window, expressed in Unix seconds: [Start, End) of the returned
// offsetrange.Restriction are f.Start and f.End parsed as RFC 3339 timestamps.
//
// The element argument is unused. Parse errors are discarded, so a malformed
// Start or End contributes whatever time.Parse returned alongside the error.
func (f *dataGeneratorFn) CreateInitialRestriction(_ []byte) offsetrange.Restriction {
	from, _ := time.Parse(time.RFC3339Nano, f.Start)
	to, _ := time.Parse(time.RFC3339Nano, f.End)

	var r offsetrange.Restriction
	r.Start = from.Unix()
	r.End = to.Unix()
	return r
}
// CreateTracker wraps an offset-range tracker for rest in an sdf.LockRTracker
// and returns it.
func (f *dataGeneratorFn) CreateTracker(rest offsetrange.Restriction) *sdf.LockRTracker {
	inner := offsetrange.NewTracker(rest)
	return sdf.NewLockRTracker(inner)
}
// RestrictionSize reports the size of rest as computed by the restriction
// itself. The element argument is unused.
func (f *dataGeneratorFn) RestrictionSize(_ []byte, rest offsetrange.Restriction) float64 {
	size := rest.Size()
	return size
}
// SplitRestriction divides rest into sub-restrictions sized by the number of
// seconds in one hour (restriction offsets are Unix seconds, see
// CreateInitialRestriction). The element argument is unused.
//
// The result of SizedSplits must be returned explicitly: the function
// declares a result, so calling SizedSplits and discarding its value both
// loses the splits and fails to compile ("missing return").
func (f *dataGeneratorFn) SplitRestriction(_ []byte, rest offsetrange.Restriction) (splits []offsetrange.Restriction) {
	return rest.SizedSplits(int64(time.Hour.Seconds()))
}
// ProcessElement emits the data of every value yielded by the iterator over
// the tracker's restriction, claiming each new timestamp on the tracker before
// emitting.
//
// The restriction bounds are interpreted as Unix seconds and converted to
// time.Time values for GetIterator. Processing stops without error as soon as
// a claim is refused or the iterator reports io.EOF; any other iterator error
// is returned together with a request to resume in five seconds. The element
// argument is unused.
func (f *dataGeneratorFn) ProcessElement(ctx context.Context, rt *sdf.LockRTracker, _ []byte, emit func([]byte)) (sdf.ProcessContinuation, error) {
	bounds := rt.GetRestriction().(offsetrange.Restriction)

	// The iterator may take a while to set up, so claim the start of the
	// restriction first: Dataflow splits aggressively.
	if ok := rt.TryClaim(bounds.Start); !ok {
		return sdf.StopProcessing(), nil
	}
	claimed := bounds.Start

	// Build the iterator from the time bounds of the restriction as it was
	// when processing began.
	it := GetIterator(ctx, time.Unix(bounds.Start, 0), time.Unix(bounds.End, 0))
	for {
		item, err := it.Next()
		switch {
		case err == io.EOF:
			// Input exhausted: attempt to claim the end of the restriction.
			// The outcome of this final claim is not inspected.
			rt.TryClaim(bounds.End)
			return sdf.StopProcessing(), nil
		case err != nil:
			return sdf.ResumeProcessingIn(5 * time.Second), err
		}

		// Simplified bookkeeping: a value whose timestamp equals the most
		// recent claim is emitted without claiming again.
		if item.Timestamp() == claimed {
			emit(item.Data())
			continue
		}
		if !rt.TryClaim(item.Timestamp()) {
			return sdf.StopProcessing(), nil
		}
		claimed = item.Timestamp()
		emit(item.Data())
	}
}
```
As for passing tests: I think I'm getting correct results even with the error, but the volume of errors is a bit concerning.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@beam.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org