You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2021/10/04 17:18:41 UTC

[beam] branch master updated: [BEAM-3304, BEAM-12513] Trigger changes and Windowing. (#15644)

This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a3d887  [BEAM-3304, BEAM-12513] Trigger changes and Windowing. (#15644)
2a3d887 is described below

commit 2a3d88751f534aceadeccd57130ea7f70aaf7e20
Author: Robert Burke <lo...@users.noreply.github.com>
AuthorDate: Mon Oct 4 10:17:41 2021 -0700

    [BEAM-3304, BEAM-12513] Trigger changes and Windowing. (#15644)
---
 sdks/go/examples/snippets/08windowing.go           |  94 +++++++++++++++
 sdks/go/examples/snippets/09triggers.go            |  81 ++++++++++---
 sdks/go/pkg/beam/core/graph/window/trigger.go      | 129 +++++++++++++++++----
 sdks/go/pkg/beam/core/runtime/graphx/translate.go  |  34 ++++--
 sdks/go/pkg/beam/option.go                         |   6 +-
 sdks/go/pkg/beam/windowing.go                      |  59 +++++++---
 sdks/go/test/integration/primitives/windowinto.go  |  46 ++++++--
 .../content/en/documentation/programming-guide.md  |  72 ++++++++++--
 8 files changed, 430 insertions(+), 91 deletions(-)

diff --git a/sdks/go/examples/snippets/08windowing.go b/sdks/go/examples/snippets/08windowing.go
new file mode 100644
index 0000000..8e8e1b6
--- /dev/null
+++ b/sdks/go/examples/snippets/08windowing.go
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package snippets
+
+import (
+	"time"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+)
+
+func settingWindows(s beam.Scope, items beam.PCollection) {
+	// [START setting_fixed_windows]
+	fixedWindowedItems := beam.WindowInto(s,
+		window.NewFixedWindows(60*time.Second),
+		items)
+	// [END setting_fixed_windows]
+
+	// [START setting_sliding_windows]
+	slidingWindowedItems := beam.WindowInto(s,
+		window.NewSlidingWindows(5*time.Second, 30*time.Second),
+		items)
+	// [END setting_sliding_windows]
+
+	// [START setting_session_windows]
+	sessionWindowedItems := beam.WindowInto(s,
+		window.NewSessions(600*time.Second),
+		items)
+	// [END setting_session_windows]
+
+	// [START setting_global_window]
+	globalWindowedItems := beam.WindowInto(s,
+		window.NewGlobalWindows(),
+		items)
+	// [END setting_global_window]
+
+	// [START setting_allowed_lateness]
+	windowedItems := beam.WindowInto(s,
+		window.NewFixedWindows(1*time.Minute), items,
+		beam.AllowedLateness(2*24*time.Hour), // 2 days
+	)
+	// [END setting_allowed_lateness]
+
+	_ = []beam.PCollection{
+		fixedWindowedItems,
+		slidingWindowedItems,
+		sessionWindowedItems,
+		globalWindowedItems,
+		windowedItems,
+	}
+}
+
+// LogEntry is a dummy type for documentation purposes.
+type LogEntry int
+
+func extractEventTime(LogEntry) time.Time {
+	// Note: Returning time.Now() is always going to be processing time
+	// not EventTime. For true event time, one needs to extract the
+	// time from the element itself.
+	return time.Now()
+}
+
+// [START setting_timestamp]
+
+// AddTimestampDoFn extracts an event time from a LogEntry.
+func AddTimestampDoFn(element LogEntry, emit func(beam.EventTime, LogEntry)) {
+	et := extractEventTime(element)
+	// Defining an emitter with beam.EventTime as the first parameter
+	// allows the DoFn to set the event time for the emitted element.
+	emit(mtime.FromTime(et), element)
+}
+
+// [END setting_timestamp]
+
+func timestampedCollection(s beam.Scope, unstampedLogs beam.PCollection) {
+	// [START setting_timestamp_pipeline]
+	stampedLogs := beam.ParDo(s, AddTimestampDoFn, unstampedLogs)
+	// [END setting_timestamp_pipeline]
+	_ = stampedLogs
+}
diff --git a/sdks/go/examples/snippets/09triggers.go b/sdks/go/examples/snippets/09triggers.go
index 37afe64..0af6d84 100644
--- a/sdks/go/examples/snippets/09triggers.go
+++ b/sdks/go/examples/snippets/09triggers.go
@@ -23,33 +23,76 @@
 package snippets
 
 import (
+	"time"
+
 	"github.com/apache/beam/sdks/v2/go/pkg/beam"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
-	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/teststream"
-	"time"
 )
 
-func generateStream(s beam.Scope) beam.PCollection {
-	con := teststream.NewConfig()
-	con.AddElements(1000, 1.0, 2.0, 3.0)
-	con.AdvanceWatermark(11000)
-	return teststream.Create(s, con)
-}
-
-func TriggerAfterEndOfWindow(s beam.Scope) {
-	pCollection := generateStream(s)
-	windowSize := 10 * time.Second
+func TriggerAfterEndOfWindow(s beam.Scope, pCollection beam.PCollection) {
 	// [START after_window_trigger]
-	trigger := window.TriggerAfterEndOfWindow().EarlyFiring(window.TriggerAfterProcessingTime(60000)).LateFiring(window.TriggerRepeat(window.TriggerAfterCount(1)))
+	trigger := window.TriggerAfterEndOfWindow().
+		EarlyFiring(window.TriggerAfterProcessingTime().
+			PlusDelay(60 * time.Second)).
+		LateFiring(window.TriggerRepeat(window.TriggerAfterCount(1)))
 	// [END after_window_trigger]
-	beam.WindowInto(s, window.NewFixedWindows(windowSize), pCollection, beam.Trigger(trigger), beam.PanesDiscard())
+	beam.WindowInto(s, window.NewFixedWindows(10*time.Second), pCollection, beam.Trigger(trigger), beam.PanesDiscard())
 }
 
-func TriggerAlways(s beam.Scope) {
-	pCollection := generateStream(s)
+func TriggerAlways(s beam.Scope, pCollection beam.PCollection) {
 	// [START always_trigger]
-	windowSize := 10 * time.Second
-	trigger := window.TriggerAlways()
-	beam.WindowInto(s, window.NewFixedWindows(windowSize), pCollection, beam.Trigger(trigger), beam.PanesDiscard())
+	beam.WindowInto(s, window.NewFixedWindows(10*time.Second), pCollection,
+		beam.Trigger(window.TriggerAlways()),
+		beam.PanesDiscard(),
+	)
 	// [END always_trigger]
 }
+
+func ComplexTriggers(s beam.Scope, pcollection beam.PCollection) {
+	// [START setting_a_trigger]
+	windowedItems := beam.WindowInto(s,
+		window.NewFixedWindows(1*time.Minute), pcollection,
+		beam.Trigger(window.TriggerAfterProcessingTime().
+			PlusDelay(1*time.Minute)),
+		beam.AllowedLateness(30*time.Minute),
+		beam.PanesDiscard(),
+	)
+	// [END setting_a_trigger]
+
+	// [START setting_allowed_lateness]
+	allowedToBeLateItems := beam.WindowInto(s,
+		window.NewFixedWindows(1*time.Minute), pcollection,
+		beam.Trigger(window.TriggerAfterProcessingTime().
+			PlusDelay(1*time.Minute)),
+		beam.AllowedLateness(30*time.Minute),
+	)
+	// [END setting_allowed_lateness]
+
+	// [START model_composite_triggers]
+	compositeTriggerItems := beam.WindowInto(s,
+		window.NewFixedWindows(1*time.Minute), pcollection,
+		beam.Trigger(window.TriggerAfterEndOfWindow().
+			LateFiring(window.TriggerAfterProcessingTime().
+				PlusDelay(10*time.Minute))),
+		beam.AllowedLateness(2*24*time.Hour),
+	)
+	// [END model_composite_triggers]
+
+	// TODO(BEAM-3304) AfterAny is not yet implemented.
+	// Implement so the following compiles when no longer commented out.
+
+	// [START other_composite_trigger]
+	// beam.Trigger(
+	// 	window.TriggerAfterAny(
+	// 		window.TriggerAfterCount(100),
+	// 		window.TriggerAfterProcessingTime().
+	// 			PlusDelay(1*time.Minute)),
+	// )
+	// [END other_composite_trigger]
+
+	_ = []beam.PCollection{
+		windowedItems,
+		allowedToBeLateItems,
+		compositeTriggerItems,
+	}
+}
diff --git a/sdks/go/pkg/beam/core/graph/window/trigger.go b/sdks/go/pkg/beam/core/graph/window/trigger.go
index 9068076..f465ff5 100644
--- a/sdks/go/pkg/beam/core/graph/window/trigger.go
+++ b/sdks/go/pkg/beam/core/graph/window/trigger.go
@@ -15,17 +15,54 @@
 
 package window
 
-import "fmt"
+import (
+	"fmt"
+	"time"
+)
 
+// Trigger describes when to emit new aggregations.
+// Fields are exported for use by the framework, and not intended
+// to be set by end users.
+//
+// This API is experimental and subject to change.
 type Trigger struct {
-	Kind         string
-	SubTriggers  []Trigger
-	Delay        int64 // in milliseconds
-	ElementCount int32
-	EarlyTrigger *Trigger
-	LateTrigger  *Trigger
+	Kind                string
+	SubTriggers         []Trigger            // Repeat, OrFinally, Any, All
+	TimestampTransforms []TimestampTransform // AfterProcessingTime
+	ElementCount        int32                // ElementCount
+	EarlyTrigger        *Trigger             // AfterEndOfWindow
+	LateTrigger         *Trigger             // AfterEndOfWindow
+}
+
+// TimestampTransform describes how an after processing time trigger
+// time is transformed to determine when to fire an aggregation.
+// The base timestamp is always when the first element of the pane
+// is received.
+//
+// A series of these transforms will be applied in order to emit at regular intervals.
+type TimestampTransform interface {
+	timestampTransform()
+}
+
+// DelayTransform takes the timestamp and adds the given delay to it.
+type DelayTransform struct {
+	Delay int64 // in milliseconds
 }
 
+func (DelayTransform) timestampTransform() {}
+
+// AlignToTransform takes the timestamp and transforms it to the lowest
+// multiple of the period starting from the offset.
+//
+// Eg. A period of 20 with an offset of 45 would have alignments at 5,25,45,65 etc.
+// Timestamps would be transformed as follows: 0 to 5 would be transformed to 5,
+// 6 to 25 would be transformed to 25, 26 to 45 would be transformed to 45, and so on.
+type AlignToTransform struct {
+	Period, Offset int64 // in milliseconds
+}
+
+func (AlignToTransform) timestampTransform() {}
+
 const (
 	DefaultTrigger                         string = "Trigger_Default_"
 	AlwaysTrigger                          string = "Trigger_Always_"
@@ -46,49 +83,101 @@ func TriggerDefault() Trigger {
 	return Trigger{Kind: DefaultTrigger}
 }
 
-// TriggerAlways constructs an always trigger that keeps firing immediately after an element is processed.
+// TriggerAlways constructs a trigger that fires immediately
+// whenever an element is received.
+//
 // Equivalent to window.TriggerRepeat(window.TriggerAfterCount(1))
 func TriggerAlways() Trigger {
 	return Trigger{Kind: AlwaysTrigger}
 }
 
-// TriggerAfterCount constructs an element count trigger that fires after atleast `count` number of elements are processed.
+// TriggerAfterCount constructs a trigger that fires after
+// at least `count` number of elements are processed.
 func TriggerAfterCount(count int32) Trigger {
 	return Trigger{Kind: ElementCountTrigger, ElementCount: count}
 }
 
-// TriggerAfterProcessingTime constructs an after processing time trigger that fires after 'delay' milliseconds of processing time have passed.
-func TriggerAfterProcessingTime(delay int64) Trigger {
-	return Trigger{Kind: AfterProcessingTimeTrigger, Delay: delay}
+// TriggerAfterProcessingTime constructs a trigger that fires relative to
+// when input first arrives.
+//
+// Must be configured with calls to PlusDelay, or AlignedTo. May be
+// configured with additional delay.
+func TriggerAfterProcessingTime() Trigger {
+	return Trigger{Kind: AfterProcessingTimeTrigger}
 }
 
-// TriggerRepeat constructs a repeat trigger that fires a trigger repeatedly once the condition has been met.
+// PlusDelay configures an AfterProcessingTime trigger to fire after a specified delay,
+// no smaller than a millisecond.
+func (tr Trigger) PlusDelay(delay time.Duration) Trigger {
+	if tr.Kind != AfterProcessingTimeTrigger {
+		panic(fmt.Errorf("can't apply processing delay to %s, want: AfterProcessingTimeTrigger", tr.Kind))
+	}
+	if delay < time.Millisecond {
+		panic(fmt.Errorf("can't apply processing delay of less than a millisecond. Got: %v", delay))
+	}
+	tr.TimestampTransforms = append(tr.TimestampTransforms, DelayTransform{Delay: int64(delay / time.Millisecond)})
+	return tr
+}
+
+// AlignedTo configures an AfterProcessingTime trigger to fire at the smallest
+// multiple of period since the offset greater than the first element timestamp.
+// It panics if tr is not an AfterProcessingTime trigger.
+// * Period may not be smaller than a millisecond (panics otherwise).
+// * Offset may be a zero time (time.Time{}), which is treated as an offset of 0.
+func (tr Trigger) AlignedTo(period time.Duration, offset time.Time) Trigger {
+	if tr.Kind != AfterProcessingTimeTrigger {
+		panic(fmt.Errorf("can't apply alignment to %s, want: AfterProcessingTimeTrigger", tr.Kind))
+	}
+	if period < time.Millisecond {
+		panic(fmt.Errorf("can't apply an alignment period of less than a millisecond. Got: %v", period))
+	}
+	offsetMillis := int64(0)
+	if !offset.IsZero() {
+		// TODO: Change to call UnixMilli() once we move to only supporting a go version >= 1.17.
+		offsetMillis = offset.Unix()*1e3 + int64(offset.Nanosecond())/1e6
+	}
+	tr.TimestampTransforms = append(tr.TimestampTransforms, AlignToTransform{
+		Period: int64(period / time.Millisecond),
+		Offset: offsetMillis,
+	})
+	return tr
+}
+
+// TriggerRepeat constructs a trigger that fires a trigger repeatedly
+// once the condition has been met.
+//
 // Ex: window.TriggerRepeat(window.TriggerAfterCount(1)) is same as window.TriggerAlways().
 func TriggerRepeat(tr Trigger) Trigger {
 	return Trigger{Kind: RepeatTrigger, SubTriggers: []Trigger{tr}}
 }
 
-// TriggerAfterEndOfWindow constructs an end of window trigger that is configurable for early firing trigger(before the end of window)
-// and late firing trigger(after the end of window).
-// Default Options are: Default Trigger for EarlyFiring and No LateFiring. Override it with EarlyFiring and LateFiring methods on this trigger.
+// TriggerAfterEndOfWindow constructs a trigger that is configurable for early firing
+// (before the end of window) and late firing (after the end of window).
+//
+// Default Options are: Default Trigger for EarlyFiring and No LateFiring.
+// Override it with EarlyFiring and LateFiring methods on this trigger.
 func TriggerAfterEndOfWindow() Trigger {
 	defaultEarly := TriggerDefault()
 	return Trigger{Kind: AfterEndOfWindowTrigger, EarlyTrigger: &defaultEarly, LateTrigger: nil}
 }
 
-// EarlyFiring configures AfterEndOfWindow trigger with an early firing trigger.
+// EarlyFiring configures an AfterEndOfWindow trigger with an implicitly
+// repeated trigger that applies before the end of the window.
 func (tr Trigger) EarlyFiring(early Trigger) Trigger {
 	if tr.Kind != AfterEndOfWindowTrigger {
-		panic(fmt.Errorf("can't apply early firing to %s, got: %s, want: AfterEndOfWindowTrigger", tr.Kind, tr.Kind))
+		panic(fmt.Errorf("can't apply early firing to %s, want: AfterEndOfWindowTrigger", tr.Kind))
 	}
 	tr.EarlyTrigger = &early
 	return tr
 }
 
-// LateFiring configures AfterEndOfWindow trigger with a late firing trigger
+// LateFiring configures an AfterEndOfWindow trigger with an implicitly
+// repeated trigger that applies after the end of the window.
+//
+// Not setting a late firing trigger means elements are discarded.
 func (tr Trigger) LateFiring(late Trigger) Trigger {
 	if tr.Kind != AfterEndOfWindowTrigger {
-		panic(fmt.Errorf("can't apply late firing to %s, got: %s, want: AfterEndOfWindowTrigger", tr.Kind, tr.Kind))
+		panic(fmt.Errorf("can't apply late firing to %s, want: AfterEndOfWindowTrigger", tr.Kind))
 	}
 	tr.LateTrigger = &late
 	return tr
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate.go b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
index c74b645..b31a40b 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/translate.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
@@ -1042,16 +1042,34 @@ func makeTrigger(t window.Trigger) *pipepb.Trigger {
 			},
 		}
 	case window.AfterProcessingTimeTrigger:
-		// TODO(BEAM-3304) Right now would work only for single delay value.
-		// could be configured to take more than one delay values later.
-		ttd := &pipepb.TimestampTransform{
-			TimestampTransform: &pipepb.TimestampTransform_Delay_{
-				Delay: &pipepb.TimestampTransform_Delay{DelayMillis: t.Delay},
-			}}
-		tt := []*pipepb.TimestampTransform{ttd}
+		if len(t.TimestampTransforms) == 0 {
+			panic("AfterProcessingTime trigger set without a delay or alignment.")
+		}
+		tts := []*pipepb.TimestampTransform{}
+		for _, tt := range t.TimestampTransforms {
+			var ttp *pipepb.TimestampTransform
+			switch tt := tt.(type) {
+			case window.DelayTransform:
+				ttp = &pipepb.TimestampTransform{
+					TimestampTransform: &pipepb.TimestampTransform_Delay_{
+						Delay: &pipepb.TimestampTransform_Delay{DelayMillis: tt.Delay},
+					}}
+			case window.AlignToTransform:
+				ttp = &pipepb.TimestampTransform{
+					TimestampTransform: &pipepb.TimestampTransform_AlignTo_{
+						AlignTo: &pipepb.TimestampTransform_AlignTo{
+							Period: tt.Period,
+							Offset: tt.Offset,
+						},
+					}}
+			}
+			tts = append(tts, ttp)
+		}
 		return &pipepb.Trigger{
 			Trigger: &pipepb.Trigger_AfterProcessingTime_{
-				AfterProcessingTime: &pipepb.Trigger_AfterProcessingTime{TimestampTransforms: tt},
+				AfterProcessingTime: &pipepb.Trigger_AfterProcessingTime{
+					TimestampTransforms: tts,
+				},
 			},
 		}
 	case window.ElementCountTrigger:
diff --git a/sdks/go/pkg/beam/option.go b/sdks/go/pkg/beam/option.go
index 5d8df6f..db8f433 100644
--- a/sdks/go/pkg/beam/option.go
+++ b/sdks/go/pkg/beam/option.go
@@ -54,11 +54,11 @@ func parseOpts(opts []Option) ([]SideInput, []TypeDefinition) {
 	var infer []TypeDefinition
 
 	for _, opt := range opts {
-		switch opt.(type) {
+		switch opt := opt.(type) {
 		case SideInput:
-			side = append(side, opt.(SideInput))
+			side = append(side, opt)
 		case TypeDefinition:
-			infer = append(infer, opt.(TypeDefinition))
+			infer = append(infer, opt)
 		default:
 			panic(fmt.Sprintf("Unexpected opt: %v", opt))
 		}
diff --git a/sdks/go/pkg/beam/windowing.go b/sdks/go/pkg/beam/windowing.go
index 58254ac..90cf299 100644
--- a/sdks/go/pkg/beam/windowing.go
+++ b/sdks/go/pkg/beam/windowing.go
@@ -17,6 +17,7 @@ package beam
 
 import (
 	"fmt"
+	"time"
 
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
@@ -27,31 +28,50 @@ type WindowIntoOption interface {
 	windowIntoOption()
 }
 
-type WindowTrigger struct {
-	Name window.Trigger
+type windowTrigger struct {
+	trigger window.Trigger
 }
 
-func (t WindowTrigger) windowIntoOption() {}
+func (t windowTrigger) windowIntoOption() {}
 
-// Trigger applies `tr` trigger to the window.
-func Trigger(tr window.Trigger) WindowTrigger {
-	return WindowTrigger{Name: tr}
+// Trigger applies the given trigger to the window.
+//
+// Trigger support in the Go SDK is currently experimental
+// and may have breaking changes made to it.
+// Use at your own discretion.
+func Trigger(tr window.Trigger) WindowIntoOption {
+	return windowTrigger{trigger: tr}
 }
 
-type AccumulationMode struct {
-	Mode window.AccumulationMode
+type accumulationMode struct {
+	mode window.AccumulationMode
 }
 
-func (m AccumulationMode) windowIntoOption() {}
+func (m accumulationMode) windowIntoOption() {}
 
 // PanesAccumulate applies an Accumulating AccumulationMode to the window.
-func PanesAccumulate() AccumulationMode {
-	return AccumulationMode{Mode: window.Accumulating}
+// After a pane fires, already processed elements will accumulate and
+// elements will be repeated in subsequent firings for the window.
+func PanesAccumulate() WindowIntoOption {
+	return accumulationMode{mode: window.Accumulating}
 }
 
 // PanesDiscard applies a Discarding AccumulationMode to the window.
-func PanesDiscard() AccumulationMode {
-	return AccumulationMode{Mode: window.Discarding}
+// After a pane fires, already processed elements will be discarded
+// and not included in later firings for the window.
+func PanesDiscard() WindowIntoOption {
+	return accumulationMode{mode: window.Discarding}
+}
+
+type allowedLateness struct {
+	delay time.Duration
+}
+
+func (m allowedLateness) windowIntoOption() {}
+
+// AllowedLateness configures for how long data may arrive after the end of a window.
+func AllowedLateness(delay time.Duration) WindowIntoOption {
+	return allowedLateness{delay: delay}
 }
 
 // WindowInto applies the windowing strategy to each element.
@@ -70,10 +90,15 @@ func TryWindowInto(s Scope, wfn *window.Fn, col PCollection, opts ...WindowIntoO
 	ws := window.WindowingStrategy{Fn: wfn, Trigger: window.Trigger{}}
 	for _, opt := range opts {
 		switch opt := opt.(type) {
-		case WindowTrigger:
-			ws.Trigger = opt.Name
-		case AccumulationMode:
-			ws.AccumulationMode = opt.Mode
+		case windowTrigger:
+			// TODO(BEAM-3304): call validation on trigger construction here
+			// so local errors can be returned to the user in their pipeline
+			// context instead of at pipeline translation time.
+			ws.Trigger = opt.trigger
+		case accumulationMode:
+			ws.AccumulationMode = opt.mode
+		case allowedLateness:
+			ws.AllowedLateness = int(opt.delay / time.Millisecond)
 		default:
 			panic(fmt.Sprintf("Unknown WindowInto option type: %T: %v", opt, opt))
 		}
diff --git a/sdks/go/test/integration/primitives/windowinto.go b/sdks/go/test/integration/primitives/windowinto.go
index 1c2286d..afa1500 100644
--- a/sdks/go/test/integration/primitives/windowinto.go
+++ b/sdks/go/test/integration/primitives/windowinto.go
@@ -93,8 +93,8 @@ func WindowSums_Lifted(s beam.Scope) {
 	WindowSums(s.Scope("Lifted"), stats.SumPerKey)
 }
 
-func validateEquals(s beam.Scope, wfn *window.Fn, in beam.PCollection, tr window.Trigger, m beam.AccumulationMode, expected ...interface{}) {
-	windowed := beam.WindowInto(s, wfn, in, beam.Trigger(tr), m)
+func validateEquals(s beam.Scope, wfn *window.Fn, in beam.PCollection, opts []beam.WindowIntoOption, expected ...interface{}) {
+	windowed := beam.WindowInto(s, wfn, in, opts...)
 	sums := stats.Sum(s, windowed)
 	sums = beam.WindowInto(s, window.NewGlobalWindows(), sums)
 	passert.Equals(s, sums, expected...)
@@ -110,7 +110,10 @@ func TriggerDefault(s beam.Scope) {
 
 	col := teststream.Create(s, con)
 	windowSize := 10 * time.Second
-	validateEquals(s.Scope("Fixed"), window.NewFixedWindows(windowSize), col, window.TriggerDefault(), beam.PanesDiscard(), 6.0, 9.0)
+	validateEquals(s.Scope("Fixed"), window.NewFixedWindows(windowSize), col,
+		[]beam.WindowIntoOption{
+			beam.Trigger(window.TriggerDefault()),
+		}, 6.0, 9.0)
 }
 
 // TriggerAlways tests the Always trigger, it is expected to receive every input value as the output.
@@ -121,11 +124,16 @@ func TriggerAlways(s beam.Scope) {
 	col := teststream.Create(s, con)
 	windowSize := 10 * time.Second
 
-	validateEquals(s.Scope("Fixed"), window.NewFixedWindows(windowSize), col, window.TriggerAlways(), beam.PanesDiscard(), 1.0, 2.0, 3.0)
+	validateEquals(s.Scope("Fixed"), window.NewFixedWindows(windowSize), col,
+		[]beam.WindowIntoOption{
+			beam.Trigger(window.TriggerAlways()),
+		}, 1.0, 2.0, 3.0)
 }
 
-func validateCount(s beam.Scope, wfn *window.Fn, in beam.PCollection, tr window.Trigger, m beam.AccumulationMode, expected int) {
-	windowed := beam.WindowInto(s, wfn, in, beam.Trigger(tr), m)
+// validateCount handles cases where we can only be sure of the count of elements
+// and not their ordering.
+func validateCount(s beam.Scope, wfn *window.Fn, in beam.PCollection, opts []beam.WindowIntoOption, expected int) {
+	windowed := beam.WindowInto(s, wfn, in, opts...)
 	sums := stats.Sum(s, windowed)
 	sums = beam.WindowInto(s, window.NewGlobalWindows(), sums)
 	passert.Count(s, sums, "total collections", expected)
@@ -147,7 +155,10 @@ func TriggerElementCount(s beam.Scope) {
 
 	// waits only for two elements to arrive and fires output after that and never fires that.
 	// For the trigger to fire every 2 elements, combine it with Repeat Trigger
-	validateCount(s.Scope("Fixed"), window.NewFixedWindows(windowSize), col, window.TriggerAfterCount(2), beam.PanesDiscard(), 2)
+	validateCount(s.Scope("Fixed"), window.NewFixedWindows(windowSize), col,
+		[]beam.WindowIntoOption{
+			beam.Trigger(window.TriggerAfterCount(2)),
+		}, 2)
 }
 
 // TriggerAfterProcessingTime tests the AfterProcessingTime Trigger, it fires output panes once 't' processing time has passed
@@ -162,7 +173,10 @@ func TriggerAfterProcessingTime(s beam.Scope) {
 
 	col := teststream.Create(s, con)
 
-	validateEquals(s.Scope("Global"), window.NewGlobalWindows(), col, window.TriggerAfterProcessingTime(5000), beam.PanesDiscard(), 6.0)
+	validateEquals(s.Scope("Global"), window.NewGlobalWindows(), col,
+		[]beam.WindowIntoOption{
+			beam.Trigger(window.TriggerAfterProcessingTime().PlusDelay(5 * time.Second)),
+		}, 6.0)
 }
 
 // TriggerRepeat tests the repeat trigger. As of now is it is configure to take only one trigger as a subtrigger.
@@ -177,7 +191,10 @@ func TriggerRepeat(s beam.Scope) {
 
 	col := teststream.Create(s, con)
 
-	validateCount(s.Scope("Global"), window.NewGlobalWindows(), col, window.TriggerRepeat(window.TriggerAfterCount(2)), beam.PanesDiscard(), 3)
+	validateCount(s.Scope("Global"), window.NewGlobalWindows(), col,
+		[]beam.WindowIntoOption{
+			beam.Trigger(window.TriggerRepeat(window.TriggerAfterCount(2))),
+		}, 3)
 }
 
 // TriggerAfterEndOfWindow tests the AfterEndOfWindow Trigger. With AfterCount(2) as the early firing trigger and AfterCount(1) as late firing trigger.
@@ -189,7 +206,12 @@ func TriggerAfterEndOfWindow(s beam.Scope) {
 
 	col := teststream.Create(s, con)
 	windowSize := 10 * time.Second
-	trigger := window.TriggerAfterEndOfWindow().EarlyFiring(window.TriggerAfterCount(2)).LateFiring(window.TriggerAfterCount(1))
-
-	validateCount(s.Scope("Fixed"), window.NewFixedWindows(windowSize), col, trigger, beam.PanesDiscard(), 2)
+	trigger := window.TriggerAfterEndOfWindow().
+		EarlyFiring(window.TriggerAfterCount(2)).
+		LateFiring(window.TriggerAfterCount(1))
+
+	validateCount(s.Scope("Fixed"), window.NewFixedWindows(windowSize), col,
+		[]beam.WindowIntoOption{
+			beam.Trigger(trigger),
+		}, 2)
 }
diff --git a/website/www/site/content/en/documentation/programming-guide.md b/website/www/site/content/en/documentation/programming-guide.md
index 0c4d56f..0286b4b 100644
--- a/website/www/site/content/en/documentation/programming-guide.md
+++ b/website/www/site/content/en/documentation/programming-guide.md
@@ -1462,7 +1462,8 @@ applying `Combine`:
 
 {{< paragraph class="language-go" >}}
 If your `PCollection` uses any non-global windowing function, the Beam Go SDK
-behaves the same way as with global windowing.
+behaves the same way as with global windowing. Windows that are empty in the input
+  `PCollection` will likewise be empty in the output collection.
 {{< /paragraph >}}
 
 ##### 4.2.4.6. Combining values in a keyed PCollection {#combining-values-in-a-keyed-pcollection}
@@ -1651,7 +1652,7 @@ thought to ensure correctness when there are external side effects.
 
 <span class="language-java language-py">
 
-> **Note:** These requirements apply to subclasses of `DoFn`</span> (a function object
+> **Note:** These requirements apply to subclasses of `DoFn` (a function object
 > used with the [ParDo](#pardo) transform), `CombineFn` (a function object used
 > with the [Combine](#combine) transform), and `WindowFn` (a function object
 > used with the [Window](#windowing) transform).
@@ -1660,7 +1661,7 @@ thought to ensure correctness when there are external side effects.
 
 <span class="language-go">
 
-> **Note:** These requirements apply to `DoFn`s</span> (a function object
+> **Note:** These requirements apply to `DoFn`s (a function object
 > used with the [ParDo](#pardo) transform), `CombineFn`s (a function object used
 > with the [Combine](#combine) transform), and `WindowFn`s (a function object
 > used with the [Window](#windowing) transform).
@@ -4414,7 +4415,7 @@ You can define different kinds of windows to divide the elements of your
 *  Sliding Time Windows
 *  Per-Session Windows
 *  Single Global Window
-*  Calendar-based Windows (not supported by the Beam SDK for Python)
+*  Calendar-based Windows (not supported by the Beam SDK for Python or Go)
 
 You can also define your own `WindowFn` if you have a more complex need.
 
@@ -4519,6 +4520,10 @@ into fixed windows, each 60 seconds in length:
 {{< code_sample "sdks/python/apache_beam/examples/snippets/snippets_test.py" setting_fixed_windows >}}
 {{< /highlight >}}
 
+{{< highlight go >}}
+{{< code_sample "sdks/go/examples/snippets/08windowing.go" setting_fixed_windows >}}
+{{< /highlight >}}
+
 #### 8.3.2. Sliding time windows {#using-sliding-time-windows}
 
 The following example code shows how to apply `Window` to divide a `PCollection`
@@ -4535,6 +4540,10 @@ begins every five seconds:
 {{< code_sample "sdks/python/apache_beam/examples/snippets/snippets_test.py" setting_sliding_windows >}}
 {{< /highlight >}}
 
+{{< highlight go >}}
+{{< code_sample "sdks/go/examples/snippets/08windowing.go" setting_sliding_windows >}}
+{{< /highlight >}}
+
 #### 8.3.3. Session windows {#using-session-windows}
 
 The following example code shows how to apply `Window` to divide a `PCollection`
@@ -4551,6 +4560,10 @@ least 10 minutes (600 seconds):
 {{< code_sample "sdks/python/apache_beam/examples/snippets/snippets_test.py" setting_session_windows >}}
 {{< /highlight >}}
 
+{{< highlight go >}}
+{{< code_sample "sdks/go/examples/snippets/08windowing.go" setting_session_windows >}}
+{{< /highlight >}}
+
 Note that the sessions are per-key — each key in the collection will have its
 own session groupings depending on the data distribution.
 
@@ -4570,6 +4583,10 @@ a single global window for a `PCollection`:
 {{< code_sample "sdks/python/apache_beam/examples/snippets/snippets_test.py" setting_global_window >}}
 {{< /highlight >}}
 
+{{< highlight go >}}
+{{< code_sample "sdks/go/examples/snippets/08windowing.go" setting_global_window >}}
+{{< /highlight >}}
+
 ### 8.4. Watermarks and late data {#watermarks-and-late-data}
 
 In any data processing system, there is a certain amount of lag between the time
@@ -4637,6 +4654,10 @@ the end of a window.
               allowed_lateness=Duration(seconds=2*24*60*60)) # 2 days
 {{< /highlight >}}
 
+{{< highlight go >}}
+{{< code_sample "sdks/go/examples/snippets/08windowing.go" setting_allowed_lateness >}}
+{{< /highlight >}}
+
 When you set `.withAllowedLateness` on a `PCollection`, that allowed lateness
 propagates forward to any subsequent `PCollection` derived from the first
 `PCollection` you applied allowed lateness to. If you want to change the allowed
@@ -4681,8 +4702,21 @@ with a `DoFn` to attach the timestamps to each element in your `PCollection`.
 {{< code_sample "sdks/python/apache_beam/examples/snippets/snippets_test.py" setting_timestamp >}}
 {{< /highlight >}}
 
+{{< highlight go >}}
+{{< code_sample "sdks/go/examples/snippets/08windowing.go" setting_timestamp >}}
+
+// Use the DoFn with ParDo as normal.
+{{< code_sample "sdks/go/examples/snippets/08windowing.go" setting_timestamp_pipeline >}}
+{{< /highlight >}}
+
 ## 9. Triggers {#triggers}
 
+<span class="language-go">
+
+> **Note:** The Trigger API in the Beam SDK for Go is currently experimental and subject to change.
+
+</span>
+
 When collecting and grouping data into windows, Beam uses **triggers** to
 determine when to emit the aggregated results of each window (referred to as a
 *pane*). If you use Beam's default windowing configuration and [default
@@ -4772,7 +4806,7 @@ firings:
 {{< /highlight >}}
 
 {{< highlight go >}}
-  {{< code_sample "sdks/go/examples/snippets/09triggers.go" after_window_trigger >}}
+{{< code_sample "sdks/go/examples/snippets/09triggers.go" after_window_trigger >}}
 {{< /highlight >}}
 
 #### 9.1.1. Default trigger {#default-trigger}
@@ -4845,9 +4879,10 @@ sets the window's **accumulation mode**.
 
 {{< paragraph class="language-go" >}}
 You set the trigger(s) for a `PCollection` by passing in the `beam.Trigger` parameter
-when you use the `beam.WindowInto` transform. This code sample sets an Always
-trigger for a `PCollection`, which emits results every time an element in that window has been processed. The `beam.AccumulationMode` parameter
-sets the window's **accumulation mode**.
+when you use the `beam.WindowInto` transform. This code sample sets a time-based
+trigger for a `PCollection`, which emits results one minute after the first
+element in that window has been processed.
+ The `beam.AccumulationMode` parameter sets the window's **accumulation mode**.
 {{< /paragraph >}}
 
 {{< highlight java >}}
@@ -4866,7 +4901,7 @@ sets the window's **accumulation mode**.
 {{< /highlight >}}
 
 {{< highlight go >}}
-  {{< code_sample "sdks/go/examples/snippets/09triggers.go" always_trigger >}}
+{{< code_sample "sdks/go/examples/snippets/09triggers.go" setting_a_trigger >}}
 {{< /highlight >}}
 
 #### 9.4.1. Window accumulation modes {#window-accumulation-modes}
@@ -4946,8 +4981,10 @@ your windowing configuration. This gives your trigger the opportunity to react
 to the late data. If allowed lateness is set, the default trigger will emit new
 results immediately whenever late data arrives.
 
-You set the allowed lateness by using `.withAllowedLateness()` when you set your
-windowing function:
+You set the allowed lateness by using <span class="language-java">`.withAllowedLateness()`</span>
+<span class="language-py">`allowed_lateness`</span>
+<span class="language-go">`beam.AllowedLateness()`</span>
+when you set your windowing function:
 
 {{< highlight java >}}
   PCollection<String> pc = ...;
@@ -4967,10 +5004,17 @@ windowing function:
 
 {{< /highlight >}}
 
+{{< highlight go >}}
+{{< code_sample "sdks/go/examples/snippets/09triggers.go" setting_allowed_lateness >}}
+{{< /highlight >}}
+
 This allowed lateness propagates to all `PCollection`s derived as a result of
 applying transforms to the original `PCollection`. If you want to change the
 allowed lateness later in your pipeline, you can apply
-`Window.configure().withAllowedLateness()` again, explicitly.
+<span class="language-java">`Window.configure().withAllowedLateness()`</span>
+<span class="language-py">`allowed_lateness`</span>
+<span class="language-go">`beam.AllowedLateness()`</span>
+again, explicitly.
 
 
 ### 9.5. Composite triggers {#composite-triggers}
@@ -5043,6 +5087,10 @@ example trigger code fires on the following conditions:
 {{< code_sample "sdks/python/apache_beam/examples/snippets/snippets_test.py" model_composite_triggers >}}
 {{< /highlight >}}
 
+{{< highlight go >}}
+{{< code_sample "sdks/go/examples/snippets/09triggers.go" model_composite_triggers >}}
+{{< /highlight >}}
+
 #### 9.5.3. Other composite triggers {#other-composite-triggers}
 
 You can also build other sorts of composite triggers. The following example code