You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2022/05/02 23:01:04 UTC
[beam] branch master updated: [BEAM-11105] Add docs + CHANGES.md entry for Go Watermark Estimation (#17522)
This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 095190d5bcb [BEAM-11105] Add docs + CHANGES.md entry for Go Watermark Estimation (#17522)
095190d5bcb is described below
commit 095190d5bcb9985065d6a13b3e934cbc96f45637
Author: Danny McCormick <da...@google.com>
AuthorDate: Mon May 2 19:00:58 2022 -0400
[BEAM-11105] Add docs + CHANGES.md entry for Go Watermark Estimation (#17522)
---
CHANGES.md | 3 +-
sdks/go/examples/snippets/04transforms.go | 61 ++++++++++++++++++++++
.../content/en/documentation/programming-guide.md | 2 +-
3 files changed, 63 insertions(+), 3 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 9c2c465e67e..b30632a5755 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -54,8 +54,7 @@
## Highlights
-* New highly anticipated feature X added to Python SDK ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
-* New highly anticipated feature Y added to Java SDK ([BEAM-Y](https://issues.apache.org/jira/browse/BEAM-Y)).
+* Watermark estimation is now supported in the Go SDK ([BEAM-11105](https://issues.apache.org/jira/browse/BEAM-11105)).
## I/Os
diff --git a/sdks/go/examples/snippets/04transforms.go b/sdks/go/examples/snippets/04transforms.go
index 42a13bd5426..80ff7ed66aa 100644
--- a/sdks/go/examples/snippets/04transforms.go
+++ b/sdks/go/examples/snippets/04transforms.go
@@ -25,6 +25,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"
)
@@ -91,6 +92,8 @@ func CreateAndSplit(s beam.Scope, input []stringPair) beam.PCollection {
type splittableDoFn struct{}
+type weDoFn struct{}
+
// [START bundlefinalization_simplecallback]
func (fn *splittableDoFn) ProcessElement(element string, bf beam.BundleFinalization) {
@@ -105,6 +108,64 @@ func (fn *splittableDoFn) ProcessElement(element string, bf beam.BundleFinalizat
// [END bundlefinalization_simplecallback]
+// [START watermarkestimation_customestimator]
+
+// WatermarkState is a custom type.
+//
+// It is optional to write your own state type when making a custom estimator.
+type WatermarkState struct {
+ Watermark time.Time
+}
+
+// CustomWatermarkEstimator is a custom watermark estimator.
+// You may use any type here, including some of Beam's built-in watermark estimator types,
+// e.g. sdf.WallTimeWatermarkEstimator, sdf.TimestampObservingWatermarkEstimator, and sdf.ManualWatermarkEstimator.
+type CustomWatermarkEstimator struct {
+ state WatermarkState
+}
+
+// CurrentWatermark returns the current watermark and is invoked on DoFn splits and self-checkpoints.
+// Watermark estimators must implement CurrentWatermark() time.Time
+func (e *CustomWatermarkEstimator) CurrentWatermark() time.Time {
+ return e.state.Watermark
+}
+
+// ObserveTimestamp is called on the output timestamps of all
+// emitted elements to update the watermark. Implementing it is optional.
+func (e *CustomWatermarkEstimator) ObserveTimestamp(ts time.Time) {
+ e.state.Watermark = ts
+}
+
+// InitialWatermarkEstimatorState defines an initial state used to initialize the watermark
+// estimator. It is optional. If it is not defined, WatermarkEstimatorState must not be
+// defined either, and CreateWatermarkEstimator must not take in parameters.
+func (fn *weDoFn) InitialWatermarkEstimatorState(et beam.EventTime, rest offsetrange.Restriction, element string) WatermarkState {
+ // Return some watermark state
+ return WatermarkState{Watermark: time.Now()}
+}
+
+// CreateWatermarkEstimator creates the watermark estimator used by this Splittable DoFn.
+// Must take in a state parameter if InitialWatermarkEstimatorState is defined, otherwise takes no parameters.
+func (fn *weDoFn) CreateWatermarkEstimator(initialState WatermarkState) *CustomWatermarkEstimator {
+ return &CustomWatermarkEstimator{state: initialState}
+}
+
+// WatermarkEstimatorState returns the state used to resume future watermark estimation
+// after a checkpoint/split. It is required if InitialWatermarkEstimatorState is defined,
+// otherwise it must not be defined.
+func (fn *weDoFn) WatermarkEstimatorState(e *CustomWatermarkEstimator) WatermarkState {
+ return e.state
+}
+
+// ProcessElement is the method to execute for each element.
+// It can optionally take in a watermark estimator.
+func (fn *weDoFn) ProcessElement(e *CustomWatermarkEstimator, element string) {
+ // ...
+ e.state.Watermark = time.Now()
+}
+
+// [END watermarkestimation_customestimator]
+
// [START cogroupbykey_output_helpers]
func formatCoGBKResults(key string, emailIter, phoneIter func(*string) bool) string {
diff --git a/website/www/site/content/en/documentation/programming-guide.md b/website/www/site/content/en/documentation/programming-guide.md
index fccf4b98b37..5515d3a3f30 100644
--- a/website/www/site/content/en/documentation/programming-guide.md
+++ b/website/www/site/content/en/documentation/programming-guide.md
@@ -6507,7 +6507,7 @@ watermark estimator implementation. You can also provide your own watermark esti
{{< /highlight >}}
{{< highlight go >}}
-This is not supported yet, see BEAM-11105.
+{{< code_sample "sdks/go/examples/snippets/04transforms.go" watermarkestimation_customestimator >}}
{{< /highlight >}}
### 12.6. Truncating during drain {#truncating-during-drain}