You are viewing a plain text version of this content. The canonical link for it was a hyperlink ("here") in the original message and is not preserved in this plain-text copy.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/07/01 18:13:28 UTC

[GitHub] [beam] lostluck commented on a change in pull request #12141: [BEAM-9615] Add Row coder functions.

lostluck commented on a change in pull request #12141:
URL: https://github.com/apache/beam/pull/12141#discussion_r448535522



##########
File path: sdks/go/pkg/beam/core/graph/coder/row.go
##########
@@ -0,0 +1,379 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package coder
+
+import (
+	"fmt"
+	"io"
+	"reflect"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/core/util/ioutilx"
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+)
+
+type typeDecoderReflect struct {
+	typ    reflect.Type
+	fields []func(reflect.Value, io.Reader) error
+}
+
+// RowEncoderForStruct returns an encoding function that encodes a struct type
+// or a pointer to a struct type using the beam row encoding.
+//
+// Returns an error if the given type is invalid or not encodable to a beam
+// schema row.
+func RowEncoderForStruct(rt reflect.Type) (func(interface{}, io.Writer) error, error) {
+	if err := rowTypeValidation(rt, true); err != nil {
+		return nil, err
+	}
+	return encoderForType(rt), nil
+}
+
+// RowDecoderForStruct returns a decoding function that decodes the beam row encoding
+// into the given type.
+//
+// Returns an error if the given type is invalid or not decodable from a beam
+// schema row.
+func RowDecoderForStruct(rt reflect.Type) (func(io.Reader) (interface{}, error), error) {
+	if err := rowTypeValidation(rt, true); err != nil {
+		return nil, err
+	}
+	return decoderForType(rt), nil
+}
+
+func rowTypeValidation(rt reflect.Type, strictExportedFields bool) error {
+	switch k := rt.Kind(); k {
+	case reflect.Ptr, reflect.Struct:
+	default:
+		return errors.Errorf("can't generate row coder for type %v: must be a struct type or pointer to a struct type", rt)
+	}
+	// TODO exported field validation.
+	return nil
+}
+
+// decoderForType returns a decoder function for the struct or pointer to struct type.
+func decoderForType(t reflect.Type) func(io.Reader) (interface{}, error) {
+	var isPtr bool
+	// Pointers become the value type for decomposition.
+	if t.Kind() == reflect.Ptr {
+		isPtr = true
+		t = t.Elem()
+	}
+	dec := decoderForStructReflect(t)
+
+	if isPtr {
+		return func(r io.Reader) (interface{}, error) {
+			rv := reflect.New(t)
+			err := dec(rv.Elem(), r)
+			return rv.Interface(), err
+		}
+	}
+	return func(r io.Reader) (interface{}, error) {
+		rv := reflect.New(t)
+		err := dec(rv.Elem(), r)
+		return rv.Elem().Interface(), err
+	}
+}
+
+// decoderForSingleTypeReflect returns a reflection based decoder function for the
+// given type.
+func decoderForSingleTypeReflect(t reflect.Type) func(reflect.Value, io.Reader) error {
+	switch t.Kind() {
+	case reflect.Struct:
+		return decoderForStructReflect(t)
+	case reflect.Bool:
+		return func(rv reflect.Value, r io.Reader) error {
+			v, err := DecodeBool(r)
+			if err != nil {
+				return errors.Wrap(err, "error decoding varint field")
+			}
+			rv.SetBool(v)
+			return nil
+		}
+	case reflect.Uint8:
+		return func(rv reflect.Value, r io.Reader) error {
+			b, err := DecodeByte(r)
+			if err != nil {
+				return nil
+			}
+			rv.SetUint(uint64(b))
+			return nil
+		}
+	case reflect.String:
+		return func(rv reflect.Value, r io.Reader) error {
+			v, err := DecodeStringUTF8(r)
+			if err != nil {
+				return errors.Wrap(err, "error decoding string field")
+			}
+			rv.SetString(v)
+			return nil
+		}
+	case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
+		return func(rv reflect.Value, r io.Reader) error {
+			v, err := DecodeVarInt(r)
+			if err != nil {
+				return errors.Wrap(err, "error decoding varint field")
+			}
+			rv.SetInt(v)
+			return nil
+		}
+	case reflect.Float32, reflect.Float64:
+		return func(rv reflect.Value, r io.Reader) error {
+			v, err := DecodeDouble(r)
+			if err != nil {
+				return errors.Wrap(err, "error decoding double field")
+			}
+			rv.SetFloat(v)
+			return nil
+		}
+	case reflect.Ptr:
+		decf := decoderForSingleTypeReflect(t.Elem())
+		return func(rv reflect.Value, r io.Reader) error {
+			nv := reflect.New(t.Elem())
+			rv.Set(nv)
+			return decf(nv.Elem(), r)
+		}
+	case reflect.Slice:
+		// Special case handling for byte slices.
+		if t.Elem().Kind() == reflect.Uint8 {
+			return func(rv reflect.Value, r io.Reader) error {
+				b, err := DecodeBytes(r)
+				if err != nil {
+					return err
+				}
+				rv.SetBytes(b)
+				return nil
+			}
+		}
+		decf := decoderForSingleTypeReflect(t.Elem())
+		sdec := iterableDecoderForSlice(t, decf)
+		return func(rv reflect.Value, r io.Reader) error {
+			return sdec(rv, r)
+		}
+	case reflect.Array:
+		decf := decoderForSingleTypeReflect(t.Elem())
+		sdec := iterableDecoderForArray(t, decf)
+		return func(rv reflect.Value, r io.Reader) error {
+			return sdec(rv, r)
+		}
+	}
+	panic(fmt.Sprintf("unimplemented type to decode: %v", t))
+}
+
+// decoderForStructReflect returns a reflection based decoder function for the
+// given struct type.
+func decoderForStructReflect(t reflect.Type) func(reflect.Value, io.Reader) error {
+	var coder typeDecoderReflect
+	for i := 0; i < t.NumField(); i++ {
+		i := i // avoid alias issues in the closures.
+		dec := decoderForSingleTypeReflect(t.Field(i).Type)
+		coder.fields = append(coder.fields, func(rv reflect.Value, r io.Reader) error {
+			return dec(rv.Field(i), r)
+		})
+	}
+
+	return func(rv reflect.Value, r io.Reader) error {
+		nf, nils, err := readRowHeader(rv, r)
+		if err != nil {
+			return err
+		}
+		if nf != len(coder.fields) {
+			return errors.Errorf("schema[%v] changed: got %d fields, want %d fields", "TODO", nf, len(coder.fields))
+		}
+		for i, f := range coder.fields {
+			if isFieldNil(nils, i) {
+				continue
+			}
+			if err := f(rv, r); err != nil {
+				return err
+			}
+		}
+		return nil
+	}
+}
+
+// isFieldNil examines the passed in packed bits nils buffer
+// and returns true if the field at that index wasn't encoded
+// and can be skipped in decoding.
+func isFieldNil(nils []byte, f int) bool {
+	i, b := f/8, f%8
+	return len(nils) != 0 && (nils[i]>>uint8(b))&0x1 == 1
+}
+
+// encoderForType returns an encoder function for the struct or pointer to struct type.
+func encoderForType(t reflect.Type) func(interface{}, io.Writer) error {
+	var isPtr bool
+	// Pointers become the value type for decomposition.
+	if t.Kind() == reflect.Ptr {
+		isPtr = true
+		t = t.Elem()
+	}
+	enc := encoderForStructReflect(t)
+
+	if isPtr {
+		return func(v interface{}, w io.Writer) error {
+			return enc(reflect.ValueOf(v).Elem(), w)
+		}
+	}
+	return func(v interface{}, w io.Writer) error {
+		return enc(reflect.ValueOf(v), w)
+	}
+}
+
+// Generates coder using reflection for
+func encoderForSingleTypeReflect(t reflect.Type) func(reflect.Value, io.Writer) error {
+	switch t.Kind() {
+	case reflect.Struct:
+		return encoderForStructReflect(t)
+	case reflect.Bool:
+		return func(rv reflect.Value, w io.Writer) error {
+			return EncodeBool(rv.Bool(), w)
+		}
+	case reflect.Uint8:
+		return func(rv reflect.Value, w io.Writer) error {
+			return EncodeByte(byte(rv.Uint()), w)
+		}
+	case reflect.String:
+		return func(rv reflect.Value, w io.Writer) error {
+			return EncodeStringUTF8(rv.String(), w)
+		}
+	case reflect.Int, reflect.Int64, reflect.Int16, reflect.Int32, reflect.Int8:
+		return func(rv reflect.Value, w io.Writer) error {
+			return EncodeVarInt(int64(rv.Int()), w)
+		}
+	case reflect.Float32, reflect.Float64:
+		return func(rv reflect.Value, w io.Writer) error {
+			return EncodeDouble(float64(rv.Float()), w)
+		}
+	case reflect.Ptr:
+		// Nils are handled at the struct field level.
+		encf := encoderForSingleTypeReflect(t.Elem())
+		return func(rv reflect.Value, w io.Writer) error {
+			return encf(rv.Elem(), w)
+		}
+	case reflect.Slice:
+		// Special case handling for byte slices.
+		if t.Elem().Kind() == reflect.Uint8 {
+			return func(rv reflect.Value, w io.Writer) error {
+				return EncodeBytes(rv.Bytes(), w)
+			}
+		}
+		encf := encoderForSingleTypeReflect(t.Elem())
+		return iterableEncoder(t, encf)
+	case reflect.Array:
+		encf := encoderForSingleTypeReflect(t.Elem())
+		return iterableEncoder(t, encf)
+	}
+	panic(fmt.Sprintf("unimplemented type to encode: %v", t))
+}
+
+type typeEncoderReflect struct {
+	fields []func(reflect.Value, io.Writer) error
+}
+
+// encoderForStructReflect generates reflection field access closures for structs.
+func encoderForStructReflect(t reflect.Type) func(reflect.Value, io.Writer) error {
+	var coder typeEncoderReflect
+	for i := 0; i < t.NumField(); i++ {
+		i := i // avoid alias issues in the closures.

Review comment:
       Yup. Leftover from an intermediate state. Thank you!

##########
File path: sdks/go/pkg/beam/core/graph/coder/row.go
##########
@@ -0,0 +1,379 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package coder
+
+import (
+	"fmt"
+	"io"
+	"reflect"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/core/util/ioutilx"
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+)
+
+type typeDecoderReflect struct {
+	typ    reflect.Type
+	fields []func(reflect.Value, io.Reader) error
+}
+
+// RowEncoderForStruct returns an encoding function that encodes a struct type
+// or a pointer to a struct type using the beam row encoding.
+//
+// Returns an error if the given type is invalid or not encodable to a beam
+// schema row.
+func RowEncoderForStruct(rt reflect.Type) (func(interface{}, io.Writer) error, error) {
+	if err := rowTypeValidation(rt, true); err != nil {
+		return nil, err
+	}
+	return encoderForType(rt), nil
+}
+
+// RowDecoderForStruct returns a decoding function that decodes the beam row encoding
+// into the given type.
+//
+// Returns an error if the given type is invalid or not decodable from a beam
+// schema row.
+func RowDecoderForStruct(rt reflect.Type) (func(io.Reader) (interface{}, error), error) {
+	if err := rowTypeValidation(rt, true); err != nil {
+		return nil, err
+	}
+	return decoderForType(rt), nil
+}
+
+func rowTypeValidation(rt reflect.Type, strictExportedFields bool) error {
+	switch k := rt.Kind(); k {
+	case reflect.Ptr, reflect.Struct:
+	default:
+		return errors.Errorf("can't generate row coder for type %v: must be a struct type or pointer to a struct type", rt)
+	}
+	// TODO exported field validation.
+	return nil
+}
+
+// decoderForType returns a decoder function for the struct or pointer to struct type.
+func decoderForType(t reflect.Type) func(io.Reader) (interface{}, error) {
+	var isPtr bool
+	// Pointers become the value type for decomposition.
+	if t.Kind() == reflect.Ptr {
+		isPtr = true
+		t = t.Elem()
+	}
+	dec := decoderForStructReflect(t)
+
+	if isPtr {
+		return func(r io.Reader) (interface{}, error) {
+			rv := reflect.New(t)
+			err := dec(rv.Elem(), r)
+			return rv.Interface(), err
+		}
+	}
+	return func(r io.Reader) (interface{}, error) {
+		rv := reflect.New(t)
+		err := dec(rv.Elem(), r)
+		return rv.Elem().Interface(), err
+	}
+}
+
+// decoderForSingleTypeReflect returns a reflection based decoder function for the
+// given type.
+func decoderForSingleTypeReflect(t reflect.Type) func(reflect.Value, io.Reader) error {
+	switch t.Kind() {
+	case reflect.Struct:
+		return decoderForStructReflect(t)
+	case reflect.Bool:
+		return func(rv reflect.Value, r io.Reader) error {
+			v, err := DecodeBool(r)
+			if err != nil {
+				return errors.Wrap(err, "error decoding varint field")
+			}
+			rv.SetBool(v)
+			return nil
+		}
+	case reflect.Uint8:
+		return func(rv reflect.Value, r io.Reader) error {
+			b, err := DecodeByte(r)
+			if err != nil {
+				return nil

Review comment:
       Nope. Thanks!

##########
File path: sdks/go/pkg/beam/core/graph/coder/row.go
##########
@@ -0,0 +1,379 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package coder
+
+import (
+	"fmt"
+	"io"
+	"reflect"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/core/util/ioutilx"
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+)
+
+type typeDecoderReflect struct {

Review comment:
       Done.

##########
File path: sdks/go/pkg/beam/core/graph/coder/row.go
##########
@@ -0,0 +1,379 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package coder
+
+import (
+	"fmt"
+	"io"
+	"reflect"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/core/util/ioutilx"
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+)
+
+type typeDecoderReflect struct {
+	typ    reflect.Type
+	fields []func(reflect.Value, io.Reader) error
+}
+
+// RowEncoderForStruct returns an encoding function that encodes a struct type
+// or a pointer to a struct type using the beam row encoding.
+//
+// Returns an error if the given type is invalid or not encodable to a beam
+// schema row.
+func RowEncoderForStruct(rt reflect.Type) (func(interface{}, io.Writer) error, error) {
+	if err := rowTypeValidation(rt, true); err != nil {
+		return nil, err
+	}
+	return encoderForType(rt), nil
+}
+
+// RowDecoderForStruct returns a decoding function that decodes the beam row encoding
+// into the given type.
+//
+// Returns an error if the given type is invalid or not decodable from a beam
+// schema row.
+func RowDecoderForStruct(rt reflect.Type) (func(io.Reader) (interface{}, error), error) {
+	if err := rowTypeValidation(rt, true); err != nil {
+		return nil, err
+	}
+	return decoderForType(rt), nil
+}
+
+func rowTypeValidation(rt reflect.Type, strictExportedFields bool) error {
+	switch k := rt.Kind(); k {
+	case reflect.Ptr, reflect.Struct:
+	default:
+		return errors.Errorf("can't generate row coder for type %v: must be a struct type or pointer to a struct type", rt)
+	}
+	// TODO exported field validation.
+	return nil
+}
+
+// decoderForType returns a decoder function for the struct or pointer to struct type.
+func decoderForType(t reflect.Type) func(io.Reader) (interface{}, error) {
+	var isPtr bool
+	// Pointers become the value type for decomposition.
+	if t.Kind() == reflect.Ptr {
+		isPtr = true
+		t = t.Elem()
+	}
+	dec := decoderForStructReflect(t)
+
+	if isPtr {
+		return func(r io.Reader) (interface{}, error) {
+			rv := reflect.New(t)
+			err := dec(rv.Elem(), r)
+			return rv.Interface(), err
+		}
+	}
+	return func(r io.Reader) (interface{}, error) {
+		rv := reflect.New(t)
+		err := dec(rv.Elem(), r)
+		return rv.Elem().Interface(), err
+	}
+}
+
+// decoderForSingleTypeReflect returns a reflection based decoder function for the
+// given type.
+func decoderForSingleTypeReflect(t reflect.Type) func(reflect.Value, io.Reader) error {
+	switch t.Kind() {
+	case reflect.Struct:
+		return decoderForStructReflect(t)
+	case reflect.Bool:
+		return func(rv reflect.Value, r io.Reader) error {
+			v, err := DecodeBool(r)
+			if err != nil {
+				return errors.Wrap(err, "error decoding varint field")

Review comment:
       Done.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org