Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/03 16:59:12 UTC

[GitHub] [beam] damccorm commented on a diff in pull request #17956: [BEAM-11104] Add code snippet for Go SDK Self-Checkpointing

damccorm commented on code in PR #17956:
URL: https://github.com/apache/beam/pull/17956#discussion_r889155710


##########
website/www/site/content/en/documentation/programming-guide.md:
##########
@@ -6422,7 +6422,26 @@ resource utilization.
 {{< /highlight >}}
 
 {{< highlight go >}}
-This is not supported yet, see BEAM-11104.
+func (fn *splittableDoFn) ProcessElement(rt *sdf.LockRTracker, emit func(Record)) sdf.ProcessContinuation {

Review Comment:
   Can we put this in the snippets folder (example below in Watermark estimation section)? I know we haven't been clean on that before, but it:
   
   (a) makes sure that the code actually compiles
   (b) makes it easier to reuse (e.g. I know Dataflow has docs that use snippets from Beam)



##########
website/www/site/content/en/documentation/programming-guide.md:
##########
@@ -6422,7 +6422,26 @@ resource utilization.
 {{< /highlight >}}
 
 {{< highlight go >}}
-This is not supported yet, see BEAM-11104.
+func (fn *splittableDoFn) ProcessElement(rt *sdf.LockRTracker, emit func(Record)) sdf.ProcessContinuation {
+  position := rt.GetRestriction().(offsetrange.Restriction).Start
+  for {
+    records, err := fn.ExternalService.readNextRecords(position)
+    if err == fn.ExternalService.ThrottlingErr {
+      return sdf.ResumeProcessingIn(60 * time.Second)
+    }
+    if len(records) == 0 {
+      return sdf.ResumeProcessingIn(10 * time.Second)

Review Comment:
   Maybe add a comment along the lines of `// Wait for data to be available`? Might be nice to have a similar comment for the throttling case and the finish execution case as well.
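
To make the suggestion concrete, here is a minimal sketch of how the three exit paths might be commented. It is not the PR's code: `continuation`, `resumeIn`, `stop`, `errThrottled`, and the `read` callback are hypothetical stand-ins for `sdf.ProcessContinuation`, `sdf.ResumeProcessingIn`, a stop continuation, `ExternalService.ThrottlingErr`, and `readNextRecords`, so that the example compiles without the Beam module. The `position >= end` check stands in for a failed `rt.TryClaim`.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// continuation is a hypothetical stand-in for sdf.ProcessContinuation.
type continuation struct {
	resume bool          // true when processing should be rescheduled
	delay  time.Duration // suggested wait before resuming
}

func resumeIn(d time.Duration) continuation { return continuation{resume: true, delay: d} }
func stop() continuation                    { return continuation{} }

// errThrottled is a hypothetical sentinel mirroring ExternalService.ThrottlingErr.
var errThrottled = errors.New("throttled")

// processElement mirrors the loop in the diff, with one comment per exit path.
// Errors other than throttling are deliberately out of scope for this sketch.
func processElement(read func(int64) ([]string, error), start, end int64, emit func(string)) continuation {
	position := start
	for {
		records, err := read(position)
		if errors.Is(err, errThrottled) {
			// The service is throttling us: back off and resume later.
			return resumeIn(60 * time.Second)
		}
		if len(records) == 0 {
			// Wait for data to be available.
			return resumeIn(10 * time.Second)
		}
		for _, r := range records {
			if position >= end {
				// The restriction is exhausted: finish execution.
				return stop()
			}
			position++
			emit(r)
		}
	}
}

func main() {
	// A reader that has no data yet, so the sketch asks to be resumed.
	empty := func(int64) ([]string, error) { return nil, nil }
	c := processElement(empty, 0, 10, func(string) {})
	fmt.Println(c.resume, c.delay)
}
```

Each comment states why the function returns, not what the call does, which is the part a reader of the programming guide cannot infer from the code alone.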



##########
website/www/site/content/en/documentation/programming-guide.md:
##########
@@ -6422,7 +6422,26 @@ resource utilization.
 {{< /highlight >}}
 
 {{< highlight go >}}
-This is not supported yet, see BEAM-11104.
+func (fn *splittableDoFn) ProcessElement(rt *sdf.LockRTracker, emit func(Record)) sdf.ProcessContinuation {

Review Comment:
   Could you return an `err` parameter as well (it can just return nil)? Something I realized with Bundle Finalization is that it's much more helpful if we provide the parameters that surround the one we are demonstrating, because it allows users to see the ordering we require.
   
   Side note unrelated to this PR: We probably need better ordering error messages; they are pretty confusing right now.



##########
website/www/site/content/en/documentation/programming-guide.md:
##########
@@ -6422,7 +6422,26 @@ resource utilization.
 {{< /highlight >}}
 
 {{< highlight go >}}
-This is not supported yet, see BEAM-11104.
+func (fn *splittableDoFn) ProcessElement(rt *sdf.LockRTracker, emit func(Record)) sdf.ProcessContinuation {

Review Comment:
   I'm also curious what process continuation we should return alongside a non-nil error: is it nil? It might be worth including that case, for example when `records, err := fn.ExternalService.readNextRecords(position)` returns a non-nil, non-throttling error response.
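
One possible answer, sketched under assumptions: treat throttling as transient (resume later, nil error) and pair any other error with a stop continuation instead of nil. The types below are hypothetical stand-ins for `sdf.ProcessContinuation` and its constructors, and `errThrottled` and `errBackend` are invented sentinels, so the example runs without the Beam module. Whether the SDK accepts a nil continuation next to an error is not settled by this sketch.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// continuation is a hypothetical stand-in for sdf.ProcessContinuation.
type continuation struct {
	resume bool
	delay  time.Duration
}

func resumeIn(d time.Duration) continuation { return continuation{resume: true, delay: d} }
func stop() continuation                    { return continuation{} }

// Hypothetical sentinel errors used only for illustration.
var (
	errThrottled = errors.New("throttled")
	errBackend   = errors.New("backend unavailable")
)

// onReadError decides what a ProcessElement with a
// (continuation, error) return could hand back when a read fails.
func onReadError(err error) (continuation, error) {
	if errors.Is(err, errThrottled) {
		// Throttling is transient: resume later and report no error.
		return resumeIn(60 * time.Second), nil
	}
	// Any other error is treated as fatal here: stop and propagate it.
	return stop(), err
}

func main() {
	for _, err := range []error{errThrottled, errBackend} {
		c, out := onReadError(err)
		fmt.Printf("in=%v resume=%v delay=%v err=%v\n", err, c.resume, c.delay, out)
	}
}
```

Returning the continuation first and the error last also shows the return ordering that the earlier comment asks the snippet to demonstrate.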


