You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2022/05/02 23:01:04 UTC

[beam] branch master updated: [BEAM-11105] Add docs + CHANGES.md entry for Go Watermark Estimation (#17522)

This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 095190d5bcb [BEAM-11105] Add docs + CHANGES.md entry for Go Watermark Estimation (#17522)
095190d5bcb is described below

commit 095190d5bcb9985065d6a13b3e934cbc96f45637
Author: Danny McCormick <da...@google.com>
AuthorDate: Mon May 2 19:00:58 2022 -0400

    [BEAM-11105] Add docs + CHANGES.md entry for Go Watermark Estimation (#17522)
---
 CHANGES.md                                         |  3 +-
 sdks/go/examples/snippets/04transforms.go          | 61 ++++++++++++++++++++++
 .../content/en/documentation/programming-guide.md  |  2 +-
 3 files changed, 63 insertions(+), 3 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 9c2c465e67e..b30632a5755 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -54,8 +54,7 @@
 
 ## Highlights
 
-* New highly anticipated feature X added to Python SDK ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
-* New highly anticipated feature Y added to Java SDK ([BEAM-Y](https://issues.apache.org/jira/browse/BEAM-Y)).
+* Watermark estimation is now supported in the Go SDK ([BEAM-11105](https://issues.apache.org/jira/browse/BEAM-11105)).
 
 ## I/Os
 
diff --git a/sdks/go/examples/snippets/04transforms.go b/sdks/go/examples/snippets/04transforms.go
index 42a13bd5426..80ff7ed66aa 100644
--- a/sdks/go/examples/snippets/04transforms.go
+++ b/sdks/go/examples/snippets/04transforms.go
@@ -25,6 +25,7 @@ import (
 
 	"github.com/apache/beam/sdks/v2/go/pkg/beam"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"
 )
 
@@ -91,6 +92,8 @@ func CreateAndSplit(s beam.Scope, input []stringPair) beam.PCollection {
 
 type splittableDoFn struct{}
 
+type weDoFn struct{}
+
 // [START bundlefinalization_simplecallback]
 
 func (fn *splittableDoFn) ProcessElement(element string, bf beam.BundleFinalization) {
@@ -105,6 +108,64 @@ func (fn *splittableDoFn) ProcessElement(element string, bf beam.BundleFinalizat
 
 // [END bundlefinalization_simplecallback]
 
+// [START watermarkestimation_customestimator]
+
+// WatermarkState is a custom type.
+//
+// It is optional to write your own state type when making a custom estimator.
+type WatermarkState struct {
+	Watermark time.Time
+}
+
+// CustomWatermarkEstimator is a custom watermark estimator.
+// You may use any type here, including some of Beam's built-in watermark estimator types,
+// e.g. sdf.WallTimeWatermarkEstimator, sdf.TimestampObservingWatermarkEstimator, and sdf.ManualWatermarkEstimator.
+type CustomWatermarkEstimator struct {
+	state WatermarkState
+}
+
+// CurrentWatermark returns the current watermark and is invoked on DoFn splits and self-checkpoints.
+// Watermark estimators must implement CurrentWatermark() time.Time
+func (e *CustomWatermarkEstimator) CurrentWatermark() time.Time {
+	return e.state.Watermark
+}
+
+// ObserveTimestamp is called on the output timestamps of all
+// emitted elements to update the watermark. It is optional.
+func (e *CustomWatermarkEstimator) ObserveTimestamp(ts time.Time) {
+	e.state.Watermark = ts
+}
+
+// InitialWatermarkEstimatorState defines an initial state used to initialize the watermark
+// estimator. It is optional. If this is not defined, WatermarkEstimatorState must not be
+// defined and CreateWatermarkEstimator must not take in parameters.
+func (fn *weDoFn) InitialWatermarkEstimatorState(et beam.EventTime, rest offsetrange.Restriction, element string) WatermarkState {
+	// Return some watermark state
+	return WatermarkState{Watermark: time.Now()}
+}
+
+// CreateWatermarkEstimator creates the watermark estimator used by this Splittable DoFn.
+// Must take in a state parameter if InitialWatermarkEstimatorState is defined, otherwise takes no parameters.
+func (fn *weDoFn) CreateWatermarkEstimator(initialState WatermarkState) *CustomWatermarkEstimator {
+	return &CustomWatermarkEstimator{state: initialState}
+}
+
+// WatermarkEstimatorState returns the state used to resume future watermark estimation
+// after a checkpoint/split. It is required if InitialWatermarkEstimatorState is defined,
+// otherwise it must not be defined.
+func (fn *weDoFn) WatermarkEstimatorState(e *CustomWatermarkEstimator) WatermarkState {
+	return e.state
+}
+
+// ProcessElement is the method to execute for each element.
+// It can optionally take in a watermark estimator.
+func (fn *weDoFn) ProcessElement(e *CustomWatermarkEstimator, element string) {
+	// ...
+	e.state.Watermark = time.Now()
+}
+
+// [END watermarkestimation_customestimator]
+
 // [START cogroupbykey_output_helpers]
 
 func formatCoGBKResults(key string, emailIter, phoneIter func(*string) bool) string {
diff --git a/website/www/site/content/en/documentation/programming-guide.md b/website/www/site/content/en/documentation/programming-guide.md
index fccf4b98b37..5515d3a3f30 100644
--- a/website/www/site/content/en/documentation/programming-guide.md
+++ b/website/www/site/content/en/documentation/programming-guide.md
@@ -6507,7 +6507,7 @@ watermark estimator implementation. You can also provide your own watermark esti
 {{< /highlight >}}
 
 {{< highlight go >}}
-This is not supported yet, see BEAM-11105.
+{{< code_sample "sdks/go/examples/snippets/04transforms.go" watermarkestimation_customestimator >}}
 {{< /highlight >}}
 
 ### 12.6. Truncating during drain {#truncating-during-drain}