You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2024/02/16 19:49:37 UTC
(beam) branch master updated: [#29917][prism] Initial TestStream support (#30072)
This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 48adde999b9 [#29917][prism] Initial TestStream support (#30072)
48adde999b9 is described below
commit 48adde999b9212c5bae8a330111fe8739fc1fbde
Author: Robert Burke <lo...@users.noreply.github.com>
AuthorDate: Fri Feb 16 11:49:29 2024 -0800
[#29917][prism] Initial TestStream support (#30072)
---
.../prism/internal/engine/elementmanager.go | 99 ++++++--
.../runners/prism/internal/engine/engine_test.go | 47 ++++
.../runners/prism/internal/engine/teststream.go | 269 +++++++++++++++++++++
sdks/go/pkg/beam/runners/prism/internal/execute.go | 54 +++++
.../prism/internal/jobservices/management.go | 18 ++
.../runners/prism/internal/unimplemented_test.go | 43 +++-
sdks/go/pkg/beam/testing/teststream/teststream.go | 8 +-
sdks/go/test/integration/integration.go | 7 +-
sdks/go/test/integration/primitives/teststream.go | 43 +++-
.../test/integration/primitives/teststream_test.go | 10 +
10 files changed, 548 insertions(+), 50 deletions(-)
diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
index 077d6386315..28ea75ac9e5 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
@@ -166,6 +166,8 @@ type ElementManager struct {
livePending atomic.Int64 // An accessible live pending count. DEBUG USE ONLY
pendingElements sync.WaitGroup // pendingElements counts all unprocessed elements in a job. Jobs with no pending elements terminate successfully.
+
+ testStreamHandler *testStreamHandler // Optional test stream handler when a test stream is in the pipeline.
}
func (em *ElementManager) addPending(v int) {
@@ -223,6 +225,15 @@ func (em *ElementManager) StageStateful(ID string) {
em.stages[ID].stateful = true
}
+// AddTestStream registers a TestStream rooted at the stage with the given id and
+// returns the builder the execution layer uses to queue the stream's events
+// decoded from the protos.
+//
+// tagToPCol maps the transform's local output tags to global PCollection ids.
+func (em *ElementManager) AddTestStream(id string, tagToPCol map[string]string) TestStreamBuilder {
+	builder := &testStreamImpl{em: em}
+	// The handler must exist before the tag mapping can be stored on it.
+	builder.initHandler(id)
+	builder.TagsToPCollections(tagToPCol)
+	return builder
+}
+
// Impulse marks and initializes the given stage as an impulse which
// is a root transform that starts processing.
func (em *ElementManager) Impulse(stageID string) {
@@ -319,37 +330,72 @@ func (em *ElementManager) Bundles(ctx context.Context, nextBundID func() string)
em.refreshCond.L.Lock()
}
}
- if len(em.inprogressBundles) == 0 && len(em.watermarkRefreshes) == 0 {
- v := em.livePending.Load()
- slog.Debug("Bundles: nothing in progress and no refreshes", slog.Int64("pendingElementCount", v))
- if v > 0 {
- var stageState []string
- ids := maps.Keys(em.stages)
- sort.Strings(ids)
- for _, id := range ids {
- ss := em.stages[id]
- inW := ss.InputWatermark()
- outW := ss.OutputWatermark()
- upPCol, upW := ss.UpstreamWatermark()
- upS := em.pcolParents[upPCol]
- stageState = append(stageState, fmt.Sprintln(id, "watermark in", inW, "out", outW, "upstream", upW, "from", upS, "pending", ss.pending, "byKey", ss.pendingByKeys, "inprogressKeys", ss.inprogressKeys, "byBundle", ss.inprogressKeysByBundle, "holds", ss.watermarkHoldHeap, "holdCounts", ss.watermarkHoldsCounts))
- }
- panic(fmt.Sprintf("nothing in progress and no refreshes with non zero pending elements: %v\n%v", v, strings.Join(stageState, "")))
- }
- } else if len(em.inprogressBundles) == 0 {
- v := em.livePending.Load()
- slog.Debug("Bundles: nothing in progress after advance",
- slog.Any("advanced", advanced),
- slog.Int("refreshCount", len(em.watermarkRefreshes)),
- slog.Int64("pendingElementCount", v),
- )
- }
- em.refreshCond.L.Unlock()
+ em.checkForQuiescence(advanced)
}
}()
return runStageCh
}
+// checkForQuiescence sees if this element manager is no longer able to do any pending work or make progress.
+//
+// Quiescence can happen if there are no inprogress bundles, and there are no further watermark refreshes, which
+// are the only way to access new pending elements. If there are no pending elements, then the pipeline will
+// terminate successfully.
+//
+// Otherwise, take appropriate action, such as executing the next TestStream event, or, when no such
+// event remains and elements are still pending, panic with per-stage debugging information describing
+// why the pipeline is stuck.
+//
+// The advanced set is only used for debug logging.
+//
+// Must be called while holding em.refreshCond.L. The lock is released before this method returns.
+func (em *ElementManager) checkForQuiescence(advanced set[string]) {
+ defer em.refreshCond.L.Unlock()
+ if len(em.inprogressBundles) > 0 {
+ // If there are bundles in progress, then there may be watermark refreshes when they terminate.
+ return
+ }
+ if len(em.watermarkRefreshes) > 0 {
+ // If there are watermarks to refresh, we aren't yet stuck.
+ v := em.livePending.Load()
+ slog.Debug("Bundles: nothing in progress after advance",
+ slog.Any("advanced", advanced),
+ slog.Int("refreshCount", len(em.watermarkRefreshes)),
+ slog.Int64("pendingElementCount", v),
+ )
+ return
+ }
+ // The job has quiesced!
+
+ // There are no further incoming watermark changes, see if there are test stream events for this job.
+ // NextEvent tolerates a nil handler, so pipelines without a TestStream simply get nil here.
+ nextEvent := em.testStreamHandler.NextEvent()
+ if nextEvent != nil {
+ nextEvent.Execute(em)
+ // Decrement pending for the event being processed.
+ em.addPending(-1)
+ return
+ }
+
+ v := em.livePending.Load()
+ if v == 0 {
+ // Since there are no further pending elements, the job will be terminating successfully.
+ return
+ }
+ // The job is officially stuck. Fail fast and produce debugging information.
+ // Jobs must never get stuck so this indicates a bug in prism to be investigated.
+
+ slog.Debug("Bundles: nothing in progress and no refreshes", slog.Int64("pendingElementCount", v))
+ var stageState []string
+ // Sort the stage ids so the dump is in a stable order.
+ ids := maps.Keys(em.stages)
+ sort.Strings(ids)
+ for _, id := range ids {
+ ss := em.stages[id]
+ inW := ss.InputWatermark()
+ outW := ss.OutputWatermark()
+ upPCol, upW := ss.UpstreamWatermark()
+ upS := em.pcolParents[upPCol]
+ stageState = append(stageState, fmt.Sprintln(id, "watermark in", inW, "out", outW, "upstream", upW, "from", upS, "pending", ss.pending, "byKey", ss.pendingByKeys, "inprogressKeys", ss.inprogressKeys, "byBundle", ss.inprogressKeysByBundle, "holds", ss.watermarkHoldHeap, "holdCounts", ss.watermarkHoldsCounts))
+ }
+ panic(fmt.Sprintf("nothing in progress and no refreshes with non zero pending elements: %v\n%v", v, strings.Join(stageState, "")))
+}
+
// InputForBundle returns pre-allocated data for the given bundle, encoding the elements using
// the PCollection's coders.
func (em *ElementManager) InputForBundle(rb RunBundle, info PColInfo) [][]byte {
@@ -429,6 +475,7 @@ const (
BlockTimer // BlockTimer represents timers for the bundle.
)
+// Block represents a contiguous set of data or timers for the same destination.
type Block struct {
Kind BlockKind
Bytes [][]byte
diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/engine_test.go b/sdks/go/pkg/beam/runners/prism/internal/engine/engine_test.go
index 6a39b9d2070..04269e3dd6a 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/engine/engine_test.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/engine/engine_test.go
@@ -169,3 +169,50 @@ func TestElementManagerCoverage(t *testing.T) {
})
}
}
+
+func TestTestStream(t *testing.T) {
+ initRunner(t)
+
+ tests := []struct {
+ pipeline func(s beam.Scope)
+ }{
+ {pipeline: primitives.TestStreamBoolSequence},
+ {pipeline: primitives.TestStreamByteSliceSequence},
+ {pipeline: primitives.TestStreamFloat64Sequence},
+ {pipeline: primitives.TestStreamInt64Sequence},
+ {pipeline: primitives.TestStreamInt16Sequence},
+ {pipeline: primitives.TestStreamStrings},
+ {pipeline: primitives.TestStreamTwoBoolSequences},
+ {pipeline: primitives.TestStreamTwoFloat64Sequences},
+ {pipeline: primitives.TestStreamTwoInt64Sequences},
+ {pipeline: primitives.TestStreamTwoUserTypeSequences},
+ }
+
+ configs := []struct {
+ name string
+ OneElementPerKey, OneKeyPerBundle bool
+ }{
+ {"Greedy", false, false},
+ {"AllElementsPerKey", false, true},
+ {"OneElementPerKey", true, false},
+ {"OneElementPerBundle", true, true},
+ }
+ for _, config := range configs {
+ for _, test := range tests {
+ t.Run(initTestName(test.pipeline)+"_"+config.name, func(t *testing.T) {
+ t.Cleanup(func() {
+ engine.OneElementPerKey = false
+ engine.OneKeyPerBundle = false
+ })
+ engine.OneElementPerKey = config.OneElementPerKey
+ engine.OneKeyPerBundle = config.OneKeyPerBundle
+ p, s := beam.NewPipelineWithRoot()
+ test.pipeline(s)
+ _, err := executeWithT(context.Background(), t, p)
+ if err != nil {
+ t.Fatalf("pipeline failed, but feature should be implemented in Prism: %v", err)
+ }
+ })
+ }
+ }
+}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/teststream.go b/sdks/go/pkg/beam/runners/prism/internal/engine/teststream.go
new file mode 100644
index 00000000000..c0a0ff8ebe7
--- /dev/null
+++ b/sdks/go/pkg/beam/runners/prism/internal/engine/teststream.go
@@ -0,0 +1,269 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package engine
+
+import (
+ "time"
+
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+)
+
+// We define our own element wrapper and similar to avoid depending on the protos within the
+// engine package. This improves compile times, and readability of this package.
+
+// testStreamHandler manages the queue of TestStream events for the ElementManager.
+//
+// TestStreams are a pipeline root like an Impulse. They kick off computation, and
+// strictly manage Watermark advancements.
+//
+// A given pipeline can only have a single TestStream due to test streams
+// requiring a single source of truth for Relative Processing Time advancements
+// and ordering emissions of Elements.
+// All operations with testStreamHandler are expected to be in the element manager's
+// refresh lock critical section.
+type testStreamHandler struct {
+ ID string
+
+ nextEventIndex int
+ events []tsEvent
+ // Intended to be initialized with normal "time.Now", so this only changes by relative advancements.
+ // NOTE(review): makeTestStreamHandler does not set this field, so it starts as the zero
+ // time.Time - confirm where, if anywhere, it is initialized.
+ processingTime time.Time // Override for the processing time clock, for triggers and ProcessContinuations.
+
+ tagState map[string]tagState // Map from event tag to related outputs.
+
+ completed bool // indicates that no further test stream events exist, and all watermarks are advanced to infinity. Used to send the final event, once.
+}
+
+// makeTestStreamHandler returns a handler for the test stream with the given id,
+// with an empty tag table and no queued events.
+func makeTestStreamHandler(id string) *testStreamHandler {
+	handler := &testStreamHandler{ID: id}
+	handler.tagState = make(map[string]tagState)
+	return handler
+}
+
+// tagState tracks state for a given TestStream output tag: how far its
+// watermark has been advanced, and which PCollection it feeds.
+type tagState struct {
+ watermark mtime.Time // Current Watermark for this tag.
+ pcollection string // ID for the pcollection of this tag to look up consumers.
+}
+
+// Now returns the overridden ProcessingTime, which is only advanced when directed by an event.
+// Overrides the elementManager "clock".
+func (ts *testStreamHandler) Now() time.Time {
+ return ts.processingTime
+}
+
+// TagsToPCollections receives the map of local output tags to global pcollection ids,
+// and starts every tag at the minimum watermark.
+func (ts *testStreamHandler) TagsToPCollections(tagToPcol map[string]string) {
+	// With exactly one output, events using the empty tag string resolve to
+	// that sole output as well.
+	soleOutput := len(tagToPcol) == 1
+	for tag, pcol := range tagToPcol {
+		initial := tagState{
+			watermark:   mtime.MinTimestamp,
+			pcollection: pcol,
+		}
+		ts.tagState[tag] = initial
+		if soleOutput {
+			ts.tagState[""] = initial
+		}
+	}
+}
+
+// AddElementEvent appends an event emitting the given elements on the tag
+// to the end of the test stream event queue.
+func (ts *testStreamHandler) AddElementEvent(tag string, elements []TestStreamElement) {
+	ev := tsElementEvent{Tag: tag, Elements: elements}
+	ts.events = append(ts.events, ev)
+}
+
+// AddWatermarkEvent appends an event moving the tag's watermark to newWatermark
+// to the end of the test stream event queue.
+func (ts *testStreamHandler) AddWatermarkEvent(tag string, newWatermark mtime.Time) {
+	ev := tsWatermarkEvent{Tag: tag, NewWatermark: newWatermark}
+	ts.events = append(ts.events, ev)
+}
+
+// AddProcessingTimeEvent appends an event advancing the synthetic processing time by d
+// to the end of the test stream event queue.
+func (ts *testStreamHandler) AddProcessingTimeEvent(d time.Duration) {
+	ev := tsProcessingTimeEvent{AdvanceBy: d}
+	ts.events = append(ts.events, ev)
+}
+
+// NextEvent pops and returns the next queued event.
+//
+// Once the queue is exhausted, a single tsFinalEvent is returned, and every
+// later call returns nil. A nil handler always returns nil, so callers don't
+// need to check whether the pipeline has a TestStream.
+func (ts *testStreamHandler) NextEvent() tsEvent {
+	switch {
+	case ts == nil:
+		return nil
+	case ts.nextEventIndex < len(ts.events):
+		next := ts.events[ts.nextEventIndex]
+		ts.nextEventIndex++
+		return next
+	case ts.completed:
+		// The final event has already been handed out.
+		return nil
+	}
+	ts.completed = true
+	return tsFinalEvent{stageID: ts.ID}
+}
+
+// TestStreamElement wraps the provided bytes and timestamp for ingestion and use.
+type TestStreamElement struct {
+ // Encoded is the element's encoded bytes, stored as-is on the pending element.
+ Encoded []byte
+ // EventTime is used as the element's timestamp.
+ EventTime mtime.Time
+}
+
+// tsEvent abstracts over the different TestStream Event kinds so we can keep
+// them in the same queue.
+type tsEvent interface {
+ // Execute the associated event on this element manager.
+ // Implementations reach the test stream state through em.testStreamHandler.
+ Execute(*ElementManager)
+}
+
+// tsElementEvent implements an element event, inserting additional elements
+// to be pending for consuming stages.
+type tsElementEvent struct {
+ Tag string // Output tag, used to look up the destination pcollection.
+ Elements []TestStreamElement // Elements to make pending when the event executes.
+}
+
+// Execute this ElementEvent by handing its elements to every stage consuming the
+// tag's pcollection, and marking those stages for a watermark refresh.
+func (ev tsElementEvent) Execute(em *ElementManager) {
+	pcol := em.testStreamHandler.tagState[ev.Tag].pcollection
+
+	// Each element is placed in the global window with a no-firing pane.
+	var batch []element
+	for i := range ev.Elements {
+		batch = append(batch, element{
+			window:    window.GlobalWindow{},
+			timestamp: ev.Elements[i].EventTime,
+			elmBytes:  ev.Elements[i].Encoded,
+			pane:      typex.NoFiringPane(),
+		})
+	}
+
+	// Stages taking the pcollection as their main input: count what they accepted as pending.
+	for _, stageID := range em.consumers[pcol] {
+		accepted := em.stages[stageID].AddPending(batch)
+		em.addPending(accepted)
+		em.watermarkRefreshes.insert(stageID)
+	}
+
+	// Stages reached through the side consumer links.
+	for _, side := range em.sideConsumers[pcol] {
+		em.stages[side.Global].AddPendingSide(batch, side.Transform, side.Local)
+		em.watermarkRefreshes.insert(side.Global)
+	}
+}
+
+// tsWatermarkEvent sets the watermark for the given tag.
+type tsWatermarkEvent struct {
+ Tag string // Output tag whose watermark is being set.
+ NewWatermark mtime.Time // Watermark to set; must not be before the tag's current watermark.
+}
+
+// Execute this WatermarkEvent by recording the tag's new watermark, then pushing it to
+// each stage consuming the tag's pcollection and marking those stages for refresh.
+//
+// Panics if the event would move the tag's watermark backwards.
+func (ev tsWatermarkEvent) Execute(em *ElementManager) {
+	handler := em.testStreamHandler
+	state := handler.tagState[ev.Tag]
+
+	if ev.NewWatermark < state.watermark {
+		panic("test stream event decreases watermark. Watermarks cannot go backwards.")
+	}
+	// tagState values are stored by value, so write the updated copy back.
+	state.watermark = ev.NewWatermark
+	handler.tagState[ev.Tag] = state
+
+	for _, stageID := range em.consumers[state.pcollection] {
+		stage := em.stages[stageID]
+		stage.updateUpstreamWatermark(stage.inputID, state.watermark)
+		em.watermarkRefreshes.insert(stageID)
+	}
+}
+
+// tsProcessingTimeEvent implements advancing the synthetic processing time.
+type tsProcessingTimeEvent struct {
+ AdvanceBy time.Duration // Amount added to the handler's processing time when executed.
+}
+
+// Execute this ProcessingTime event by moving the handler's synthetic processing
+// time forward by AdvanceBy.
+func (ev tsProcessingTimeEvent) Execute(em *ElementManager) {
+	handler := em.testStreamHandler
+	handler.processingTime = handler.processingTime.Add(ev.AdvanceBy)
+}
+
+// tsFinalEvent is the "last" event we perform after all preceding events.
+// It's automatically inserted once the user defined events have all been executed.
+// It updates the upstream watermarks for all consumers to infinity.
+type tsFinalEvent struct {
+ stageID string // ID of the test stream's own stage.
+}
+
+// Execute this final event by updating the watermarks of the test stream's own stage,
+// and scheduling that stage and every stage returned by updateWatermarks for a refresh.
+func (ev tsFinalEvent) Execute(em *ElementManager) {
+ em.addPending(1) // We subtract a pending after event execution, so add one now.
+ ss := em.stages[ev.stageID]
+ kickSet := ss.updateWatermarks(em)
+ // Include the test stream stage itself in the refresh set.
+ kickSet.insert(ev.stageID)
+ em.watermarkRefreshes.merge(kickSet)
+}
+
+// TestStreamBuilder builds a synthetic sequence of events for the engine to execute.
+// Events are executed in the order they are added.
+//
+// A pipeline may only have a single TestStream.
+// NOTE(review): this comment previously warned that violating this "may panic"; initHandler
+// as written keeps the first handler instead of panicking - confirm the intended behavior.
+type TestStreamBuilder interface {
+ AddElementEvent(tag string, elements []TestStreamElement)
+ AddWatermarkEvent(tag string, newWatermark mtime.Time)
+ AddProcessingTimeEvent(d time.Duration)
+}
+
+// testStreamImpl is the TestStreamBuilder returned by ElementManager.AddTestStream.
+// It forwards events to the element manager's testStreamHandler, and counts each
+// added event as pending on the element manager.
+type testStreamImpl struct {
+ em *ElementManager
+}
+
+// Compile time checks that both types satisfy TestStreamBuilder.
+var (
+ _ TestStreamBuilder = (*testStreamImpl)(nil)
+ _ TestStreamBuilder = (*testStreamHandler)(nil)
+)
+
+// initHandler creates the element manager's test stream handler for id,
+// unless one is already present, in which case the existing handler is kept.
+func (tsi *testStreamImpl) initHandler(id string) {
+	if tsi.em.testStreamHandler != nil {
+		return
+	}
+	tsi.em.testStreamHandler = makeTestStreamHandler(id)
+}
+
+// TagsToPCollections receives the map of local output tags to global pcollection ids,
+// and passes it on to the element manager's test stream handler.
+func (tsi *testStreamImpl) TagsToPCollections(tagToPcol map[string]string) {
+	handler := tsi.em.testStreamHandler
+	handler.TagsToPCollections(tagToPcol)
+}
+
+// AddElementEvent queues an element event on the handler, and counts the
+// event itself as one pending unit of work on the element manager.
+func (tsi *testStreamImpl) AddElementEvent(tag string, elements []TestStreamElement) {
+	em := tsi.em
+	em.testStreamHandler.AddElementEvent(tag, elements)
+	em.addPending(1)
+}
+
+// AddWatermarkEvent queues a watermark event on the handler, and counts the
+// event itself as one pending unit of work on the element manager.
+func (tsi *testStreamImpl) AddWatermarkEvent(tag string, newWatermark mtime.Time) {
+	em := tsi.em
+	em.testStreamHandler.AddWatermarkEvent(tag, newWatermark)
+	em.addPending(1)
+}
+
+// AddProcessingTimeEvent queues a processing time event on the handler, and counts
+// the event itself as one pending unit of work on the element manager.
+func (tsi *testStreamImpl) AddProcessingTimeEvent(d time.Duration) {
+	em := tsi.em
+	em.testStreamHandler.AddProcessingTimeEvent(d)
+	em.addPending(1)
+}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute.go b/sdks/go/pkg/beam/runners/prism/internal/execute.go
index 1aa95bc6ee1..504125a2bd6 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/execute.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/execute.go
@@ -16,6 +16,7 @@
package internal
import (
+ "bytes"
"context"
"errors"
"fmt"
@@ -24,6 +25,7 @@ import (
"sync/atomic"
"time"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
@@ -238,6 +240,58 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservic
case urns.TransformImpulse:
impulses = append(impulses, stage.ID)
em.AddStage(stage.ID, nil, []string{getOnlyValue(t.GetOutputs())}, nil)
+			case urns.TransformTestStream:
+				// Add a synthetic stage that should largely be unused.
+				em.AddStage(stage.ID, nil, maps.Values(t.GetOutputs()), nil)
+				// Decode the test stream, and convert it to the various events for the ElementManager.
+				var pyld pipepb.TestStreamPayload
+				if err := proto.Unmarshal(t.GetSpec().GetPayload(), &pyld); err != nil {
+					return fmt.Errorf("prism error building stage %v - decoding TestStreamPayload: \n%w", stage.ID, err)
+				}
+
+				// Ensure awareness of the coder used for the teststream.
+				cID, err := lpUnknownCoders(pyld.GetCoderId(), coders, comps.GetCoders())
+				if err != nil {
+					panic(err)
+				}
+				// mayLP length prefixes an encoded element when required, and is
+				// otherwise the identity function.
+				mayLP := func(v []byte) []byte {
+					return v
+				}
+				if cID != pyld.GetCoderId() {
+					// The coder needed length prefixing. For simplicity, add a length prefix to each
+					// encoded element, since we will be sending a length prefixed coder to consume
+					// this anyway. This is simpler than trying to find all the re-written coders after the fact.
+					mayLP = func(v []byte) []byte {
+						var buf bytes.Buffer
+						if err := coder.EncodeVarInt((int64)(len(v)), &buf); err != nil {
+							panic(err)
+						}
+						if _, err := buf.Write(v); err != nil {
+							panic(err)
+						}
+						return buf.Bytes()
+					}
+				}
+
+				// Translate each proto event into its engine counterpart, preserving order.
+				tsb := em.AddTestStream(stage.ID, t.GetOutputs())
+				for _, e := range pyld.GetEvents() {
+					switch ev := e.GetEvent().(type) {
+					case *pipepb.TestStreamPayload_Event_ElementEvent:
+						protoElms := ev.ElementEvent.GetElements()
+						elms := make([]engine.TestStreamElement, 0, len(protoElms))
+						for _, pe := range protoElms {
+							elms = append(elms, engine.TestStreamElement{Encoded: mayLP(pe.GetEncodedElement()), EventTime: mtime.Time(pe.GetTimestamp())})
+						}
+						tsb.AddElementEvent(ev.ElementEvent.GetTag(), elms)
+					case *pipepb.TestStreamPayload_Event_WatermarkEvent:
+						tsb.AddWatermarkEvent(ev.WatermarkEvent.GetTag(), mtime.Time(ev.WatermarkEvent.GetNewWatermark()))
+					case *pipepb.TestStreamPayload_Event_ProcessingTimeEvent:
+						// The proto carries the advance duration in milliseconds.
+						tsb.AddProcessingTimeEvent(time.Duration(ev.ProcessingTimeEvent.GetAdvanceDuration()) * time.Millisecond)
+					default:
+						return fmt.Errorf("prism error building stage %v - unknown TestStream event type: %T", stage.ID, ev)
+					}
+				}
+
case urns.TransformFlatten:
inputs := maps.Values(t.GetInputs())
sort.Strings(inputs)
diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go
index 1c7e280dcdd..4cff2ae92e7 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go
@@ -117,6 +117,7 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (*jo
// Inspect Transforms for unsupported features.
bypassedWindowingStrategies := map[string]bool{}
ts := job.Pipeline.GetComponents().GetTransforms()
+ var testStreamIds []string
for tid, t := range ts {
urn := t.GetSpec().GetUrn()
switch urn {
@@ -170,10 +171,27 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (*jo
continue
}
fallthrough
+ case urns.TransformTestStream:
+ var testStream pipepb.TestStreamPayload
+ if err := proto.Unmarshal(t.GetSpec().GetPayload(), &testStream); err != nil {
+ return nil, fmt.Errorf("unable to unmarshal TestStreamPayload for %v - %q: %w", tid, t.GetUniqueName(), err)
+ }
+ for _, ev := range testStream.GetEvents() {
+ if ev.GetProcessingTimeEvent() != nil {
+ check("TestStream.Event - ProcessingTimeEvents unsupported.", ev.GetProcessingTimeEvent())
+ }
+ }
+
+ t.EnvironmentId = "" // Unset the environment, to ensure it's handled prism side.
+ testStreamIds = append(testStreamIds, tid)
default:
check("PTransform.Spec.Urn", urn+" "+t.GetUniqueName(), "<doesn't exist>")
}
}
+ // At most one test stream per pipeline.
+ if len(testStreamIds) > 1 {
+ check("Multiple TestStream Transforms in Pipeline", testStreamIds)
+ }
// Inspect Windowing strategies for unsupported features.
for wsID, ws := range job.Pipeline.GetComponents().GetWindowingStrategies() {
diff --git a/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go b/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go
index a50a7fe21b0..7be5f340dde 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go
@@ -43,18 +43,6 @@ func TestUnimplemented(t *testing.T) {
}{
// {pipeline: primitives.Drain}, // Can't test drain automatically yet.
- {pipeline: primitives.TestStreamBoolSequence},
- {pipeline: primitives.TestStreamByteSliceSequence},
- {pipeline: primitives.TestStreamFloat64Sequence},
- {pipeline: primitives.TestStreamInt64Sequence},
- {pipeline: primitives.TestStreamStrings},
- {pipeline: primitives.TestStreamTwoBoolSequences},
- {pipeline: primitives.TestStreamTwoFloat64Sequences},
- {pipeline: primitives.TestStreamTwoInt64Sequences},
-
- // Needs teststream
- {pipeline: primitives.Panes},
-
// Triggers (Need teststream and are unimplemented.)
{pipeline: primitives.TriggerAlways},
{pipeline: primitives.TriggerAfterAll},
@@ -68,7 +56,8 @@ func TestUnimplemented(t *testing.T) {
{pipeline: primitives.TriggerOrFinally},
{pipeline: primitives.TriggerRepeat},
- // TODO: Timers integration tests.
+ // Needs triggers.
+ {pipeline: primitives.Panes},
}
for _, test := range tests {
@@ -163,3 +152,31 @@ func TestTimers(t *testing.T) {
})
}
}
+
+// TestTestStream runs the TestStream integration pipelines on prism and
+// expects each of them to succeed.
+func TestTestStream(t *testing.T) {
+	initRunner(t)
+
+	pipelines := []func(s beam.Scope){
+		primitives.TestStreamBoolSequence,
+		primitives.TestStreamByteSliceSequence,
+		primitives.TestStreamFloat64Sequence,
+		primitives.TestStreamInt64Sequence,
+		primitives.TestStreamStrings,
+		primitives.TestStreamTwoBoolSequences,
+		primitives.TestStreamTwoFloat64Sequences,
+		primitives.TestStreamTwoInt64Sequences,
+	}
+
+	for _, pipeline := range pipelines {
+		t.Run(initTestName(pipeline), func(t *testing.T) {
+			p, s := beam.NewPipelineWithRoot()
+			pipeline(s)
+			if _, err := executeWithT(context.Background(), t, p); err != nil {
+				t.Fatalf("pipeline failed, but feature should be implemented in Prism: %v", err)
+			}
+		})
+	}
+}
diff --git a/sdks/go/pkg/beam/testing/teststream/teststream.go b/sdks/go/pkg/beam/testing/teststream/teststream.go
index 050e57bf04c..c13e2cee9e0 100644
--- a/sdks/go/pkg/beam/testing/teststream/teststream.go
+++ b/sdks/go/pkg/beam/testing/teststream/teststream.go
@@ -18,11 +18,9 @@
//
// See https://beam.apache.org/blog/test-stream/ for more information.
//
-// TestStream is supported on the Flink runner and currently supports int64,
-// float64, and boolean types.
-//
-// TODO(BEAM-12753): Flink currently displays unexpected behavior with TestStream,
-// should not be used until this issue is resolved.
+// TestStream is supported on the Flink and Prism runners.
+// Use on Flink currently supports int64, float64, and boolean types, while
+// Prism supports arbitrary types.
package teststream
import (
diff --git a/sdks/go/test/integration/integration.go b/sdks/go/test/integration/integration.go
index 622689c40d0..8f90ffda9e8 100644
--- a/sdks/go/test/integration/integration.go
+++ b/sdks/go/test/integration/integration.go
@@ -139,8 +139,6 @@ var portableFilters = []string{
var prismFilters = []string{
// The prism runner does not yet support Java's CoGBK.
"TestXLang_CoGroupBy",
- // The prism runner does not support the TestStream primitive
- "TestTestStream.*",
// The trigger and pane tests uses TestStream
"TestTrigger.*",
"TestPanes",
@@ -183,6 +181,11 @@ var flinkFilters = []string{
"TestSetStateClear",
"TestSetState",
+ // With TestStream, Flink adds extra length prefixes to some data types, causing SDK side failures.
+ "TestTestStreamStrings",
+ "TestTestStreamByteSliceSequence",
+ "TestTestStreamTwoUserTypeSequences",
+
"TestTimers_EventTime_Unbounded", // (failure when comparing on side inputs (NPE on window lookup))
}
diff --git a/sdks/go/test/integration/primitives/teststream.go b/sdks/go/test/integration/primitives/teststream.go
index d30ec9fe11b..c8ba9b565c0 100644
--- a/sdks/go/test/integration/primitives/teststream.go
+++ b/sdks/go/test/integration/primitives/teststream.go
@@ -31,18 +31,22 @@ func TestStreamStrings(s beam.Scope) {
col := teststream.Create(s, con)
passert.Count(s, col, "teststream strings", 3)
+ passert.Equals(s, col, "a", "b", "c")
}
// TestStreamByteSliceSequence tests the TestStream primitive by inserting byte slice elements
// then advancing the watermark to infinity and comparing the output..
func TestStreamByteSliceSequence(s beam.Scope) {
con := teststream.NewConfig()
- b := []byte{91, 92, 93}
- con.AddElements(1, b)
+
+ a := []byte{91, 92, 93}
+ b := []byte{94, 95, 96}
+ c := []byte{97, 98, 99}
+ con.AddElements(1, a, b, c)
con.AdvanceWatermarkToInfinity()
col := teststream.Create(s, con)
- passert.Count(s, col, "teststream byte", 1)
- passert.Equals(s, col, append([]byte{3}, b...))
+ passert.Count(s, col, "teststream byte", 3)
+ passert.Equals(s, col, a, b, c)
}
// TestStreamInt64Sequence tests the TestStream primitive by inserting int64 elements
@@ -137,3 +141,34 @@ func TestStreamTwoBoolSequences(s beam.Scope) {
passert.Count(s, col, "teststream bool", 6)
passert.EqualsList(s, col, append(eo, et...))
}
+
+// TestStreamTwoUserTypeSequences tests the TestStream primitive by inserting two sets of
+// user type (stringPair) elements into the TestStream, advancing the watermark after
+// each set, and comparing the output against both sets combined.
+func TestStreamTwoUserTypeSequences(s beam.Scope) {
+ con := teststream.NewConfig()
+ eo := []stringPair{{"a", "b"}, {"b", "c"}, {"c", "a"}}
+ et := []stringPair{{"b", "a"}, {"c", "b"}, {"a", "c"}}
+ con.AddElementList(100, eo)
+ con.AdvanceWatermark(110)
+ con.AddElementList(120, et)
+ con.AdvanceWatermark(130)
+
+ col := teststream.Create(s, con)
+
+ passert.Count(s, col, "teststream usertype", 6)
+ passert.EqualsList(s, col, append(eo, et...))
+}
+
+// TestStreamInt16Sequence validates that a non-beam standard coder
+// works with test stream, by inserting int16 elements, advancing the
+// watermark to infinity, and comparing the output.
+func TestStreamInt16Sequence(s beam.Scope) {
+	con := teststream.NewConfig()
+	ele := []int16{91, 92, 93}
+	con.AddElementList(100, ele)
+	con.AdvanceWatermarkToInfinity()
+
+	col := teststream.Create(s, con)
+
+	// The assertion label names the element type under test.
+	passert.Count(s, col, "teststream int16", 3)
+	passert.EqualsList(s, col, ele)
+}
diff --git a/sdks/go/test/integration/primitives/teststream_test.go b/sdks/go/test/integration/primitives/teststream_test.go
index 90a2120294e..b0144f148cb 100644
--- a/sdks/go/test/integration/primitives/teststream_test.go
+++ b/sdks/go/test/integration/primitives/teststream_test.go
@@ -37,6 +37,11 @@ func TestTestStreamInt64Sequence(t *testing.T) {
ptest.BuildAndRun(t, TestStreamInt64Sequence)
}
+// TestTestStreamInt16Sequence builds and runs the TestStreamInt16Sequence pipeline,
+// after consulting the integration filters.
+func TestTestStreamInt16Sequence(t *testing.T) {
+ integration.CheckFilters(t)
+ ptest.BuildAndRun(t, TestStreamInt16Sequence)
+}
+
func TestTestStreamTwoInt64Sequences(t *testing.T) {
integration.CheckFilters(t)
ptest.BuildAndRun(t, TestStreamTwoInt64Sequences)
@@ -61,3 +66,8 @@ func TestTestStreamTwoBoolSequences(t *testing.T) {
integration.CheckFilters(t)
ptest.BuildAndRun(t, TestStreamTwoBoolSequences)
}
+
+// TestTestStreamTwoUserTypeSequences builds and runs the TestStreamTwoUserTypeSequences
+// pipeline, after consulting the integration filters.
+func TestTestStreamTwoUserTypeSequences(t *testing.T) {
+ integration.CheckFilters(t)
+ ptest.BuildAndRun(t, TestStreamTwoUserTypeSequences)
+}