You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by "camphillips22 (via GitHub)" <gi...@apache.org> on 2023/04/13 13:57:48 UTC

[GitHub] [beam] camphillips22 commented on issue #26245: [Bug][Go SDK]: Custom Splittable DoFn causes errors on runner split

camphillips22 commented on issue #26245:
URL: https://github.com/apache/beam/issues/26245#issuecomment-1507013684

   Runner: Google Dataflow
   Go SDK version: 2.46.0
   
   Example code:
   
   ```golang
   
   // GenerateData applies dataGeneratorFn to every element of col to generate
   // data for the time window between start and end.
   //
   // Both bounds are stored on the DoFn as RFC 3339 strings, so they are
   // truncated to whole seconds (time.RFC3339 has no fractional part), which
   // matches the Unix-second offsets used by the DoFn's restriction.
   func GenerateData(s beam.Scope, start, end time.Time, col beam.PCollection) beam.PCollection {
   	s = s.Scope("GenerateData")
   	// The DoFn defined in this example is dataGeneratorFn; the previous
   	// reference to pageIdGeneratorFn did not resolve to any type here.
   	return beam.ParDo(s, &dataGeneratorFn{
   		Start: start.Format(time.RFC3339),
   		End:   end.Format(time.RFC3339),
   	}, col)
   }
   
   // dataGeneratorFn is a splittable DoFn that emits data for a time window.
   // The window bounds are kept as RFC 3339 strings and are converted to
   // Unix-second offsets when the initial restriction is created.
   type dataGeneratorFn struct {
   	Start, End string // window bounds, RFC 3339 formatted
   }
   
   // CreateInitialRestriction returns the offset range covering the DoFn's
   // window, expressed in Unix seconds. The input element is ignored.
   //
   // f.Start and f.End are parsed with the RFC3339Nano layout, which also
   // accepts plain RFC 3339 strings because its fractional seconds are optional.
   // A malformed bound is treated as a configuration error and panics. Letting
   // it silently become the zero time (year 1) would produce an enormous
   // restriction that is then split into millions of pieces.
   func (f *dataGeneratorFn) CreateInitialRestriction(_ []byte) offsetrange.Restriction {
   	s, err := time.Parse(time.RFC3339Nano, f.Start)
   	if err != nil {
   		panic("dataGeneratorFn: invalid Start bound: " + err.Error())
   	}
   	e, err := time.Parse(time.RFC3339Nano, f.End)
   	if err != nil {
   		panic("dataGeneratorFn: invalid End bound: " + err.Error())
   	}
   	return offsetrange.Restriction{
   		Start: s.Unix(),
   		End:   e.Unix(),
   	}
   }
   
   // CreateTracker builds an offset-range tracker for rest and wraps it in an
   // sdf.LockRTracker, so every tracker method call goes through that wrapper.
   func (f *dataGeneratorFn) CreateTracker(rest offsetrange.Restriction) *sdf.LockRTracker {
   	inner := offsetrange.NewTracker(rest)
   	return sdf.NewLockRTracker(inner)
   }
   
   // RestrictionSize reports the amount of work in rest as the value returned
   // by rest.Size(). The input element is not consulted.
   func (f *dataGeneratorFn) RestrictionSize(_ []byte, rest offsetrange.Restriction) float64 {
   	size := rest.Size()
   	return size
   }
   
   // SplitRestriction pre-splits rest into consecutive chunks of one hour each
   // (3600 offsets, since offsets are Unix seconds). Per SizedSplits, the final
   // chunk holds any remainder and may be shorter.
   func (f *dataGeneratorFn) SplitRestriction(_ []byte, rest offsetrange.Restriction) (splits []offsetrange.Restriction) {
   	// Integer duration division gives exactly 3600 without a float round-trip.
   	// The split result must be returned. Discarding it and falling off the end
   	// of the function is a "missing return" compile error.
   	return rest.SizedSplits(int64(time.Hour / time.Second))
   }
   
   // ProcessElement emits data for the window described by the tracker's
   // current restriction. Offsets are Unix seconds. A value's timestamp is
   // claimed whenever it differs from the last claimed offset, before the value
   // is emitted.
   //
   // Behavior:
   //   - Any failed claim stops processing and returns a nil error.
   //   - At io.EOF, the restriction's end offset is claimed and processing stops.
   //   - Any other iterator error is returned together with
   //     sdf.ResumeProcessingIn(5 * time.Second).
   func (f *dataGeneratorFn) ProcessElement(ctx context.Context, rt *sdf.LockRTracker, _ []byte, emit func([]byte)) (sdf.ProcessContinuation, error) {
   	rest := rt.GetRestriction().(offsetrange.Restriction)

   	// Claim the first offset before building the iterator: setup may take a
   	// while, and Dataflow splits aggressively in the meantime.
   	claimed := rest.Start
   	if !rt.TryClaim(claimed) {
   		return sdf.StopProcessing(), nil
   	}

   	// GetIterator is assumed to yield values within the given time bounds,
   	// in non-decreasing timestamp order. TODO confirm against its definition.
   	it := GetIterator(ctx, time.Unix(rest.Start, 0), time.Unix(rest.End, 0))
   	for {
   		v, err := it.Next()
   		switch {
   		case err == io.EOF:
   			// Claiming the end offset marks the rest of the range as done.
   			rt.TryClaim(rest.End)
   			return sdf.StopProcessing(), nil
   		case err != nil:
   			return sdf.ResumeProcessingIn(5 * time.Second), err
   		}

   		// Consecutive values sharing a timestamp need only one claim.
   		// NOTE(review): the reporter simplified this example. Tracking of
   		// timestamps claimed earlier than the most recent one is omitted.
   		if ts := v.Timestamp(); ts != claimed {
   			if !rt.TryClaim(ts) {
   				return sdf.StopProcessing(), nil
   			}
   			claimed = ts
   		}
   		emit(v.Data())
   	}
   }
   
   ```
   
   As for whether tests pass: even with these errors, I believe I'm getting correct results, but the volume of errors is a bit concerning.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org