Posted to github@beam.apache.org by "lostluck (via GitHub)" <gi...@apache.org> on 2023/02/09 19:56:09 UTC

[GitHub] [beam] lostluck opened a new pull request, #25406: [prism] Add internal/config package

lostluck opened a new pull request, #25406:
URL: https://github.com/apache/beam/pull/25406

   Add internal/config package, which is the core for configuring the Runner with variants and handlers.
   
   Uber PR for all changes: https://github.com/apache/beam/pull/25391
   
   See https://github.com/apache/beam/issues/24789
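   The package configures the runner through handlers. Each handler is registered by URN and has a characteristic type whose zero value is its default configuration. That registration model can be sketched with the standard library alone. This is a minimal illustration, not code from the PR: `HandlerMetadata` mirrors the interface in config.go, while `batchHandler`, its `batchConfig` characteristic, the `beam:handler:batch:v1` URN, and the `registry` type are hypothetical.
   
   ```go
   package main
   
   import (
   	"fmt"
   	"reflect"
   )
   
   // HandlerMetadata mirrors the interface in the PR: a URN that configs use to
   // refer to the handler, and the type of its characteristic (its own config).
   type HandlerMetadata interface {
   	ConfigURN() string
   	ConfigCharacteristic() reflect.Type
   }
   
   // batchConfig is a hypothetical characteristic; its zero value stands for
   // the handler's default behavior.
   type batchConfig struct {
   	Size int
   }
   
   // batchHandler is a hypothetical handler used only for illustration.
   type batchHandler struct{}
   
   func (batchHandler) ConfigURN() string { return "beam:handler:batch:v1" }
   
   func (batchHandler) ConfigCharacteristic() reflect.Type {
   	return reflect.TypeOf(batchConfig{})
   }
   
   // registry is a stripped-down stand-in for HandlerRegistry's metadata map.
   type registry struct {
   	metadata map[string]HandlerMetadata
   }
   
   func newRegistry() *registry {
   	return &registry{metadata: map[string]HandlerMetadata{}}
   }
   
   // register keys each handler's metadata by its URN.
   func (r *registry) register(mds ...HandlerMetadata) {
   	for _, md := range mds {
   		r.metadata[md.ConfigURN()] = md
   	}
   }
   
   // defaultCharacteristic returns the zero value of the handler's
   // characteristic type, i.e. its default configuration.
   func (r *registry) defaultCharacteristic(urn string) (any, bool) {
   	md, ok := r.metadata[urn]
   	if !ok {
   		return nil, false
   	}
   	// reflect.New returns a pointer to a new zero value; Elem dereferences it.
   	return reflect.New(md.ConfigCharacteristic()).Elem().Interface(), true
   }
   
   func main() {
   	r := newRegistry()
   	r.register(batchHandler{})
   	def, ok := r.defaultCharacteristic("beam:handler:batch:v1")
   	fmt.Printf("%v %+v\n", ok, def)
   }
   ```
   
   Running it prints `true {Size:0}`: the handler is found by its URN, and its default configuration is the zero-value characteristic.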
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] lostluck commented on pull request #25406: [prism] Add internal/config package

Posted by "lostluck (via GitHub)" <gi...@apache.org>.
lostluck commented on PR #25406:
URL: https://github.com/apache/beam/pull/25406#issuecomment-1424823542

   R: @jrmccluskey 




[GitHub] [beam] lostluck commented on a diff in pull request #25406: [prism] Add internal/config package

Posted by "lostluck (via GitHub)" <gi...@apache.org>.
lostluck commented on code in PR #25406:
URL: https://github.com/apache/beam/pull/25406#discussion_r1102059076


##########
sdks/go/pkg/beam/runners/prism/internal/config/config.go:
##########
@@ -0,0 +1,241 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package config defines and handles the parsing and provision of configurations
+// for the runner. This package should be referred to, and should not take dependencies
+// on other parts of this runner.
+//
+// 1. A given configuration file has one or more variations configured.
+// 2. Each variation has a name, and one or more handlers configured.
+// 3. Each handler maps to a specific struct.
+//
+//	 <variation1 name>:
+//		  <handler1 name>:
+//		    <handler1 characteristics>
+//		  <handler2 name>:
+//		    <handler2 characteristics>
+//
+//	 <variation2 name>:
+//		  <handler1 name>:
+//		    <handler1 characteristics>
+//		  <handler2 name>:
+//		    <handler2 characteristics>
+//
+// Each handler has its own name and an associated characteristic type.
+package config
+
+import (
+	"bytes"
+	"fmt"
+	"reflect"
+	"sort"
+	"strings"
+
+	"golang.org/x/exp/maps"
+	"gopkg.in/yaml.v3"
+)
+
+// variants is the struct configs are decoded into.
+type variants struct {
+	Version      int
+	HandlerOrder []string
+	Default      string              // reserved for later
+	Variants     map[string]*variant `yaml:",inline"`
+}
+
+// variant holds an individual variant's handlers, and any common fields.
+type variant struct {
+	HandlerOrder []string
+	Handlers     map[string]yaml.Node `yaml:",inline"`
+}
+
+type HandlerRegistry struct {
+	variations map[string]*variant
+	metadata   map[string]HandlerMetadata
+
+	// cached names
+	variantIDs, handerIDs []string
+}
+
+func NewHandlerwRegistry() *HandlerRegistry {
+	return &HandlerRegistry{
+		variations: map[string]*variant{},
+		metadata:   map[string]HandlerMetadata{},
+	}
+}
+
+// HandlerMetadata is required information about handler configurations.
+// Handlers have a URN, which is the key configurations use to refer to them,
+// and a Characteristic type, which is the handler's own individual configuration.
+//
+// Characteristic types must have useful zero values, representing the
+// default configuration for the handler.
+type HandlerMetadata interface {
+	// ConfigURN returns the URN for the handler.
+	ConfigURN() string
+
+	// ConfigCharacteristic returns the type of the detailed configuration for the handler.
+	// A characteristic type must have a useful zero value that defines the default behavior.
+	ConfigCharacteristic() reflect.Type
+}
+
+// RegisterHandlers registers the metadata for handler configurations.
+func (r *HandlerRegistry) RegisterHandlers(mds ...HandlerMetadata) {
+	for _, md := range mds {
+		r.metadata[md.ConfigURN()] = md
+	}
+}
+
+// LoadFromYaml takes in a yaml formatted configuration and eagerly processes it for errors.
+//
+// All handlers are validated against their registered characteristic, and it is an error
+// to have configurations for unknown handlers.
+func (r *HandlerRegistry) LoadFromYaml(in []byte) error {
+	vs := variants{Variants: r.variations}
+	buf := bytes.NewBuffer(in)
+	d := yaml.NewDecoder(buf)
+	if err := d.Decode(&vs); err != nil {
+		return err
+	}
+
+	err := &unknownHandlersErr{}
+	handlers := map[string]struct{}{}
+	for v, hs := range r.variations {
+		for hk, hyn := range hs.Handlers {
+			handlers[hk] = struct{}{}
+
+			md, ok := r.metadata[hk]
+			if !ok {
+				err.add(hk, v)
+				continue
+			}
+
+			// Validate the handler config now so we can give a good error message.
+			// We re-encode, then decode, so we don't need to re-implement the
+			// existing KnownFields check. Sadly, that setting doesn't persist
+			// through yaml.Node fields.
+			hb, err := yaml.Marshal(hyn)
+			if err != nil {
+				panic(fmt.Sprintf("error re-encoding characteristic for variant %v handler %v: %v", v, hk, err))
+			}
+			buf := bytes.NewBuffer(hb)
+			dec := yaml.NewDecoder(buf)
+			dec.KnownFields(true)
+			rt := md.ConfigCharacteristic()
+			rtv := reflect.New(rt)
+			if err := dec.Decode(rtv.Interface()); err != nil {
+				return fmt.Errorf("error decoding characteristic strictly for variant %v handler %v: %v", v, hk, err)
+			}
+
+		}
+	}
+
+	if err.valid() {
+		return err
+	}
+
+	r.variantIDs = maps.Keys(r.variations)
+	sort.Strings(r.variantIDs)
+	r.handerIDs = maps.Keys(handlers)
+	sort.Strings(r.handerIDs)
+	return nil
+}
+
+type unknownHandlersErr struct {
+	handlersToVariants map[string][]string
+}
+
+func (e *unknownHandlersErr) valid() bool {
+	return e.handlersToVariants != nil
+}
+
+func (e *unknownHandlersErr) add(handler, variant string) {
+	if e.handlersToVariants == nil {
+		e.handlersToVariants = map[string][]string{}
+	}
+	vs := e.handlersToVariants[handler]
+	vs = append(vs, variant)
+	e.handlersToVariants[handler] = vs
+}
+
+func (e *unknownHandlersErr) Error() string {
+	var sb strings.Builder
+	sb.WriteString("yaml config contained unknown handlers")
+	for h, vs := range e.handlersToVariants {
+		sort.Strings(vs)
+		sb.WriteString("\n\t")
+		sb.WriteString(h)
+		sb.WriteString(" present in variants ")
+		sb.WriteString(strings.Join(vs, ","))
+	}
+	return sb.String()
+}
+
+// Variants returns the IDs of all registered variations.
+func (r *HandlerRegistry) Variants() []string {
+	return r.variantIDs
+}
+
+// UsedHandlers returns the IDs of all handlers used in variations.
+func (r *HandlerRegistry) UsedHandlers() []string {
+	return r.handerIDs
+}
+
+// GetVariant returns the Variant with the given name.
+// If none exists, GetVariant returns nil.
+func (r *HandlerRegistry) GetVariant(name string) *Variant {
+	vs, ok := r.variations[name]
+	if !ok {
+		return nil
+	}
+	return &Variant{parent: r, name: name, handlers: vs.Handlers}
+}

Review Comment:
   Done.
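   The strict-validation step in `LoadFromYaml` decodes each handler's config into a fresh value of its registered characteristic type with unknown fields rejected, while collecting unknown handlers into one error. It can be sketched with the standard library. This is an illustration under stated assumptions, not the PR's code: `encoding/json` with `Decoder.DisallowUnknownFields` stands in for `gopkg.in/yaml.v3` with `KnownFields(true)`, `json.RawMessage` stands in for `yaml.Node`, and the handler names `batch` and `retry` and the `batchConfig` type are hypothetical.
   
   ```go
   package main
   
   import (
   	"bytes"
   	"encoding/json"
   	"fmt"
   	"reflect"
   	"sort"
   	"strings"
   )
   
   // batchConfig is a hypothetical characteristic type for a handler named "batch".
   type batchConfig struct {
   	Size int
   }
   
   // unknownHandlersErr collects every unknown handler and the variants that use
   // it, so a single error reports all of them at once.
   type unknownHandlersErr struct {
   	handlersToVariants map[string][]string
   }
   
   func (e *unknownHandlersErr) add(handler, variant string) {
   	if e.handlersToVariants == nil {
   		e.handlersToVariants = map[string][]string{}
   	}
   	e.handlersToVariants[handler] = append(e.handlersToVariants[handler], variant)
   }
   
   func (e *unknownHandlersErr) Error() string {
   	// Sort the handler names as well as the variants so the message is stable.
   	hs := make([]string, 0, len(e.handlersToVariants))
   	for h := range e.handlersToVariants {
   		hs = append(hs, h)
   	}
   	sort.Strings(hs)
   	var sb strings.Builder
   	sb.WriteString("config contained unknown handlers")
   	for _, h := range hs {
   		vs := e.handlersToVariants[h]
   		sort.Strings(vs)
   		fmt.Fprintf(&sb, "\n\t%s present in variants %s", h, strings.Join(vs, ","))
   	}
   	return sb.String()
   }
   
   // validate strictly decodes each handler's raw config into a fresh value of
   // its registered characteristic type.
   func validate(variants map[string]map[string]json.RawMessage, known map[string]reflect.Type) error {
   	names := make([]string, 0, len(variants))
   	for v := range variants {
   		names = append(names, v)
   	}
   	sort.Strings(names)
   
   	unknown := &unknownHandlersErr{}
   	for _, v := range names {
   		for h, raw := range variants[v] {
   			rt, ok := known[h]
   			if !ok {
   				unknown.add(h, v) // keep going so every unknown handler is reported
   				continue
   			}
   			dec := json.NewDecoder(bytes.NewReader(raw))
   			dec.DisallowUnknownFields() // stand-in for yaml.v3's KnownFields(true)
   			// reflect.New gives a pointer to a zero value of the characteristic.
   			if err := dec.Decode(reflect.New(rt).Interface()); err != nil {
   				return fmt.Errorf("error decoding characteristic strictly for variant %v handler %v: %v", v, h, err)
   			}
   		}
   	}
   	if unknown.handlersToVariants != nil {
   		return unknown
   	}
   	return nil
   }
   
   func main() {
   	known := map[string]reflect.Type{"batch": reflect.TypeOf(batchConfig{})}
   	cfg := map[string]map[string]json.RawMessage{
   		"fast": {"batch": json.RawMessage(`{"Size": 10}`), "retry": json.RawMessage(`{}`)},
   		"slow": {"retry": json.RawMessage(`{}`)},
   	}
   	fmt.Println(validate(cfg, known))
   }
   ```
   
   With this input, `batch` validates and `retry` is reported once, as present in variants `fast,slow`. One design difference: the `Error` method in config.go ranges over a map, so when several handlers are unknown, the order of lines in its message can vary between runs. Sorting the handler names, as the sketch does, keeps the message stable.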





[GitHub] [beam] lostluck commented on pull request #25406: [prism] Add internal/config package

Posted by "lostluck (via GitHub)" <gi...@apache.org>.
lostluck commented on PR #25406:
URL: https://github.com/apache/beam/pull/25406#issuecomment-1424931601

   FYI @damondouglas: this PR is failing because wazero made a breaking change to its API. I don't have time to do the quick fix at the moment. Absent a fix, I'm inclined to decline this dependabot update until the next version. Such are the foibles of depending on unstable APIs of pre-v1 packages.
   
   
   ```
   examples/wasm/wasm.go:118:20: too many arguments in call to fn.r.NewHostModuleBuilder("env").NewFunctionBuilder().WithFunc(logString).Export("log").Instantiate
    	have (context.Context, wazero.Runtime)
    	want (context.Context)
   ```




[GitHub] [beam] lostluck commented on a diff in pull request #25406: [prism] Add internal/config package

Posted by "lostluck (via GitHub)" <gi...@apache.org>.
lostluck commented on code in PR #25406:
URL: https://github.com/apache/beam/pull/25406#discussion_r1102052303


##########
sdks/go/pkg/beam/runners/prism/internal/config/config.go:
##########
@@ -0,0 +1,241 @@
+func NewHandlerwRegistry() *HandlerRegistry {

Review Comment:
   A typo. Whoops. Thanks. This isn't yet used, so it wasn't caught. Soon!





[GitHub] [beam] jrmccluskey commented on a diff in pull request #25406: [prism] Add internal/config package

Posted by "jrmccluskey (via GitHub)" <gi...@apache.org>.
jrmccluskey commented on code in PR #25406:
URL: https://github.com/apache/beam/pull/25406#discussion_r1102042796


##########
sdks/go/pkg/beam/runners/prism/internal/config/config.go:
##########
@@ -0,0 +1,241 @@
+func NewHandlerwRegistry() *HandlerRegistry {

Review Comment:
   Is there a naming convention reason why this is `NewHandlerwRegistry()` instead of `NewHandlerRegistry()`?



##########
sdks/go/pkg/beam/runners/prism/internal/config/config.go:
##########
@@ -0,0 +1,241 @@
+// Variants returns the IDs of all registered variations.
+func (r *HandlerRegistry) Variants() []string {
+	return r.variantIDs
+}
+
+// UsedHandlers returns the IDs of all handlers used in variations.
+func (r *HandlerRegistry) UsedHandlers() []string {
+	return r.handerIDs
+}
+
+// GetVariant returns the Variant with the given name.
+// If none exists, GetVariant returns nil.
+func (r *HandlerRegistry) GetVariant(name string) *Variant {
+	vs, ok := r.variations[name]
+	if !ok {
+		return nil
+	}
+	return &Variant{parent: r, name: name, handlers: vs.Handlers}
+}

Review Comment:
   Nit: could we move these to be contiguous with the rest of the HandlerRegistry fns? 



##########
sdks/go/pkg/beam/runners/prism/internal/config/config.go:
##########
@@ -0,0 +1,241 @@
+// GetVariant returns the Variant with the given name.
+// If none exists, GetVariant returns nil.
+func (r *HandlerRegistry) GetVariant(name string) *Variant {
+	vs, ok := r.variations[name]
+	if !ok {
+		return nil
+	}
+	return &Variant{parent: r, name: name, handlers: vs.Handlers}
+}
+
+type Variant struct {

Review Comment:
   Nit: I don't love the naming collision between the internal, lowercase `variant` and the exported `Variant`; it doesn't read particularly clearly.





[GitHub] [beam] lostluck merged pull request #25406: [prism] Add internal/config package

Posted by "lostluck (via GitHub)" <gi...@apache.org>.
lostluck merged PR #25406:
URL: https://github.com/apache/beam/pull/25406




[GitHub] [beam] lostluck commented on pull request #25406: [prism] Add internal/config package

Posted by "lostluck (via GitHub)" <gi...@apache.org>.
lostluck commented on PR #25406:
URL: https://github.com/apache/beam/pull/25406#issuecomment-1424884495

   PTAL. Reordered the types and methods into top-down order where definitions are involved. This puts HandlerRegistry at the bottom.
   
   Added missing exported type comments, as well as the renames.



[GitHub] [beam] lostluck commented on pull request #25406: [prism] Add internal/config package

Posted by "lostluck (via GitHub)" <gi...@apache.org>.
lostluck commented on PR #25406:
URL: https://github.com/apache/beam/pull/25406#issuecomment-1424885293

   PS. These are exactly the kind of comments I'm looking for at this stage of Prism.



[GitHub] [beam] lostluck commented on pull request #25406: [prism] Add internal/config package

Posted by "lostluck (via GitHub)" <gi...@apache.org>.
lostluck commented on PR #25406:
URL: https://github.com/apache/beam/pull/25406#issuecomment-1424973648

   https://ci-beam.apache.org/job/beam_PreCommit_Go_Phrase/226/console
   
   This was the run for this PR after the most recent one. Merging.
   
   Thanks for the quick reviews! Now it gets *fun*.



[GitHub] [beam] github-actions[bot] commented on pull request #25406: [prism] Add internal/config package

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #25406:
URL: https://github.com/apache/beam/pull/25406#issuecomment-1425026104

   Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control



[GitHub] [beam] lostluck commented on a diff in pull request #25406: [prism] Add internal/config package

Posted by "lostluck (via GitHub)" <gi...@apache.org>.
lostluck commented on code in PR #25406:
URL: https://github.com/apache/beam/pull/25406#discussion_r1102053229


##########
sdks/go/pkg/beam/runners/prism/internal/config/config.go:
##########
@@ -0,0 +1,241 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package config defines and handles the parsing and provision of configurations
+// for the runner. Other parts of this runner may depend on this package, but it
+// should not take dependencies on them.
+//
+// 1. A given configuration file has one or more variations configured.
+// 2. Each variation has a name, and one or more handlers configured.
+// 3. Each handler maps to a specific struct.
+//
+//	 <variation1 name>:
+//		  <handler1 name>:
+//		    <handler1 characteristics>
+//		  <handler2 name>:
+//		    <handler2 characteristics>
+//
+//	 <variation2 name>:
+//		  <handler1 name>:
+//		    <handler1 characteristics>
+//		  <handler2 name>:
+//		    <handler2 characteristics>
+//
+// Each handler has its own name and an associated characteristic type.
+package config
+
+import (
+	"bytes"
+	"fmt"
+	"reflect"
+	"sort"
+	"strings"
+
+	"golang.org/x/exp/maps"
+	"gopkg.in/yaml.v3"
+)
+
+// variants is the struct configs are decoded into.
+type variants struct {
+	Version      int
+	HandlerOrder []string
+	Default      string              // reserved for later
+	Variants     map[string]*variant `yaml:",inline"`
+}
+
+// variant holds an individual variant's handlers, and any common fields.
+type variant struct {
+	HandlerOrder []string
+	Handlers     map[string]yaml.Node `yaml:",inline"`
+}
+
+type HandlerRegistry struct {
+	variations map[string]*variant
+	metadata   map[string]HandlerMetadata
+
+	// cached names
+	variantIDs, handerIDs []string
+}
+
+func NewHandlerwRegistry() *HandlerRegistry {
+	return &HandlerRegistry{
+		variations: map[string]*variant{},
+		metadata:   map[string]HandlerMetadata{},
+	}
+}
+
+// HandlerMetadata is required information about handler configurations.
+// Handlers have a URN, which is the key configurations use to refer to them,
+// and a Characteristic type, which is the handler's individual configuration.
+//
+// Characteristic types must have useful zero values, representing the
+// default configuration for the handler.
+type HandlerMetadata interface {
+	// ConfigURN returns the URN for the handler.
+	ConfigURN() string
+
+	// ConfigCharacteristic returns the type of the detailed configuration for the handler.
+	// A characteristic type must have a useful zero value that defines the default behavior.
+	ConfigCharacteristic() reflect.Type
+}
+
+// RegisterHandlers registers the metadata for handler configurations.
+func (r *HandlerRegistry) RegisterHandlers(mds ...HandlerMetadata) {
+	for _, md := range mds {
+		r.metadata[md.ConfigURN()] = md
+	}
+}
+
+// LoadFromYaml takes in a yaml formatted configuration and eagerly processes it for errors.
+//
+// All handlers are validated against their registered characteristic, and it is an error
+// to have configurations for unknown handlers.
+func (r *HandlerRegistry) LoadFromYaml(in []byte) error {
+	vs := variants{Variants: r.variations}
+	buf := bytes.NewBuffer(in)
+	d := yaml.NewDecoder(buf)
+	if err := d.Decode(&vs); err != nil {
+		return err
+	}
+
+	err := &unknownHandlersErr{}
+	handlers := map[string]struct{}{}
+	for v, hs := range r.variations {
+		for hk, hyn := range hs.Handlers {
+			handlers[hk] = struct{}{}
+
+			md, ok := r.metadata[hk]
+			if !ok {
+				err.add(hk, v)
+				continue
+			}
+
+			// Validate the handler config now so we can give a good error message.
+			// We re-encode, then decode strictly, so we don't need to re-implement
+			// the existing KnownFields check. Sadly, that strictness doesn't persist
+			// through yaml.Node fields.
+			hb, err := yaml.Marshal(hyn)
+			if err != nil {
+				panic(fmt.Sprintf("error re-encoding characteristic for variant %v handler %v: %v", v, hk, err))
+			}
+			buf := bytes.NewBuffer(hb)
+			dec := yaml.NewDecoder(buf)
+			dec.KnownFields(true)
+			rt := md.ConfigCharacteristic()
+			rtv := reflect.New(rt)
+			if err := dec.Decode(rtv.Interface()); err != nil {
+				return fmt.Errorf("error decoding characteristic strictly for variant %v handler %v: %v", v, hk, err)
+			}
+
+		}
+	}
+
+	if err.valid() {
+		return err
+	}
+
+	r.variantIDs = maps.Keys(r.variations)
+	sort.Strings(r.variantIDs)
+	r.handerIDs = maps.Keys(handlers)
+	sort.Strings(r.handerIDs)
+	return nil
+}
+
+type unknownHandlersErr struct {
+	handlersToVariants map[string][]string
+}
+
+func (e *unknownHandlersErr) valid() bool {
+	return e.handlersToVariants != nil
+}
+
+func (e *unknownHandlersErr) add(handler, variant string) {
+	if e.handlersToVariants == nil {
+		e.handlersToVariants = map[string][]string{}
+	}
+	vs := e.handlersToVariants[handler]
+	vs = append(vs, variant)
+	e.handlersToVariants[handler] = vs
+}
+
+func (e *unknownHandlersErr) Error() string {
+	var sb strings.Builder
+	sb.WriteString("yaml config contained unknown handlers")
+	for h, vs := range e.handlersToVariants {
+		sort.Strings(vs)
+		sb.WriteString("\n\t")
+		sb.WriteString(h)
+		sb.WriteString(" present in variants ")
+		sb.WriteString(strings.Join(vs, ","))
+	}
+	return sb.String()
+}
+
+// Variants returns the IDs of all registered variations.
+func (r *HandlerRegistry) Variants() []string {
+	return r.variantIDs
+}
+
+// UsedHandlers returns the IDs of all handlers used in variations.
+func (r *HandlerRegistry) UsedHandlers() []string {
+	return r.handerIDs
+}
+
+// GetVariant returns the Variant with the given name.
+// If no such variant exists, GetVariant returns nil.
+func (r *HandlerRegistry) GetVariant(name string) *Variant {
+	vs, ok := r.variations[name]
+	if !ok {
+		return nil
+	}
+	return &Variant{parent: r, name: name, handlers: vs.Handlers}
+}
+
+type Variant struct {

Review Comment:
   Renamed `variants` to `configFile`, since it logically represents the whole file.
   Renamed `variant` to `rawVariant`, since it's the raw struct that YAML decodes into.
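   For readers following the validation step in `LoadFromYaml`: each handler's raw config is decoded strictly into a fresh zero value of the type returned by `ConfigCharacteristic()`, so unknown fields are rejected up front. The following is a minimal stdlib-only sketch of that pattern, assuming a hypothetical `splitHandler` whose characteristic type is a hypothetical `splitCharacteristic`. It swaps in `encoding/json` with `DisallowUnknownFields` as a stand-in for yaml.v3's `KnownFields(true)`, so it is an analogy rather than the package's actual code.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"reflect"
)

// HandlerMetadata matches the interface in the diff above.
type HandlerMetadata interface {
	ConfigURN() string
	ConfigCharacteristic() reflect.Type
}

// splitCharacteristic is a hypothetical characteristic type; its zero value
// (splitting disabled) is the default behavior, as the interface docs require.
type splitCharacteristic struct {
	Enabled   bool
	MaxSplits int
}

// splitHandler is a hypothetical handler used only for illustration.
type splitHandler struct{}

func (splitHandler) ConfigURN() string { return "split" }
func (splitHandler) ConfigCharacteristic() reflect.Type {
	return reflect.TypeOf(splitCharacteristic{})
}

// validate strictly decodes raw into a fresh value of md's characteristic type,
// rejecting unknown fields. encoding/json stands in for yaml.v3 here.
func validate(md HandlerMetadata, raw []byte) (any, error) {
	rtv := reflect.New(md.ConfigCharacteristic()) // pointer to a zero value
	dec := json.NewDecoder(bytes.NewReader(raw))
	dec.DisallowUnknownFields()
	if err := dec.Decode(rtv.Interface()); err != nil {
		return nil, fmt.Errorf("handler %v: %w", md.ConfigURN(), err)
	}
	return rtv.Elem().Interface(), nil
}

func main() {
	// Register metadata keyed by URN, as RegisterHandlers does.
	registry := map[string]HandlerMetadata{}
	for _, md := range []HandlerMetadata{splitHandler{}} {
		registry[md.ConfigURN()] = md
	}
	md := registry["split"]

	got, err := validate(md, []byte(`{"Enabled": true}`))
	fmt.Printf("%+v %v\n", got, err) // {Enabled:true MaxSplits:0} <nil>

	// A misspelled field is reported instead of being silently ignored.
	_, err = validate(md, []byte(`{"Enabeld": true}`))
	fmt.Println(err)
}
```

   Because `reflect.New` allocates a zero value before decoding, any field omitted from the config keeps its zero value, which is why characteristic types need useful zero values.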




[GitHub] [beam] lostluck commented on pull request #25406: [prism] Add internal/config package

Posted by "lostluck (via GitHub)" <gi...@apache.org>.
lostluck commented on PR #25406:
URL: https://github.com/apache/beam/pull/25406#issuecomment-1424886470

   Run Go Precommit



[GitHub] [beam] lostluck commented on pull request #25406: [prism] Add internal/config package

Posted by "lostluck (via GitHub)" <gi...@apache.org>.
lostluck commented on PR #25406:
URL: https://github.com/apache/beam/pull/25406#issuecomment-1424852735

   Run GoPortable PreCommit



[GitHub] [beam] lostluck commented on pull request #25406: [prism] Add internal/config package

Posted by "lostluck (via GitHub)" <gi...@apache.org>.
lostluck commented on PR #25406:
URL: https://github.com/apache/beam/pull/25406#issuecomment-1424822499

   Looks like this depends on something not yet in the go.mod file. So while this is ready for review, it shouldn't be merged until the checks are green after https://github.com/apache/beam/pull/25404 is merged.



[GitHub] [beam] lostluck commented on pull request #25406: [prism] Add internal/config package

Posted by "lostluck (via GitHub)" <gi...@apache.org>.
lostluck commented on PR #25406:
URL: https://github.com/apache/beam/pull/25406#issuecomment-1424852537

   Run Go PreCommit



[GitHub] [beam] lostluck commented on pull request #25406: [prism] Add internal/config package

Posted by "lostluck (via GitHub)" <gi...@apache.org>.
lostluck commented on PR #25406:
URL: https://github.com/apache/beam/pull/25406#issuecomment-1424894505

   Thanks! Gonna wait until the automated jobs pick this up. It seems there's a bit of a queue at present.



[GitHub] [beam] lostluck commented on pull request #25406: [prism] Add internal/config package

Posted by "lostluck (via GitHub)" <gi...@apache.org>.
lostluck commented on PR #25406:
URL: https://github.com/apache/beam/pull/25406#issuecomment-1424888608

   Run Go PreCommit

