You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by "jrmccluskey (via GitHub)" <gi...@apache.org> on 2023/02/09 21:33:46 UTC

[GitHub] [beam] jrmccluskey commented on a diff in pull request #25406: [prism] Add internal/config package

jrmccluskey commented on code in PR #25406:
URL: https://github.com/apache/beam/pull/25406#discussion_r1102042796


##########
sdks/go/pkg/beam/runners/prism/internal/config/config.go:
##########
@@ -0,0 +1,241 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package config defines and handles the parsing and provision of configurations
+// for the runner. This package should be referred to, and should not take dependencies
+// on other parts of this runner.
+//
+// 1. A given configuration file has one or more variations configured.
+// 2. Each variation has a name, and one or more handlers configured.
+// 3. Each handler maps to a specific struct.
+//
+//	 <variation1 name>:
+//		  <handler1 name>:
+//		    <handler1 characteristics>
+//		  <handler2 name>:
+//		    <handler2 characteristics>
+//
+//	 <variation2 name>:
+//		  <handler1 name>:
+//		    <handler1 characteristics>
+//		  <handler2 name>:
+//		    <handler2 characteristics>
+//
+// Each handler has its own name, and an associated characteristic type.
+package config
+
+import (
+	"bytes"
+	"fmt"
+	"reflect"
+	"sort"
+	"strings"
+
+	"golang.org/x/exp/maps"
+	"gopkg.in/yaml.v3"
+)
+
+// variants is the struct configs are decoded into.
+type variants struct {
+	Version      int
+	HandlerOrder []string
+	Default      string              // reserved for laer
+	Variants     map[string]*variant `yaml:",inline"`
+}
+
+// variant holds an individual variant's handlers, and any common fields.
+type variant struct {
+	HandlerOrder []string
+	Handlers     map[string]yaml.Node `yaml:",inline"`
+}
+
+type HandlerRegistry struct {
+	variations map[string]*variant
+	metadata   map[string]HandlerMetadata
+
+	// cached names
+	variantIDs, handerIDs []string
+}
+
+func NewHandlerwRegistry() *HandlerRegistry {

Review Comment:
   Is there a naming convention reason why this is `NewHandlerwRegistry()` instead of `NewHandlerRegistry()`?



##########
sdks/go/pkg/beam/runners/prism/internal/config/config.go:
##########
@@ -0,0 +1,241 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package config defines and handles the parsing and provision of configurations
+// for the runner. This package should be referred to, and should not take dependencies
+// on other parts of this runner.
+//
+// 1. A given configuration file has one or more variations configured.
+// 2. Each variation has a name, and one or more handlers configured.
+// 3. Each handler maps to a specific struct.
+//
+//	 <variation1 name>:
+//		  <handler1 name>:
+//		    <handler1 characteristics>
+//		  <handler2 name>:
+//		    <handler2 characteristics>
+//
+//	 <variation2 name>:
+//		  <handler1 name>:
+//		    <handler1 characteristics>
+//		  <handler2 name>:
+//		    <handler2 characteristics>
+//
+// Each handler has its own name, and an associated characteristic type.
+package config
+
+import (
+	"bytes"
+	"fmt"
+	"reflect"
+	"sort"
+	"strings"
+
+	"golang.org/x/exp/maps"
+	"gopkg.in/yaml.v3"
+)
+
+// variants is the struct configs are decoded into.
+type variants struct {
+	Version      int
+	HandlerOrder []string
+	Default      string              // reserved for laer
+	Variants     map[string]*variant `yaml:",inline"`
+}
+
+// variant holds an individual variant's handlers, and any common fields.
+type variant struct {
+	HandlerOrder []string
+	Handlers     map[string]yaml.Node `yaml:",inline"`
+}
+
+type HandlerRegistry struct {
+	variations map[string]*variant
+	metadata   map[string]HandlerMetadata
+
+	// cached names
+	variantIDs, handerIDs []string
+}
+
+func NewHandlerwRegistry() *HandlerRegistry {
+	return &HandlerRegistry{
+		variations: map[string]*variant{},
+		metadata:   map[string]HandlerMetadata{},
+	}
+}
+
+// HandlerMetadata is required information about handler configurations.
+// Handlers have a URN, which is the key by which configurations refer to them,
+// and a Characteristic type, which is the handler's own individual configuration.
+//
+// Characteristic types must have useful zero values, representing the
+// default configuration for the handler.
+type HandlerMetadata interface {
+	// ConfigURN represents the urn for the handle.
+	ConfigURN() string
+
+	// ConfigCharacteristic returns the type of the detailed configuration for the handler.
+	// A characteristic type must have a useful zero value that defines the default behavior.
+	ConfigCharacteristic() reflect.Type
+}
+
+// RegisterHandlers is about registering the metadata for handler configurations.
+func (r *HandlerRegistry) RegisterHandlers(mds ...HandlerMetadata) {
+	for _, md := range mds {
+		r.metadata[md.ConfigURN()] = md
+	}
+}
+
+// LoadFromYaml takes in a yaml formatted configuration and eagerly processes it for errors.
+//
+// All handlers are validated against their registered characteristic, and it is an error
+// to have configurations for unknown handlers
+func (r *HandlerRegistry) LoadFromYaml(in []byte) error {
+	vs := variants{Variants: r.variations}
+	buf := bytes.NewBuffer(in)
+	d := yaml.NewDecoder(buf)
+	if err := d.Decode(&vs); err != nil {
+		return err
+	}
+
+	err := &unknownHandlersErr{}
+	handlers := map[string]struct{}{}
+	for v, hs := range r.variations {
+		for hk, hyn := range hs.Handlers {
+			handlers[hk] = struct{}{}
+
+			md, ok := r.metadata[hk]
+			if !ok {
+				err.add(hk, v)
+				continue
+			}
+
+			// Validate that handler config so we can give a good error message now.
+			// We re-encode, then decode, since then we don't need to re-implement
+			// the existing Known fields. Sadly, this doens't persist through
+			// yaml.Node fields.
+			hb, err := yaml.Marshal(hyn)
+			if err != nil {
+				panic(fmt.Sprintf("error re-encoding characteristic for variant %v handler %v: %v", v, hk, err))
+			}
+			buf := bytes.NewBuffer(hb)
+			dec := yaml.NewDecoder(buf)
+			dec.KnownFields(true)
+			rt := md.ConfigCharacteristic()
+			rtv := reflect.New(rt)
+			if err := dec.Decode(rtv.Interface()); err != nil {
+				return fmt.Errorf("error decoding characteristic strictly for variant %v handler %v: %v", v, hk, err)
+			}
+
+		}
+	}
+
+	if err.valid() {
+		return err
+	}
+
+	r.variantIDs = maps.Keys(r.variations)
+	sort.Strings(r.variantIDs)
+	r.handerIDs = maps.Keys(handlers)
+	sort.Strings(r.handerIDs)
+	return nil
+}
+
+type unknownHandlersErr struct {
+	handlersToVariants map[string][]string
+}
+
+func (e *unknownHandlersErr) valid() bool {
+	return e.handlersToVariants != nil
+}
+
+func (e *unknownHandlersErr) add(handler, variant string) {
+	if e.handlersToVariants == nil {
+		e.handlersToVariants = map[string][]string{}
+	}
+	vs := e.handlersToVariants[handler]
+	vs = append(vs, variant)
+	e.handlersToVariants[handler] = vs
+}
+
+func (e *unknownHandlersErr) Error() string {
+	var sb strings.Builder
+	sb.WriteString("yaml config contained unknown handlers")
+	for h, vs := range e.handlersToVariants {
+		sort.Strings(vs)
+		sb.WriteString("\n\t")
+		sb.WriteString(h)
+		sb.WriteString(" present in variants ")
+		sb.WriteString(strings.Join(vs, ","))
+	}
+	return sb.String()
+}
+
+// Variants returns the IDs of all registered variations.
+func (r *HandlerRegistry) Variants() []string {
+	return r.variantIDs
+}
+
+// UsedHandlers returns the IDs of all handlers used in variations.
+func (r *HandlerRegistry) UsedHandlers() []string {
+	return r.handerIDs
+}
+
+// GetVariant returns the Variant with the given name.
+// If none exist, GetVariant returns nil.
+func (r *HandlerRegistry) GetVariant(name string) *Variant {
+	vs, ok := r.variations[name]
+	if !ok {
+		return nil
+	}
+	return &Variant{parent: r, name: name, handlers: vs.Handlers}
+}

Review Comment:
   Nit: could we move these methods so they are contiguous with the rest of the `HandlerRegistry` functions?



##########
sdks/go/pkg/beam/runners/prism/internal/config/config.go:
##########
@@ -0,0 +1,241 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package config defines and handles the parsing and provision of configurations
+// for the runner. This package should be referred to, and should not take dependencies
+// on other parts of this runner.
+//
+// 1. A given configuration file has one or more variations configured.
+// 2. Each variation has a name, and one or more handlers configured.
+// 3. Each handler maps to a specific struct.
+//
+//	 <variation1 name>:
+//		  <handler1 name>:
+//		    <handler1 characteristics>
+//		  <handler2 name>:
+//		    <handler2 characteristics>
+//
+//	 <variation2 name>:
+//		  <handler1 name>:
+//		    <handler1 characteristics>
+//		  <handler2 name>:
+//		    <handler2 characteristics>
+//
+// Each handler has its own name, and an associated characteristic type.
+package config
+
+import (
+	"bytes"
+	"fmt"
+	"reflect"
+	"sort"
+	"strings"
+
+	"golang.org/x/exp/maps"
+	"gopkg.in/yaml.v3"
+)
+
+// variants is the struct configs are decoded into.
+type variants struct {
+	Version      int
+	HandlerOrder []string
+	Default      string              // reserved for laer
+	Variants     map[string]*variant `yaml:",inline"`
+}
+
+// variant holds an individual variant's handlers, and any common fields.
+type variant struct {
+	HandlerOrder []string
+	Handlers     map[string]yaml.Node `yaml:",inline"`
+}
+
+type HandlerRegistry struct {
+	variations map[string]*variant
+	metadata   map[string]HandlerMetadata
+
+	// cached names
+	variantIDs, handerIDs []string
+}
+
+func NewHandlerwRegistry() *HandlerRegistry {
+	return &HandlerRegistry{
+		variations: map[string]*variant{},
+		metadata:   map[string]HandlerMetadata{},
+	}
+}
+
+// HandlerMetadata is required information about handler configurations.
+// Handlers have a URN, which is the key by which configurations refer to them,
+// and a Characteristic type, which is the handler's own individual configuration.
+//
+// Characteristic types must have useful zero values, representing the
+// default configuration for the handler.
+type HandlerMetadata interface {
+	// ConfigURN represents the urn for the handle.
+	ConfigURN() string
+
+	// ConfigCharacteristic returns the type of the detailed configuration for the handler.
+	// A characteristic type must have a useful zero value that defines the default behavior.
+	ConfigCharacteristic() reflect.Type
+}
+
+// RegisterHandlers is about registering the metadata for handler configurations.
+func (r *HandlerRegistry) RegisterHandlers(mds ...HandlerMetadata) {
+	for _, md := range mds {
+		r.metadata[md.ConfigURN()] = md
+	}
+}
+
+// LoadFromYaml takes in a yaml formatted configuration and eagerly processes it for errors.
+//
+// All handlers are validated against their registered characteristic, and it is an error
+// to have configurations for unknown handlers
+func (r *HandlerRegistry) LoadFromYaml(in []byte) error {
+	vs := variants{Variants: r.variations}
+	buf := bytes.NewBuffer(in)
+	d := yaml.NewDecoder(buf)
+	if err := d.Decode(&vs); err != nil {
+		return err
+	}
+
+	err := &unknownHandlersErr{}
+	handlers := map[string]struct{}{}
+	for v, hs := range r.variations {
+		for hk, hyn := range hs.Handlers {
+			handlers[hk] = struct{}{}
+
+			md, ok := r.metadata[hk]
+			if !ok {
+				err.add(hk, v)
+				continue
+			}
+
+			// Validate that handler config so we can give a good error message now.
+			// We re-encode, then decode, since then we don't need to re-implement
+			// the existing Known fields. Sadly, this doens't persist through
+			// yaml.Node fields.
+			hb, err := yaml.Marshal(hyn)
+			if err != nil {
+				panic(fmt.Sprintf("error re-encoding characteristic for variant %v handler %v: %v", v, hk, err))
+			}
+			buf := bytes.NewBuffer(hb)
+			dec := yaml.NewDecoder(buf)
+			dec.KnownFields(true)
+			rt := md.ConfigCharacteristic()
+			rtv := reflect.New(rt)
+			if err := dec.Decode(rtv.Interface()); err != nil {
+				return fmt.Errorf("error decoding characteristic strictly for variant %v handler %v: %v", v, hk, err)
+			}
+
+		}
+	}
+
+	if err.valid() {
+		return err
+	}
+
+	r.variantIDs = maps.Keys(r.variations)
+	sort.Strings(r.variantIDs)
+	r.handerIDs = maps.Keys(handlers)
+	sort.Strings(r.handerIDs)
+	return nil
+}
+
+type unknownHandlersErr struct {
+	handlersToVariants map[string][]string
+}
+
+func (e *unknownHandlersErr) valid() bool {
+	return e.handlersToVariants != nil
+}
+
+func (e *unknownHandlersErr) add(handler, variant string) {
+	if e.handlersToVariants == nil {
+		e.handlersToVariants = map[string][]string{}
+	}
+	vs := e.handlersToVariants[handler]
+	vs = append(vs, variant)
+	e.handlersToVariants[handler] = vs
+}
+
+func (e *unknownHandlersErr) Error() string {
+	var sb strings.Builder
+	sb.WriteString("yaml config contained unknown handlers")
+	for h, vs := range e.handlersToVariants {
+		sort.Strings(vs)
+		sb.WriteString("\n\t")
+		sb.WriteString(h)
+		sb.WriteString(" present in variants ")
+		sb.WriteString(strings.Join(vs, ","))
+	}
+	return sb.String()
+}
+
+// Variants returns the IDs of all registered variations.
+func (r *HandlerRegistry) Variants() []string {
+	return r.variantIDs
+}
+
+// UsedHandlers returns the IDs of all handlers used in variations.
+func (r *HandlerRegistry) UsedHandlers() []string {
+	return r.handerIDs
+}
+
+// GetVariant returns the Variant with the given name.
+// If none exist, GetVariant returns nil.
+func (r *HandlerRegistry) GetVariant(name string) *Variant {
+	vs, ok := r.variations[name]
+	if !ok {
+		return nil
+	}
+	return &Variant{parent: r, name: name, handlers: vs.Handlers}
+}
+
+type Variant struct {

Review Comment:
   Nit: I don't love the naming collision between the internal, little-v `variant` and the exported `Variant`; it doesn't read particularly clearly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org