Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/07/30 18:00:20 UTC

[GitHub] [beam] lostluck commented on a change in pull request #15253: [BEAM-11088] Add TestStream package to Go SDK testing capabilities

lostluck commented on a change in pull request #15253:
URL: https://github.com/apache/beam/pull/15253#discussion_r680110178



##########
File path: sdks/go/pkg/beam/testing/teststream/teststream.go
##########
@@ -0,0 +1,131 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package teststream contains code configuring the TestStream primitive for
+// use in testing code that is meant to be run on streaming data sources.
+// TestStream is not supported on the Go direct runner.
+package teststream
+
+import (
+	"bytes"
+	"fmt"
+
+	"github.com/apache/beam/sdks/go/pkg/beam"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/mtime"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/util/protox"
+
+	pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+)
+
+const urn = "beam:transform:teststream:v1"
+
+// Config holds information used to create a TestStreamPayload object.
+type Config struct {
+	ElmCoder  *coder.Coder
+	Events    []*pipepb.TestStreamPayload_Event
+	Endpoint  *pipepb.ApiServiceDescriptor
+	Watermark int64
+}
+
+// MakeConfig initializes a Config struct to begin inserting TestStream events/endpoints into.
+// Requires that users provide the coder for the elements they are trying to emit.
+func MakeConfig(c *coder.Coder) Config {
+	return Config{ElmCoder: c,
+		Events:    []*pipepb.TestStreamPayload_Event{},
+		Endpoint:  &pipepb.ApiServiceDescriptor{},
+		Watermark: 0,
+	}
+}
+
+// SetEndpoint sets a URL for a TestStreamService that will emit events instead of having them
+// defined manually. Currently does not support authentication, so the TestStreamService should
+// be accessed in a trusted context.
+func (c *Config) SetEndpoint(url string) {
+	c.Endpoint.Url = url
+}
+
+// CreatePayload converts the Config object into a TestStreamPayload to be sent to the runner.
+func (c *Config) CreatePayload() *pipepb.TestStreamPayload {
+	return &pipepb.TestStreamPayload{CoderId: "c0", Events: c.Events, Endpoint: c.Endpoint}

Review comment:
       Correct! That's what's expected for now. Add a comment to the "teststream.Create" method to call out the current limitation that the TestStream must be the first PTransform in a pipeline.

##########
File path: sdks/go/pkg/beam/testing/teststream/teststream.go
##########
@@ -0,0 +1,131 @@
+// Package teststream contains code configuring the TestStream primitive for
+// use in testing code that is meant to be run on streaming data sources.
+// TestStream is not supported on the Go direct runner.
+package teststream
+
+import (
+	"bytes"
+	"fmt"
+
+	"github.com/apache/beam/sdks/go/pkg/beam"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/mtime"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/util/protox"
+
+	pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+)
+
+const urn = "beam:transform:teststream:v1"
+
+// Config holds information used to create a TestStreamPayload object.
+type Config struct {
+	ElmCoder  *coder.Coder
+	Events    []*pipepb.TestStreamPayload_Event
+	Endpoint  *pipepb.ApiServiceDescriptor
+	Watermark int64
+}
+
+// MakeConfig initializes a Config struct to begin inserting TestStream events/endpoints into.
+// Requires that users provide the coder for the elements they are trying to emit.
+func MakeConfig(c *coder.Coder) Config {
+	return Config{ElmCoder: c,
+		Events:    []*pipepb.TestStreamPayload_Event{},
+		Endpoint:  &pipepb.ApiServiceDescriptor{},
+		Watermark: 0,
+	}
+}
+
+// SetEndpoint sets a URL for a TestStreamService that will emit events instead of having them
+// defined manually. Currently does not support authentication, so the TestStreamService should
+// be accessed in a trusted context.
+func (c *Config) SetEndpoint(url string) {
+	c.Endpoint.Url = url
+}
+
+// CreatePayload converts the Config object into a TestStreamPayload to be sent to the runner.
+func (c *Config) CreatePayload() *pipepb.TestStreamPayload {

Review comment:
       Since the proto payload is an implementation detail, this method doesn't need to be exported and can remain internal to the package.
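A minimal sketch of that split. Hypothetical `event` and `payload` types stand in for the generated `pipepb` protos so it builds with only the Go standard library. `Config` stays exported, while the conversion becomes the unexported `createPayload`, callable only from inside the package (for example, by the function that inserts the TestStream into the pipeline).

```go
package main

// event is a hypothetical stand-in for pipepb.TestStreamPayload_Event.
type event struct {
	kind  string
	value int64
}

// payload is a hypothetical stand-in for pipepb.TestStreamPayload.
type payload struct {
	CoderID string
	Events  []event
}

// Config is the exported, user-facing builder for TestStream events.
type Config struct {
	events []event
}

// createPayload is unexported: only code inside the package needs the
// proto form, so it does not become part of the public API.
func (c *Config) createPayload() payload {
	return payload{CoderID: "c0", Events: c.events}
}
```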

##########
File path: sdks/go/pkg/beam/testing/teststream/teststream.go
##########
@@ -0,0 +1,131 @@
+// Package teststream contains code configuring the TestStream primitive for
+// use in testing code that is meant to be run on streaming data sources.
+// TestStream is not supported on the Go direct runner.
+package teststream
+
+import (
+	"bytes"
+	"fmt"
+
+	"github.com/apache/beam/sdks/go/pkg/beam"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/mtime"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/util/protox"
+
+	pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+)
+
+const urn = "beam:transform:teststream:v1"
+
+// Config holds information used to create a TestStreamPayload object.
+type Config struct {
+	ElmCoder  *coder.Coder
+	Events    []*pipepb.TestStreamPayload_Event
+	Endpoint  *pipepb.ApiServiceDescriptor
+	Watermark int64
+}
+
+// MakeConfig initializes a Config struct to begin inserting TestStream events/endpoints into.
+// Requires that users provide the coder for the elements they are trying to emit.
+func MakeConfig(c *coder.Coder) Config {
+	return Config{ElmCoder: c,
+		Events:    []*pipepb.TestStreamPayload_Event{},
+		Endpoint:  &pipepb.ApiServiceDescriptor{},
+		Watermark: 0,
+	}
+}
+
+// SetEndpoint sets a URL for a TestStreamService that will emit events instead of having them
+// defined manually. Currently does not support authentication, so the TestStreamService should
+// be accessed in a trusted context.
+func (c *Config) SetEndpoint(url string) {
+	c.Endpoint.Url = url
+}
+
+// CreatePayload converts the Config object into a TestStreamPayload to be sent to the runner.
+func (c *Config) CreatePayload() *pipepb.TestStreamPayload {
+	return &pipepb.TestStreamPayload{CoderId: "c0", Events: c.Events, Endpoint: c.Endpoint}
+}
+
+// AdvanceWatermark adds an event to the Config Events struct advancing the watermark for a PCollection
+// to the given timestamp. if the tag is empty, this is applied to the default PCollection. Timestamp is
+// in milliseconds
+func (c *Config) AdvanceWatermark(timestamp int64) error {
+	if c.Watermark >= timestamp {
+		return fmt.Errorf("watermark must be monotonically increasing, is at %v, got %v", c.Watermark, timestamp)
+	}
+	watermarkAdvance := &pipepb.TestStreamPayload_Event_AdvanceWatermark{NewWatermark: timestamp}
+	watermarkEvent := &pipepb.TestStreamPayload_Event_WatermarkEvent{WatermarkEvent: watermarkAdvance}
+	c.Events = append(c.Events, &pipepb.TestStreamPayload_Event{Event: watermarkEvent})
+	c.Watermark = timestamp
+	return nil
+}
+
+// AdvanceWatermarkToInfinity advances the watermark for the PCollection corresponding to the tag
+// to the maximum timestamp.
+func (c *Config) AdvanceWatermarkToInfinity() error {
+	return c.AdvanceWatermark(mtime.MaxTimestamp.Milliseconds())
+}
+
+// AdvanceProcessingTime adds an event into the Config Events struct advancing the processing time by a given
+// duration. This advancement is applied to all of the PCollections output by the TestStream.
+func (c *Config) AdvanceProcessingTime(duration int64) {
+	processingAdvance := &pipepb.TestStreamPayload_Event_AdvanceProcessingTime{AdvanceDuration: duration}
+	processingEvent := &pipepb.TestStreamPayload_Event_ProcessingTimeEvent{ProcessingTimeEvent: processingAdvance}
+	c.Events = append(c.Events, &pipepb.TestStreamPayload_Event{Event: processingEvent})
+}
+
+// AdvanceProcessingTimeToInfinity moves the TestStream processing time to the largest possible
+// timestamp.
+func (c *Config) AdvanceProcessingTimeToInfinity() {
+	c.AdvanceProcessingTime(mtime.MaxTimestamp.Milliseconds())
+}
+
+// AddElements adds a number of elements to the Config object at the specified timestamp.
+// The encoder will panic if there is a type mismatch between the provided coder and the
+// elements.
+func (c *Config) AddElements(timestamp int64, elements ...interface{}) error {
+	newElements := []*pipepb.TestStreamPayload_TimestampedElement{}
+	enc := beam.NewElementEncoder(c.ElmCoder.T.Type())
+	for _, e := range elements {
+		var buf bytes.Buffer
+		if err := enc.Encode(e, &buf); err != nil {
+			return fmt.Errorf("encoding value %v failed, got %v", e, err)
+		}
+		newElements = append(newElements, &pipepb.TestStreamPayload_TimestampedElement{EncodedElement: buf.Bytes(), Timestamp: timestamp})
+	}
+	addElementsEvent := &pipepb.TestStreamPayload_Event_AddElements{Elements: newElements}
+	elementEvent := &pipepb.TestStreamPayload_Event_ElementEvent{ElementEvent: addElementsEvent}
+	c.Events = append(c.Events, &pipepb.TestStreamPayload_Event{Event: elementEvent})
+	return nil
+}
+
+// TestStream inserts a TestStream primitive into a pipeline, taking a scope and a Config object and
+// producing an array of output PCollections.
+func TestStream(s beam.Scope, c Config) []beam.PCollection {
+	pyld := protox.MustEncode(c.CreatePayload())
+	outputs := []beam.FullType{c.ElmCoder.T}
+
+	outputMap := beam.External(s, urn, pyld, []beam.PCollection{}, outputs, false)
+
+	var ret []beam.PCollection
+	for _, val := range outputMap {
+		ret = append(ret, val)
+	}
+	return ret

Review comment:
       Agreed. Since implementations all seem to use single output collections, let's punt on tagged multi-collection map support for now. A later iteration can add a `CreateN` or `CreateMulti` call that returns a map.
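A sketch of the single-output shape. It assumes a hypothetical `PCollection` stand-in and a hypothetical `singleOutput` helper, not the real `beam.External` result handling. Returning one collection also sidesteps the fact that Go map iteration order is unspecified, so ranging over `outputMap` would not give a stable order if there were ever more than one output.

```go
package main

import "fmt"

// PCollection is a hypothetical stand-in for beam.PCollection.
type PCollection struct {
	ID string
}

// singleOutput returns the only collection in outputs, mirroring a Create
// that yields exactly one PCollection instead of a slice built by ranging
// over a map.
func singleOutput(outputs map[string]PCollection) (PCollection, error) {
	if len(outputs) != 1 {
		return PCollection{}, fmt.Errorf("expected exactly 1 output, got %d", len(outputs))
	}
	for _, pc := range outputs {
		return pc, nil
	}
	return PCollection{}, nil // unreachable: len(outputs) == 1
}
```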

##########
File path: sdks/go/test/integration/primitives/teststream.go
##########
@@ -0,0 +1,54 @@
+package primitives
+
+import (
+	"github.com/apache/beam/sdks/go/pkg/beam"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
+	"github.com/apache/beam/sdks/go/pkg/beam/testing/passert"
+	"github.com/apache/beam/sdks/go/pkg/beam/testing/teststream"
+)
+
+// TestStreamSequence tests the TestStream primitive by inserting string elements
+// then advancing the watermark past the poin where they were inserted.

Review comment:
       ```suggestion
   // then advancing the watermark past the point where they were inserted.
   ```

##########
File path: sdks/go/pkg/beam/testing/teststream/teststream.go
##########
@@ -0,0 +1,131 @@
+// Package teststream contains code configuring the TestStream primitive for
+// use in testing code that is meant to be run on streaming data sources.
+// TestStream is not supported on the Go direct runner.
+package teststream
+
+import (
+	"bytes"
+	"fmt"
+
+	"github.com/apache/beam/sdks/go/pkg/beam"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/mtime"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/util/protox"
+
+	pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+)
+
+const urn = "beam:transform:teststream:v1"
+
+// Config holds information used to create a TestStreamPayload object.
+type Config struct {
+	ElmCoder  *coder.Coder
+	Events    []*pipepb.TestStreamPayload_Event
+	Endpoint  *pipepb.ApiServiceDescriptor
+	Watermark int64
+}
+
+// MakeConfig initializes a Config struct to begin inserting TestStream events/endpoints into.
+// Requires that users provide the coder for the elements they are trying to emit.
+func MakeConfig(c *coder.Coder) Config {

Review comment:
       Conventionally, if a constructor function returns a pointer, it's totally OK to call it "New<foo>" instead of "Make<foo>". It's shorter, and idiomatic to a certain degree.
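A sketch of the `New` spelling. It uses a hypothetical `Coder` stand-in and a trimmed-down `Config`, and is not the package's final API. Returning `*Config` lets callers use the pointer-receiver methods directly. The explicit `Watermark: 0` from `MakeConfig` can also be dropped, because it is already the zero value.

```go
package main

// Coder is a hypothetical stand-in for *coder.Coder.
type Coder struct {
	Name string
}

// Config is a trimmed-down version of the Config under review.
type Config struct {
	ElmCoder  *Coder
	Watermark int64   // zero value; no need to set it explicitly
	advances  []int64 // recorded processing-time advances
}

// NewConfig returns a pointer, so pointer-receiver methods can be called
// on the result without first copying it into an addressable variable.
func NewConfig(c *Coder) *Config {
	return &Config{ElmCoder: c}
}

// AdvanceProcessingTime is a reduced pointer-receiver method, showing that
// updates made through the returned pointer are kept.
func (c *Config) AdvanceProcessingTime(duration int64) {
	c.advances = append(c.advances, duration)
}
```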

##########
File path: sdks/go/pkg/beam/testing/teststream/teststream.go
##########
@@ -0,0 +1,131 @@
+// Package teststream contains code configuring the TestStream primitive for
+// use in testing code that is meant to be run on streaming data sources.
+// TestStream is not supported on the Go direct runner.
+package teststream
+
+import (
+	"bytes"
+	"fmt"
+
+	"github.com/apache/beam/sdks/go/pkg/beam"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/mtime"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/util/protox"
+
+	pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+)
+
+const urn = "beam:transform:teststream:v1"
+
+// Config holds information used to create a TestStreamPayload object.
+type Config struct {
+	ElmCoder  *coder.Coder
+	Events    []*pipepb.TestStreamPayload_Event
+	Endpoint  *pipepb.ApiServiceDescriptor
+	Watermark int64
+}
+
+// MakeConfig initializes a Config struct to begin inserting TestStream events/endpoints into.
+// Requires that users provide the coder for the elements they are trying to emit.
+func MakeConfig(c *coder.Coder) Config {
+	return Config{ElmCoder: c,
+		Events:    []*pipepb.TestStreamPayload_Event{},
+		Endpoint:  &pipepb.ApiServiceDescriptor{},
+		Watermark: 0,
+	}
+}
+
+// SetEndpoint sets a URL for a TestStreamService that will emit events instead of having them
+// defined manually. Currently does not support authentication, so the TestStreamService should
+// be accessed in a trusted context.
+func (c *Config) SetEndpoint(url string) {
+	c.Endpoint.Url = url
+}
+
+// CreatePayload converts the Config object into a TestStreamPayload to be sent to the runner.
+func (c *Config) CreatePayload() *pipepb.TestStreamPayload {
+	return &pipepb.TestStreamPayload{CoderId: "c0", Events: c.Events, Endpoint: c.Endpoint}
+}
+
+// AdvanceWatermark adds an event to the Config Events struct advancing the watermark for a PCollection
+// to the given timestamp. if the tag is empty, this is applied to the default PCollection. Timestamp is
+// in milliseconds
+func (c *Config) AdvanceWatermark(timestamp int64) error {
+	if c.Watermark >= timestamp {
+		return fmt.Errorf("watermark must be monotonically increasing, is at %v, got %v", c.Watermark, timestamp)
+	}
+	watermarkAdvance := &pipepb.TestStreamPayload_Event_AdvanceWatermark{NewWatermark: timestamp}
+	watermarkEvent := &pipepb.TestStreamPayload_Event_WatermarkEvent{WatermarkEvent: watermarkAdvance}
+	c.Events = append(c.Events, &pipepb.TestStreamPayload_Event{Event: watermarkEvent})
+	c.Watermark = timestamp
+	return nil
+}
+
+// AdvanceWatermarkToInfinity advances the watermark for the PCollection corresponding to the tag
+// to the maximum timestamp.
+func (c *Config) AdvanceWatermarkToInfinity() error {
+	return c.AdvanceWatermark(mtime.MaxTimestamp.Milliseconds())
+}
+
+// AdvanceProcessingTime adds an event into the Config Events struct advancing the processing time by a given

Review comment:
       We can omit implementation details like "Config Events struct" in the comment and lean on the abstraction instead. TestStream specifies a sequence of events, so the comments can be phrased accordingly.
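For illustration, here is a sketch of comments phrased in terms of the event sequence rather than the struct that stores it. The `event` and `eventKind` types are hypothetical stand-ins for the `pipepb` event protos.

```go
package main

// eventKind is a hypothetical tag for the kinds of TestStream events.
type eventKind int

const (
	elementEvent eventKind = iota
	watermarkEvent
	processingTimeEvent
)

// event is a hypothetical stand-in for pipepb.TestStreamPayload_Event.
type event struct {
	kind  eventKind
	value int64
}

// Config specifies an ordered sequence of TestStream events.
type Config struct {
	events []event
}

// AdvanceProcessingTime adds an event to the sequence that advances the
// processing time by the given duration. The advancement applies to all
// PCollections output by the TestStream.
func (c *Config) AdvanceProcessingTime(duration int64) {
	c.events = append(c.events, event{kind: processingTimeEvent, value: duration})
}
```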

##########
File path: sdks/go/pkg/beam/testing/teststream/teststream.go
##########
@@ -0,0 +1,131 @@
+// Package teststream contains code configuring the TestStream primitive for
+// use in testing code that is meant to be run on streaming data sources.
+// TestStream is not supported on the Go direct runner.
+package teststream
+
+import (
+	"bytes"
+	"fmt"
+
+	"github.com/apache/beam/sdks/go/pkg/beam"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/mtime"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/util/protox"
+
+	pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+)
+
+const urn = "beam:transform:teststream:v1"
+
+// Config holds information used to create a TestStreamPayload object.
+type Config struct {
+	ElmCoder  *coder.Coder
+	Events    []*pipepb.TestStreamPayload_Event
+	Endpoint  *pipepb.ApiServiceDescriptor
+	Watermark int64
+}
+
+// MakeConfig initializes a Config struct to begin inserting TestStream events/endpoints into.
+// Requires that users provide the coder for the elements they are trying to emit.
+func MakeConfig(c *coder.Coder) Config {
+	return Config{ElmCoder: c,
+		Events:    []*pipepb.TestStreamPayload_Event{},
+		Endpoint:  &pipepb.ApiServiceDescriptor{},
+		Watermark: 0,
+	}
+}
+
+// SetEndpoint sets a URL for a TestStreamService that will emit events instead of having them
+// defined manually. Currently does not support authentication, so the TestStreamService should
+// be accessed in a trusted context.
+func (c *Config) SetEndpoint(url string) {
+	c.Endpoint.Url = url
+}
+
+// CreatePayload converts the Config object into a TestStreamPayload to be sent to the runner.
+func (c *Config) CreatePayload() *pipepb.TestStreamPayload {
+	return &pipepb.TestStreamPayload{CoderId: "c0", Events: c.Events, Endpoint: c.Endpoint}
+}
+
+// AdvanceWatermark adds an event to the Config Events struct advancing the watermark for a PCollection
+// to the given timestamp. if the tag is empty, this is applied to the default PCollection. Timestamp is
+// in milliseconds
+func (c *Config) AdvanceWatermark(timestamp int64) error {
+	if c.Watermark >= timestamp {
+		return fmt.Errorf("watermark must be monotonically increasing, is at %v, got %v", c.Watermark, timestamp)
+	}
+	watermarkAdvance := &pipepb.TestStreamPayload_Event_AdvanceWatermark{NewWatermark: timestamp}
+	watermarkEvent := &pipepb.TestStreamPayload_Event_WatermarkEvent{WatermarkEvent: watermarkAdvance}
+	c.Events = append(c.Events, &pipepb.TestStreamPayload_Event{Event: watermarkEvent})
+	c.Watermark = timestamp
+	return nil
+}
+
+// AdvanceWatermarkToInfinity advances the watermark for the PCollection corresponding to the tag
+// to the maximum timestamp.
+func (c *Config) AdvanceWatermarkToInfinity() error {
+	return c.AdvanceWatermark(mtime.MaxTimestamp.Milliseconds())
+}
+
+// AdvanceProcessingTime adds an event into the Config Events struct advancing the processing time by a given
+// duration. This advancement is applied to all of the PCollections output by the TestStream.
+func (c *Config) AdvanceProcessingTime(duration int64) {
+	processingAdvance := &pipepb.TestStreamPayload_Event_AdvanceProcessingTime{AdvanceDuration: duration}
+	processingEvent := &pipepb.TestStreamPayload_Event_ProcessingTimeEvent{ProcessingTimeEvent: processingAdvance}
+	c.Events = append(c.Events, &pipepb.TestStreamPayload_Event{Event: processingEvent})
+}
+
+// AdvanceProcessingTimeToInfinity moves the TestStream processing time to the largest possible
+// timestamp.
+func (c *Config) AdvanceProcessingTimeToInfinity() {
+	c.AdvanceProcessingTime(mtime.MaxTimestamp.Milliseconds())
+}
+
+// AddElements adds a number of elements to the Config object at the specified timestamp.

Review comment:
       It would be good to specify that the timestamp is the "event timestamp" for the element, and not when the element is being handled in processing time. This ties the comment to the Beam model.
   
   The panic comment about mismatching is good.
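A sketch of the distinction the comment could draw. It uses hypothetical stand-in types, with string elements in place of `interface{}` values encoded by `ElmCoder`. The timestamp passed to `AddElements` is stored as each element's event timestamp, and advancing processing time leaves it untouched.

```go
package main

// timestampedElement pairs an element with its event timestamp: when the
// element logically occurred, not when the pipeline processes it.
type timestampedElement struct {
	value          string
	eventTimestamp int64 // milliseconds
}

// Config is a hypothetical, trimmed-down TestStream builder.
type Config struct {
	processingTime int64 // total processing-time advance so far
	elements       []timestampedElement
}

// AdvanceProcessingTime moves processing time forward. It does not change
// the event timestamps of any elements.
func (c *Config) AdvanceProcessingTime(duration int64) {
	c.processingTime += duration
}

// AddElements adds elements with the given event timestamp, which is
// independent of the processing time at which they are emitted.
func (c *Config) AddElements(eventTimestamp int64, elements ...string) {
	for _, e := range elements {
		c.elements = append(c.elements, timestampedElement{value: e, eventTimestamp: eventTimestamp})
	}
}
```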

##########
File path: sdks/go/pkg/beam/testing/teststream/teststream.go
##########
@@ -0,0 +1,131 @@
+// Package teststream contains code configuring the TestStream primitive for
+// use in testing code that is meant to be run on streaming data sources.
+// TestStream is not supported on the Go direct runner.

Review comment:
       Add a link to the blog post describing TestStream to the package doc.
   
   We can clarify that TestStream is currently only supported on Flink.
   
   It looks like it should also be supported on Dataflow, so that's worth trying and calling out. It's only awkward to mention because Dataflow costs money, while the intent here is testing...

##########
File path: sdks/go/pkg/beam/testing/teststream/teststream.go
##########
@@ -0,0 +1,131 @@
+// Package teststream contains code configuring the TestStream primitive for
+// use in testing code that is meant to be run on streaming data sources.
+// TestStream is not supported on the Go direct runner.
+package teststream
+
+import (
+	"bytes"
+	"fmt"
+
+	"github.com/apache/beam/sdks/go/pkg/beam"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/mtime"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/util/protox"
+
+	pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+)
+
+const urn = "beam:transform:teststream:v1"
+
+// Config holds information used to create a TestStreamPayload object.
+type Config struct {
+	ElmCoder  *coder.Coder
+	Events    []*pipepb.TestStreamPayload_Event
+	Endpoint  *pipepb.ApiServiceDescriptor
+	Watermark int64
+}
+
+// MakeConfig initializes a Config struct to begin inserting TestStream events/endpoints into.
+// Requires that users provide the coder for the elements they are trying to emit.
+func MakeConfig(c *coder.Coder) Config {
+	return Config{ElmCoder: c,
+		Events:    []*pipepb.TestStreamPayload_Event{},
+		Endpoint:  &pipepb.ApiServiceDescriptor{},
+		Watermark: 0,
+	}
+}
+
+// SetEndpoint sets a URL for a TestStreamService that will emit events instead of having them
+// defined manually. Currently does not support authentication, so the TestStreamService should
+// be accessed in a trusted context.
+func (c *Config) SetEndpoint(url string) {

Review comment:
       In the "auto-define coder" world, this is where a user would be required to call both SetCoder and SetEndpoint, but it would also be correct to have SetEndpoint take the coder in question, since that's when the coder is relevant.
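A sketch of the two-argument variant, assuming a hypothetical `Coder` stand-in. The reasoning in the comment is an assumption about the "auto-define coder" design. When elements come from `AddElements`, a coder could be inferred from them. A TestStreamService endpoint gives the SDK no elements to infer from.

```go
package main

// Coder is a hypothetical stand-in for *coder.Coder.
type Coder struct {
	Name string
}

// Config is a hypothetical, trimmed-down TestStream builder.
type Config struct {
	elmCoder    *Coder
	endpointURL string
}

// SetEndpoint points the TestStream at an external TestStreamService.
// Elements then come from the service rather than from AddElements calls,
// so no coder can be inferred and the caller supplies it here.
func (c *Config) SetEndpoint(url string, coder *Coder) {
	c.endpointURL = url
	c.elmCoder = coder
}
```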

##########
File path: sdks/go/pkg/beam/testing/teststream/teststream.go
##########
@@ -0,0 +1,131 @@
+// Package teststream contains code configuring the TestStream primitive for
+// use in testing code that is meant to be run on streaming data sources.
+// TestStream is not supported on the Go direct runner.
+package teststream
+
+import (
+	"bytes"
+	"fmt"
+
+	"github.com/apache/beam/sdks/go/pkg/beam"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/mtime"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/util/protox"
+
+	pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+)
+
+const urn = "beam:transform:teststream:v1"
+
+// Config holds information used to create a TestStreamPayload object.
+type Config struct {
+	ElmCoder  *coder.Coder
+	Events    []*pipepb.TestStreamPayload_Event
+	Endpoint  *pipepb.ApiServiceDescriptor
+	Watermark int64
+}
+
+// MakeConfig initializes a Config struct to begin inserting TestStream events/endpoints into.
+// Requires that users provide the coder for the elements they are trying to emit.
+func MakeConfig(c *coder.Coder) Config {
+	return Config{ElmCoder: c,
+		Events:    []*pipepb.TestStreamPayload_Event{},
+		Endpoint:  &pipepb.ApiServiceDescriptor{},
+		Watermark: 0,
+	}
+}
+
+// SetEndpoint sets a URL for a TestStreamService that will emit events instead of having them
+// defined manually. Currently does not support authentication, so the TestStreamService should
+// be accessed in a trusted context.
+func (c *Config) SetEndpoint(url string) {
+	c.Endpoint.Url = url
+}
+
+// CreatePayload converts the Config object into a TestStreamPayload to be sent to the runner.
+func (c *Config) CreatePayload() *pipepb.TestStreamPayload {
+	return &pipepb.TestStreamPayload{CoderId: "c0", Events: c.Events, Endpoint: c.Endpoint}
+}
+
+// AdvanceWatermark adds an event to the Config Events struct advancing the watermark for a PCollection
+// to the given timestamp. if the tag is empty, this is applied to the default PCollection. Timestamp is

Review comment:
       Remove the tag comment since we're not worrying about multiple PCollections now.
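A sketch of `AdvanceWatermark` with the tag wording dropped. It uses a hypothetical, trimmed-down `Config` that records watermark advances as plain `int64` values instead of `pipepb` events. It keeps the original's strict monotonicity check, so advancing to an equal or earlier timestamp returns an error.

```go
package main

import "fmt"

// Config is a hypothetical, trimmed-down TestStream builder.
type Config struct {
	watermark  int64
	watermarks []int64 // recorded watermark-advance events, in order
}

// AdvanceWatermark adds an event advancing the watermark of the output
// PCollection to the given timestamp, in milliseconds. The watermark must
// increase monotonically.
func (c *Config) AdvanceWatermark(timestamp int64) error {
	if c.watermark >= timestamp {
		return fmt.Errorf("watermark must be monotonically increasing, is at %v, got %v", c.watermark, timestamp)
	}
	c.watermarks = append(c.watermarks, timestamp)
	c.watermark = timestamp
	return nil
}
```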

##########
File path: sdks/go/pkg/beam/testing/teststream/teststream.go
##########
@@ -0,0 +1,131 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package teststream contains code configuring the TestStream primitive for
+// use in testing code that is meant to be run on streaming data sources.
+// TestStream is not supported on the Go direct runner.
+package teststream
+
+import (
+	"bytes"
+	"fmt"
+
+	"github.com/apache/beam/sdks/go/pkg/beam"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/mtime"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/util/protox"
+
+	pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+)
+
+const urn = "beam:transform:teststream:v1"
+
+// Config holds information used to create a TestStreamPayload object.
+type Config struct {
+	ElmCoder  *coder.Coder
+	Events    []*pipepb.TestStreamPayload_Event
+	Endpoint  *pipepb.ApiServiceDescriptor
+	Watermark int64
+}
+
+// MakeConfig initializes a Config struct to begin inserting TestStream events/endpoints into.
+// Requires that users provide the coder for the elements they are trying to emit.
+func MakeConfig(c *coder.Coder) Config {
+	return Config{ElmCoder: c,
+		Events:    []*pipepb.TestStreamPayload_Event{},
+		Endpoint:  &pipepb.ApiServiceDescriptor{},
+		Watermark: 0,
+	}
+}
+
+// SetEndpoint sets a URL for a TestStreamService that will emit events instead of having them
+// defined manually. Currently does not support authentication, so the TestStreamService should
+// be accessed in a trusted context.
+func (c *Config) SetEndpoint(url string) {
+	c.Endpoint.Url = url
+}
+
+// CreatePayload converts the Config object into a TestStreamPayload to be sent to the runner.
+func (c *Config) CreatePayload() *pipepb.TestStreamPayload {
+	return &pipepb.TestStreamPayload{CoderId: "c0", Events: c.Events, Endpoint: c.Endpoint}
+}
+
+// AdvanceWatermark adds an event to the Config Events struct advancing the watermark for a PCollection
+// to the given timestamp. If the tag is empty, this is applied to the default PCollection. Timestamp is
+// in milliseconds.
+func (c *Config) AdvanceWatermark(timestamp int64) error {
+	if c.Watermark >= timestamp {
+		return fmt.Errorf("watermark must be monotonically increasing, is at %v, got %v", c.Watermark, timestamp)
+	}
+	watermarkAdvance := &pipepb.TestStreamPayload_Event_AdvanceWatermark{NewWatermark: timestamp}
+	watermarkEvent := &pipepb.TestStreamPayload_Event_WatermarkEvent{WatermarkEvent: watermarkAdvance}
+	c.Events = append(c.Events, &pipepb.TestStreamPayload_Event{Event: watermarkEvent})
+	c.Watermark = timestamp
+	return nil
+}
+
+// AdvanceWatermarkToInfinity advances the watermark for the PCollection corresponding to the tag

Review comment:
       Remove the tag part of the comment.

##########
File path: sdks/go/pkg/beam/testing/teststream/teststream.go
##########
@@ -0,0 +1,131 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package teststream contains code configuring the TestStream primitive for
+// use in testing code that is meant to be run on streaming data sources.
+// TestStream is not supported on the Go direct runner.
+package teststream
+
+import (
+	"bytes"
+	"fmt"
+
+	"github.com/apache/beam/sdks/go/pkg/beam"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/mtime"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/util/protox"
+
+	pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+)
+
+const urn = "beam:transform:teststream:v1"
+
+// Config holds information used to create a TestStreamPayload object.
+type Config struct {
+	ElmCoder  *coder.Coder
+	Events    []*pipepb.TestStreamPayload_Event
+	Endpoint  *pipepb.ApiServiceDescriptor
+	Watermark int64

Review comment:
       None of these fields need to be exported. The Config type isn't serialized, and users should configure it through the methods. This may not be Java, but we can still encapsulate the state and hide implementation details from users.
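Below is one possible shape for the suggested encapsulation, sketched with stdlib-only stand-ins. `event` and `payload` are hypothetical substitutes for the `pipepb` protos, and `NewConfig` and `createPayload` are illustrative names rather than the PR's API. Two choices go beyond the review comment and are assumptions: keeping the payload builder unexported, and returning a copy of the events.

```go
package main

// event is a hypothetical stand-in for pipepb.TestStreamPayload_Event.
type event struct {
	kind  string // "watermark" or "processing_time"
	value int64  // milliseconds
}

// payload is a hypothetical stand-in for pipepb.TestStreamPayload.
type payload struct {
	Events   []event
	Endpoint string
}

// Config keeps all of its state unexported. Code in other packages can only
// change it through methods, so it cannot bypass checks by writing fields.
type Config struct {
	events   []event
	endpoint string
}

// NewConfig returns an empty Config ready for events.
func NewConfig() *Config { return &Config{} }

// SetEndpoint records the URL of a TestStreamService.
func (c *Config) SetEndpoint(url string) { c.endpoint = url }

// AdvanceProcessingTime appends a processing-time advance of d milliseconds.
func (c *Config) AdvanceProcessingTime(d int64) {
	c.events = append(c.events, event{kind: "processing_time", value: d})
}

// createPayload builds the payload from a copy of the events, so later
// changes to the payload cannot alter the Config.
func (c *Config) createPayload() payload {
	evs := make([]event, len(c.events))
	copy(evs, c.events)
	return payload{Events: evs, Endpoint: c.endpoint}
}
```

Within a single `package main` the unexported fields remain visible. The protection only applies to code in other packages, which is the situation for users of `teststream`.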

##########
File path: sdks/go/pkg/beam/testing/teststream/teststream.go
##########
@@ -0,0 +1,131 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package teststream contains code configuring the TestStream primitive for
+// use in testing code that is meant to be run on streaming data sources.
+// TestStream is not supported on the Go direct runner.
+package teststream
+
+import (
+	"bytes"
+	"fmt"
+
+	"github.com/apache/beam/sdks/go/pkg/beam"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/mtime"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/util/protox"
+
+	pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+)
+
+const urn = "beam:transform:teststream:v1"
+
+// Config holds information used to create a TestStreamPayload object.
+type Config struct {
+	ElmCoder  *coder.Coder
+	Events    []*pipepb.TestStreamPayload_Event
+	Endpoint  *pipepb.ApiServiceDescriptor
+	Watermark int64
+}
+
+// MakeConfig initializes a Config struct to begin inserting TestStream events/endpoints into.
+// Requires that users provide the coder for the elements they are trying to emit.
+func MakeConfig(c *coder.Coder) Config {
+	return Config{ElmCoder: c,
+		Events:    []*pipepb.TestStreamPayload_Event{},
+		Endpoint:  &pipepb.ApiServiceDescriptor{},
+		Watermark: 0,
+	}
+}
+
+// SetEndpoint sets a URL for a TestStreamService that will emit events instead of having them
+// defined manually. Currently does not support authentication, so the TestStreamService should
+// be accessed in a trusted context.
+func (c *Config) SetEndpoint(url string) {
+	c.Endpoint.Url = url
+}
+
+// CreatePayload converts the Config object into a TestStreamPayload to be sent to the runner.
+func (c *Config) CreatePayload() *pipepb.TestStreamPayload {
+	return &pipepb.TestStreamPayload{CoderId: "c0", Events: c.Events, Endpoint: c.Endpoint}
+}
+
+// AdvanceWatermark adds an event to the Config Events struct advancing the watermark for a PCollection
+// to the given timestamp. If the tag is empty, this is applied to the default PCollection. Timestamp is
+// in milliseconds.
+func (c *Config) AdvanceWatermark(timestamp int64) error {
+	if c.Watermark >= timestamp {
+		return fmt.Errorf("watermark must be monotonically increasing, is at %v, got %v", c.Watermark, timestamp)
+	}
+	watermarkAdvance := &pipepb.TestStreamPayload_Event_AdvanceWatermark{NewWatermark: timestamp}
+	watermarkEvent := &pipepb.TestStreamPayload_Event_WatermarkEvent{WatermarkEvent: watermarkAdvance}
+	c.Events = append(c.Events, &pipepb.TestStreamPayload_Event{Event: watermarkEvent})
+	c.Watermark = timestamp
+	return nil
+}
+
+// AdvanceWatermarkToInfinity advances the watermark for the PCollection corresponding to the tag
+// to the maximum timestamp.
+func (c *Config) AdvanceWatermarkToInfinity() error {
+	return c.AdvanceWatermark(mtime.MaxTimestamp.Milliseconds())
+}
+
+// AdvanceProcessingTime adds an event into the Config Events struct advancing the processing time by a given
+// duration. This advancement is applied to all of the PCollections output by the TestStream.
+func (c *Config) AdvanceProcessingTime(duration int64) {
+	processingAdvance := &pipepb.TestStreamPayload_Event_AdvanceProcessingTime{AdvanceDuration: duration}
+	processingEvent := &pipepb.TestStreamPayload_Event_ProcessingTimeEvent{ProcessingTimeEvent: processingAdvance}
+	c.Events = append(c.Events, &pipepb.TestStreamPayload_Event{Event: processingEvent})
+}
+
+// AdvanceProcessingTimeToInfinity moves the TestStream processing time to the largest possible
+// timestamp.
+func (c *Config) AdvanceProcessingTimeToInfinity() {
+	c.AdvanceProcessingTime(mtime.MaxTimestamp.Milliseconds())
+}
+
+// AddElements adds a number of elements to the Config object at the specified timestamp.
+// The encoder will panic if there is a type mismatch between the provided coder and the
+// elements.
+func (c *Config) AddElements(timestamp int64, elements ...interface{}) error {
+	newElements := []*pipepb.TestStreamPayload_TimestampedElement{}
+	enc := beam.NewElementEncoder(c.ElmCoder.T.Type())
+	for _, e := range elements {
+		var buf bytes.Buffer
+		if err := enc.Encode(e, &buf); err != nil {
+			return fmt.Errorf("encoding value %v failed, got %v", e, err)
+		}
+		newElements = append(newElements, &pipepb.TestStreamPayload_TimestampedElement{EncodedElement: buf.Bytes(), Timestamp: timestamp})
+	}
+	addElementsEvent := &pipepb.TestStreamPayload_Event_AddElements{Elements: newElements}
+	elementEvent := &pipepb.TestStreamPayload_Event_ElementEvent{ElementEvent: addElementsEvent}
+	c.Events = append(c.Events, &pipepb.TestStreamPayload_Event{Event: elementEvent})
+	return nil
+}
+
+// TestStream inserts a TestStream primitive into a pipeline, taking a scope and a Config object and
+// producing an array of output PCollections.
+func TestStream(s beam.Scope, c Config) []beam.PCollection {

Review comment:
       The only beef I have with calling this `TestStream` is that users would invoke it as `teststream.TestStream`, which is stuttery and doesn't really describe what's going on.
   
   Consider `Create` instead, which turns the call into `input := teststream.Create(s, cfg)`. One might think that `New` could work, as in `teststream.New(s, cfg)`, but that usually implies one is getting some kind of "TestStream" pointer-typed value, whereas in this case it's a PCollection. So I'm leaning toward `Create`.
   

##########
File path: sdks/go/pkg/beam/testing/teststream/teststream.go
##########
@@ -0,0 +1,131 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package teststream contains code configuring the TestStream primitive for
+// use in testing code that is meant to be run on streaming data sources.
+// TestStream is not supported on the Go direct runner.
+package teststream
+
+import (
+	"bytes"
+	"fmt"
+
+	"github.com/apache/beam/sdks/go/pkg/beam"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/graph/mtime"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/util/protox"
+
+	pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+)
+
+const urn = "beam:transform:teststream:v1"
+
+// Config holds information used to create a TestStreamPayload object.
+type Config struct {
+	ElmCoder  *coder.Coder
+	Events    []*pipepb.TestStreamPayload_Event
+	Endpoint  *pipepb.ApiServiceDescriptor
+	Watermark int64
+}
+
+// MakeConfig initializes a Config struct to begin inserting TestStream events/endpoints into.
+// Requires that users provide the coder for the elements they are trying to emit.
+func MakeConfig(c *coder.Coder) Config {

Review comment:
       So far, the Go SDK has avoided requiring users to interact with coder objects. We can have the coder set by the first call to AddElements, and check in later calls that subsequent elements are of the same type.
   
   It would be reasonable to have a "SetCoder" method, which then validates that the specified coder uses the same type.
   
   This does mean that converting values to the event protos would need to be delayed, but overall it is more convenient for users not to have to deal with coders unless they need to.




-- 
This is an automated message from the Apache Git Service.