You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2022/06/02 22:08:17 UTC

[beam] branch master updated: [BEAM-11106] documentation for SDF truncation in Go (#17781)

This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 937e22ffcb5 [BEAM-11106] documentation for SDF truncation in Go (#17781)
937e22ffcb5 is described below

commit 937e22ffcb56c0722b558e7c5c1c30fda5116be8
Author: Ritesh Ghorse <ri...@gmail.com>
AuthorDate: Thu Jun 2 18:08:10 2022 -0400

    [BEAM-11106] documentation for SDF truncation in Go (#17781)
    
    * documentation for sdf truncation
    
    * update import
    
    * update snippet
---
 sdks/go/examples/snippets/04transforms.go               | 17 +++++++++++++++++
 .../site/content/en/documentation/programming-guide.md  |  7 ++++++-
 2 files changed, 23 insertions(+), 1 deletion(-)

diff --git a/sdks/go/examples/snippets/04transforms.go b/sdks/go/examples/snippets/04transforms.go
index 3cb83b643c9..2c1b59c9ad5 100644
--- a/sdks/go/examples/snippets/04transforms.go
+++ b/sdks/go/examples/snippets/04transforms.go
@@ -167,6 +167,23 @@ func (fn *weDoFn) ProcessElement(e *CustomWatermarkEstimator, element string) {
 
 // [END watermarkestimation_customestimator]
 
+// [START sdf_truncate]
+
+// TruncateRestriction is a transform that is triggered when pipeline starts to drain. It helps to finish a
+// pipeline quicker by truncating the restriction.
+func (fn *splittableDoFn) TruncateRestriction(rt *sdf.LockRTracker, element string) offsetrange.Restriction {
+	start := rt.GetRestriction().(offsetrange.Restriction).Start
+	prevEnd := rt.GetRestriction().(offsetrange.Restriction).End
+	// truncate the restriction by half.
+	newEnd := prevEnd / 2
+	return offsetrange.Restriction{
+		Start: start,
+		End:   newEnd,
+	}
+}
+
+// [END sdf_truncate]
+
 // [START cogroupbykey_output_helpers]
 
 func formatCoGBKResults(key string, emailIter, phoneIter func(*string) bool) string {
diff --git a/website/www/site/content/en/documentation/programming-guide.md b/website/www/site/content/en/documentation/programming-guide.md
index 310902698eb..ec2f4fda88a 100644
--- a/website/www/site/content/en/documentation/programming-guide.md
+++ b/website/www/site/content/en/documentation/programming-guide.md
@@ -6524,6 +6524,11 @@ unbounded restrictions finish processing at the next SDF-initiated checkpoint or
 You are able to override this default behavior by defining the appropriate method on the restriction
 provider.
 
+{{< paragraph class="language-go" >}}
+Note: Once the pipeline drain starts and truncate restriction transform is triggered, the `sdf.ProcessContinuation`
+will not be rescheduled.
+{{< /paragraph >}}
+
 {{< highlight java >}}
 {{< code_sample "examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java" SDF_Truncate >}}
 {{< /highlight >}}
@@ -6533,7 +6538,7 @@ provider.
 {{< /highlight >}}
 
 {{< highlight go >}}
-This is not supported yet, see BEAM-11106.
+{{< code_sample "sdks/go/examples/snippets/04transforms.go" sdf_truncate >}}
 {{< /highlight >}}
 
 ### 12.7. Bundle finalization {#bundle-finalization}