You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2021/10/02 05:20:10 UTC

[beam] branch master updated: [BEAM-12513] Schemas and Coders (#15632)

This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new b9457b7  [BEAM-12513] Schemas and Coders (#15632)
b9457b7 is described below

commit b9457b711029051a837255e1bb516e55d32e56d5
Author: Robert Burke <lo...@users.noreply.github.com>
AuthorDate: Fri Oct 1 22:19:24 2021 -0700

    [BEAM-12513] Schemas and Coders (#15632)
---
 sdks/go/examples/snippets/04transforms.go          |   2 +-
 sdks/go/examples/snippets/06schemas.go             | 143 ++++++++++++++
 sdks/go/examples/snippets/06schemas_test.go        | 170 +++++++++++++++++
 sdks/go/pkg/beam/schema.go                         |  20 +-
 .../content/en/documentation/programming-guide.md  | 212 ++++++++++++++++++++-
 5 files changed, 535 insertions(+), 12 deletions(-)

diff --git a/sdks/go/examples/snippets/04transforms.go b/sdks/go/examples/snippets/04transforms.go
index 4f07f04..d920acd 100644
--- a/sdks/go/examples/snippets/04transforms.go
+++ b/sdks/go/examples/snippets/04transforms.go
@@ -300,7 +300,7 @@ func filterWordsAbove(word string, lengthCutOffIter func(*float64) bool, emitAbo
 	var cutOff float64
 	ok := lengthCutOffIter(&cutOff)
 	if !ok {
-		return fmt.Errorf("No length cutoff provided.")
+		return fmt.Errorf("no length cutoff provided")
 	}
 	if float64(len(word)) > cutOff {
 		emitAboveCutoff(word)
diff --git a/sdks/go/examples/snippets/06schemas.go b/sdks/go/examples/snippets/06schemas.go
new file mode 100644
index 0000000..d68bc7c
--- /dev/null
+++ b/sdks/go/examples/snippets/06schemas.go
@@ -0,0 +1,143 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package snippets
+
+import (
+	"fmt"
+	"io"
+	"reflect"
+	"time"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
+)
+
+// [START schema_define]
+
+type Purchase struct {
+	// ID of the user who made the purchase.
+	UserID string `beam:"userId"`
+	// Identifier of the item that was purchased.
+	ItemID int64 `beam:"itemId"`
+	// The shipping address, a nested type.
+	ShippingAddress ShippingAddress `beam:"shippingAddress"`
+	// The cost of the item in cents.
+	Cost int64 `beam:"cost"`
+	// The transactions that paid for this purchase.
+	// A slice since the purchase might be spread out over multiple
+	// credit cards.
+	Transactions []Transaction `beam:"transactions"`
+}
+
+type ShippingAddress struct {
+	StreetAddress string  `beam:"streetAddress"`
+	City          string  `beam:"city"`
+	State         *string `beam:"state"`
+	Country       string  `beam:"country"`
+	PostCode      string  `beam:"postCode"`
+}
+
+type Transaction struct {
+	Bank           string  `beam:"bank"`
+	PurchaseAmount float64 `beam:"purchaseAmount"`
+}
+
+// [END schema_define]
+
+// Validate that the interface is being implemented.
+var _ beam.SchemaProvider = &TimestampNanosProvider{}
+
+// [START schema_logical_provider]
+
+// TimestampNanos is a logical type using time.Time, but
+// encodes as a schema type.
+type TimestampNanos time.Time
+
+func (tn TimestampNanos) Seconds() int64 {
+	return time.Time(tn).Unix()
+}
+func (tn TimestampNanos) Nanos() int32 {
+	return int32(time.Time(tn).UnixNano() % 1000000000)
+}
+
+// tnStorage is the storage schema for TimestampNanos.
+type tnStorage struct {
+	Seconds int64 `beam:"seconds"`
+	Nanos   int32 `beam:"nanos"`
+}
+
+var (
+	// reflect.Type of the Value type of TimestampNanos
+	tnType        = reflect.TypeOf((*TimestampNanos)(nil)).Elem()
+	tnStorageType = reflect.TypeOf((*tnStorage)(nil)).Elem()
+)
+
+// TimestampNanosProvider implements the beam.SchemaProvider interface.
+type TimestampNanosProvider struct{}
+
+// FromLogicalType checks if the given type is TimestampNanos, and if so
+// returns the storage type.
+func (p *TimestampNanosProvider) FromLogicalType(rt reflect.Type) (reflect.Type, error) {
+	if rt != tnType {
+		return nil, fmt.Errorf("unable to provide schema.LogicalType for type %v, want %v", rt, tnType)
+	}
+	return tnStorageType, nil
+}
+
+// BuildEncoder builds a Beam schema encoder for the TimestampNanos type.
+func (p *TimestampNanosProvider) BuildEncoder(rt reflect.Type) (func(interface{}, io.Writer) error, error) {
+	if _, err := p.FromLogicalType(rt); err != nil {
+		return nil, err
+	}
+	enc, err := coder.RowEncoderForStruct(tnStorageType)
+	if err != nil {
+		return nil, err
+	}
+	return func(iface interface{}, w io.Writer) error {
+		v := iface.(TimestampNanos)
+		return enc(tnStorage{
+			Seconds: v.Seconds(),
+			Nanos:   v.Nanos(),
+		}, w)
+	}, nil
+}
+
+// BuildDecoder builds a Beam schema decoder for the TimestampNanos type.
+func (p *TimestampNanosProvider) BuildDecoder(rt reflect.Type) (func(io.Reader) (interface{}, error), error) {
+	if _, err := p.FromLogicalType(rt); err != nil {
+		return nil, err
+	}
+	dec, err := coder.RowDecoderForStruct(tnStorageType)
+	if err != nil {
+		return nil, err
+	}
+	return func(r io.Reader) (interface{}, error) {
+		s, err := dec(r)
+		if err != nil {
+			return nil, err
+		}
+		tn := s.(tnStorage)
+		return TimestampNanos(time.Unix(tn.Seconds, int64(tn.Nanos))), nil
+	}, nil
+}
+
+// [END schema_logical_provider]
+
+func LogicalTypeExample() {
+	// [START schema_logical_register]
+	beam.RegisterSchemaProvider(tnType, &TimestampNanosProvider{})
+	// [END schema_logical_register]
+}
diff --git a/sdks/go/examples/snippets/06schemas_test.go b/sdks/go/examples/snippets/06schemas_test.go
new file mode 100644
index 0000000..1353c97
--- /dev/null
+++ b/sdks/go/examples/snippets/06schemas_test.go
@@ -0,0 +1,170 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package snippets
+
+import (
+	"fmt"
+	"reflect"
+	"testing"
+	"time"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder/testutil"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx/schema"
+	pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
+	"github.com/google/go-cmp/cmp"
+	"google.golang.org/protobuf/proto"
+	"google.golang.org/protobuf/testing/protocmp"
+)
+
+func atomicSchemaField(name string, typ pipepb.AtomicType) *pipepb.Field {
+	return &pipepb.Field{
+		Name: name,
+		Type: &pipepb.FieldType{
+			TypeInfo: &pipepb.FieldType_AtomicType{
+				AtomicType: typ,
+			},
+		},
+	}
+}
+
+func rowSchemaField(name string, typ *pipepb.Schema) *pipepb.Field {
+	return &pipepb.Field{
+		Name: name,
+		Type: &pipepb.FieldType{
+			TypeInfo: &pipepb.FieldType_RowType{
+				RowType: &pipepb.RowType{
+					Schema: typ,
+				},
+			},
+		},
+	}
+}
+
+func listSchemaField(name string, typ *pipepb.Field) *pipepb.Field {
+	return &pipepb.Field{
+		Name: name,
+		Type: &pipepb.FieldType{
+			TypeInfo: &pipepb.FieldType_ArrayType{
+				ArrayType: &pipepb.ArrayType{
+					ElementType: typ.GetType(),
+				},
+			},
+		},
+	}
+}
+
+func nillable(f *pipepb.Field) *pipepb.Field {
+	f.Type.Nullable = true
+	return f
+}
+
+func TestSchemaTypes(t *testing.T) {
+	transactionSchema := &pipepb.Schema{
+		Fields: []*pipepb.Field{
+			atomicSchemaField("bank", pipepb.AtomicType_STRING),
+			atomicSchemaField("purchaseAmount", pipepb.AtomicType_DOUBLE),
+		},
+	}
+	shippingAddressSchema := &pipepb.Schema{
+		Fields: []*pipepb.Field{
+			atomicSchemaField("streetAddress", pipepb.AtomicType_STRING),
+			atomicSchemaField("city", pipepb.AtomicType_STRING),
+			nillable(atomicSchemaField("state", pipepb.AtomicType_STRING)),
+			atomicSchemaField("country", pipepb.AtomicType_STRING),
+			atomicSchemaField("postCode", pipepb.AtomicType_STRING),
+		},
+	}
+
+	tests := []struct {
+		rt     reflect.Type
+		st     *pipepb.Schema
+		preReg func(reg *schema.Registry)
+	}{{
+		rt: reflect.TypeOf(Transaction{}),
+		st: transactionSchema,
+	}, {
+		rt: reflect.TypeOf(ShippingAddress{}),
+		st: shippingAddressSchema,
+	}, {
+		rt: reflect.TypeOf(Purchase{}),
+		st: &pipepb.Schema{
+			Fields: []*pipepb.Field{
+				atomicSchemaField("userId", pipepb.AtomicType_STRING),
+				atomicSchemaField("itemId", pipepb.AtomicType_INT64),
+				rowSchemaField("shippingAddress", shippingAddressSchema),
+				atomicSchemaField("cost", pipepb.AtomicType_INT64),
+				listSchemaField("transactions",
+					rowSchemaField("n/a", transactionSchema)),
+			},
+		},
+	}, {
+		rt: tnType,
+		st: &pipepb.Schema{
+			Fields: []*pipepb.Field{
+				atomicSchemaField("seconds", pipepb.AtomicType_INT64),
+				atomicSchemaField("nanos", pipepb.AtomicType_INT32),
+			},
+		},
+		preReg: func(reg *schema.Registry) {
+			reg.RegisterLogicalType(schema.ToLogicalType(tnType.Name(), tnType, tnStorageType))
+		},
+	}}
+	for _, test := range tests {
+		t.Run(fmt.Sprintf("%v", test.rt), func(t *testing.T) {
+			reg := schema.NewRegistry()
+			if test.preReg != nil {
+				test.preReg(reg)
+			}
+			{
+				got, err := reg.FromType(test.rt)
+				if err != nil {
+					t.Fatalf("error FromType(%v) = %v", test.rt, err)
+				}
+				if d := cmp.Diff(test.st, got,
+					protocmp.Transform(),
+					protocmp.IgnoreFields(proto.Message(&pipepb.Schema{}), "id", "options"),
+				); d != "" {
+					t.Errorf("diff (-want, +got): %v", d)
+				}
+			}
+		})
+	}
+}
+
+func TestSchema_validate(t *testing.T) {
+	tests := []struct {
+		rt               reflect.Type
+		p                beam.SchemaProvider
+		logical, storage interface{}
+	}{
+		{
+			rt:      tnType,
+			p:       &TimestampNanosProvider{},
+			logical: TimestampNanos(time.Unix(2300003, 456789)),
+			storage: tnStorage{},
+		},
+	}
+	for _, test := range tests {
+		sc := &testutil.SchemaCoder{
+			CmpOptions: cmp.Options{
+				cmp.Comparer(func(a, b TimestampNanos) bool {
+					return a.Seconds() == b.Seconds() && a.Nanos() == b.Nanos()
+				})},
+		}
+		sc.Validate(t, test.rt, test.p.BuildEncoder, test.p.BuildDecoder, test.storage, test.logical)
+	}
+}
diff --git a/sdks/go/pkg/beam/schema.go b/sdks/go/pkg/beam/schema.go
index 95811bf..b25a3e2 100644
--- a/sdks/go/pkg/beam/schema.go
+++ b/sdks/go/pkg/beam/schema.go
@@ -16,6 +16,7 @@
 package beam
 
 import (
+	"fmt"
 	"io"
 	"reflect"
 
@@ -55,7 +56,24 @@ import (
 // is called in a package init() function.
 func RegisterSchemaProvider(rt reflect.Type, provider interface{}) {
 	p := provider.(SchemaProvider)
-	schema.RegisterLogicalTypeProvider(rt, p.FromLogicalType)
+	switch rt.Kind() {
+	case reflect.Interface:
+		schema.RegisterLogicalTypeProvider(rt, p.FromLogicalType)
+	case reflect.Ptr:
+		if rt.Elem().Kind() != reflect.Struct {
+			panic(fmt.Sprintf("beam.RegisterSchemaProvider: unsupported type kind for schema provider %v is a %v, must be interface, struct or *struct.", rt, rt.Kind()))
+		}
+		fallthrough
+	case reflect.Struct:
+		st, err := p.FromLogicalType(rt)
+		if err != nil {
+			panic(fmt.Sprintf("beam.RegisterSchemaProvider: schema type provider for %v, doesn't support that type", rt))
+		}
+		schema.RegisterLogicalType(schema.ToLogicalType(rt.Name(), rt, st))
+	default:
+		panic(fmt.Sprintf("beam.RegisterSchemaProvider: unsupported type kind for schema provider %v is a %v, must be interface, struct or *struct.", rt, rt.Kind()))
+	}
+
 	coder.RegisterSchemaProviders(rt, p.BuildEncoder, p.BuildDecoder)
 }
 
diff --git a/website/www/site/content/en/documentation/programming-guide.md b/website/www/site/content/en/documentation/programming-guide.md
index 5d31fe3..0c4d56f 100644
--- a/website/www/site/content/en/documentation/programming-guide.md
+++ b/website/www/site/content/en/documentation/programming-guide.md
@@ -1834,7 +1834,7 @@ a single global window and specify a trigger.
 
 ### 4.5. Additional outputs {#additional-outputs}
 
-{{< paragraph class="language-java language-python" >}}
+{{< paragraph class="language-java language-py" >}}
 While `ParDo` always produces a main output `PCollection` (as the return value
 from `apply`), you can also have your `ParDo` produce any number of additional
 output `PCollection`s. If you choose to have multiple outputs, your `ParDo`
@@ -2666,6 +2666,11 @@ infer the correct schema based on the members of the class.
 In Python you can use the following set of classes to represent the purchase schema. Beam will automatically infer the correct schema based on the members of the class.
 {{< /paragraph >}}
 
+{{< paragraph class="language-go" >}}
+In Go, schema encoding is used by default for struct types, with exported fields becoming part of the schema.
+Beam will automatically infer the schema based on the fields and field tags of the struct, and their order.
+{{< /paragraph >}}
+
 {{< highlight java >}}
 @DefaultSchema(JavaBeanSchema.class)
 public class Purchase {
@@ -2731,6 +2736,10 @@ class Transaction(typing.NamedTuple):
   purchase_amount: float
 {{< /highlight >}}
 
+{{< highlight go >}}
+{{< code_sample "sdks/go/examples/snippets/06schemas.go" schema_define >}}
+{{< /highlight >}}
+
 {{< paragraph class="language-java" >}}
 Using JavaBean classes as above is one way to map a schema to Java classes. However multiple Java classes might have
 the same schema, in which case the different Java types can often be used interchangeably. Beam will add implicit
@@ -2887,6 +2896,13 @@ The argument is represented by a schema type, so can itself be a complex type.
 In Java, a logical type is specified as a subclass of the `LogicalType` class. A custom Java class can be specified to represent the logical type and conversion functions must be supplied to convert back and forth between this Java class and the underlying Schema type representation. For example, the logical type representing nanosecond timestamp might be implemented as follows
 {{< /paragraph >}}
 
+
+{{< paragraph class="language-go" >}}
+In Go, a logical type is specified with a custom implementation of the `beam.SchemaProvider` interface.
+For example, the logical type provider representing nanosecond timestamps
+might be implemented as follows
+{{< /paragraph >}}
+
 {{< highlight java >}}
 // A Logical type using java.time.Instant to represent the logical type.
 public class TimestampNanos implements LogicalType<Instant, Row> {
@@ -2909,11 +2925,39 @@ public class TimestampNanos implements LogicalType<Instant, Row> {
 }
 {{< /highlight >}}
 
+{{< highlight go >}}
+// Define a logical provider like so:
+{{< code_sample "sdks/go/examples/snippets/06schemas.go" schema_logical_provider >}}
+
+// Register it like so:
+{{< code_sample "sdks/go/examples/snippets/06schemas.go" schema_logical_register >}}
+{{< /highlight >}}
+
 #### 6.4.2. Useful logical types {#built-in-logical-types}
 
+{{< paragraph class="language-py" >}}
+Currently the Python SDK provides minimal convenience logical types,
+other than to handle `MicrosInstant`.
+{{< /paragraph >}}
+
+{{< paragraph class="language-go" >}}
+Currently the Go SDK provides minimal convenience logical types,
+other than to handle additional integer primitives, and `time.Time`.
+{{< /paragraph >}}
+
 ##### **EnumerationType**
 
+{{< paragraph class="language-py" >}}
+This convenience builder doesn't yet exist for the Python SDK.
+{{< /paragraph >}}
+
+{{< paragraph class="language-go" >}}
+This convenience builder doesn't yet exist for the Go SDK.
+{{< /paragraph >}}
+
+{{< paragraph class="language-java" >}}
 This logical type allows creating an enumeration type consisting of a set of named constants.
+{{< /paragraph >}}
 
 {{< highlight java >}}
 Schema schema = Schema.builder()
@@ -2922,8 +2966,10 @@ Schema schema = Schema.builder()
      .build();
 {{< /highlight >}}
 
+{{< paragraph class="language-java" >}}
 The value of this field is stored in the row as an INT32 type, however the logical type defines a value type that lets
 you access the enumeration either as a string or a value. For example:
+{{< /paragraph >}}
 
 {{< highlight java >}}
 EnumerationType.Value enumValue = enumType.valueOf("RED");
@@ -2931,18 +2977,32 @@ enumValue.getValue();  // Returns 0, the integer value of the constant.
 enumValue.toString();  // Returns "RED", the string value of the constant
 {{< /highlight >}}
 
+{{< paragraph class="language-java" >}}
 Given a row object with an enumeration field, you can also extract the field as the enumeration value.
+{{< /paragraph >}}
 
 {{< highlight java >}}
 EnumerationType.Value enumValue = row.getLogicalTypeValue("color", EnumerationType.Value.class);
 {{< /highlight >}}
 
+{{< paragraph class="language-java" >}}
 Automatic schema inference from Java POJOs and JavaBeans automatically converts Java enums to EnumerationType logical
 types.
+{{< /paragraph >}}
 
 ##### **OneOfType**
 
+{{< paragraph class="language-py" >}}
+This convenience builder doesn't yet exist for the Python SDK.
+{{< /paragraph >}}
+
+{{< paragraph class="language-go" >}}
+This convenience builder doesn't yet exist for the Go SDK.
+{{< /paragraph >}}
+
+{{< paragraph class="language-java" >}}
 OneOfType allows creating a disjoint union type over a set of schema fields. For example:
+{{< /paragraph >}}
 
 {{< highlight java >}}
 Schema schema = Schema.builder()
@@ -2954,9 +3014,11 @@ Schema schema = Schema.builder()
       .build();
 {{< /highlight >}}
 
+{{< paragraph class="language-java" >}}
 The value of this field is stored in the row as another Row type, where all the fields are marked as nullable. The
 logical type however defines a Value object that contains an enumeration value indicating which field was set and allows
  getting just that field:
+{{< /paragraph >}}
 
 {{< highlight java >}}
 // Returns an enumeration indicating all possible case values for the enum.
@@ -2978,17 +3040,28 @@ switch (oneOfValue.getCaseEnumType().toString()) {
 }
 {{< /highlight >}}
 
+{{< paragraph class="language-java" >}}
 In the above example we used the field names in the switch statement for clarity, however the enum integer values could
  also be used.
+{{< /paragraph >}}
 
 ### 6.5. Creating Schemas {#creating-schemas}
 
-In order to take advantage of schemas, your `PCollection`s must have a schema attached to it. Often, the source itself will attach a schema to the PCollection. For example, when using `AvroIO` to read Avro files, the source can automatically infer a Beam schema from the Avro schema and attach that to the Beam `PCollection`. However not all sources produce schemas. In addition, often Beam pipelines have intermediate stages and types, and those also can benefit from the expressiveness of schemas.
+In order to take advantage of schemas, your `PCollection`s must have a schema attached to them.
+Often, the source itself will attach a schema to the PCollection.
+For example, when using `AvroIO` to read Avro files, the source can automatically infer a Beam schema from the Avro schema and attach that to the Beam `PCollection`.
+However not all sources produce schemas.
+In addition, often Beam pipelines have intermediate stages and types, and those also can benefit from the expressiveness of schemas.
 
 #### 6.5.1. Inferring schemas {#inferring-schemas}
 
+{{< language-switcher java py go >}}
+
 {{< paragraph class="language-java" >}}
-Beam is able to infer schemas from a variety of common Java types. The `@DefaultSchema` annotation can be used to tell Beam to infer schemas from a specific type. The annotation takes a `SchemaProvider` as an argument, and `SchemaProvider` classes are already built in for common Java types. The `SchemaRegistry` can also be invoked programmatically for cases where it is not practical to annotate the Java type itself.
+Beam is able to infer schemas from a variety of common Java types.
+The `@DefaultSchema` annotation can be used to tell Beam to infer schemas from a specific type.
+The annotation takes a `SchemaProvider` as an argument, and `SchemaProvider` classes are already built in for common Java types.
+The `SchemaRegistry` can also be invoked programmatically for cases where it is not practical to annotate the Java type itself.
 {{< /paragraph >}}
 
 {{< paragraph class="language-java" >}}
@@ -3191,6 +3264,53 @@ output_pc = input_pc | beam.Map(lambda item: beam.Row(bank=str(item["bank"]),
                                                       purchase_amount=float(item["purchase_amount"])))
 {{< /highlight >}}
 
+{{< paragraph class="language-go" >}}
+Beam currently only infers schemas for exported fields in Go structs.
+{{< /paragraph >}}
+
+{{< paragraph class="language-go" >}}
+**Structs**
+{{< /paragraph >}}
+
+{{< paragraph class="language-go" >}}
+Beam will automatically infer schemas for all Go structs used
+as PCollection elements, and default to encoding them using
+schema encoding.
+{{< /paragraph >}}
+
+{{< highlight go >}}
+type Transaction struct{
+  Bank string
+  PurchaseAmount float64
+
+  checksum []byte // ignored
+}
+{{< /highlight >}}
+
+{{< paragraph class="language-go" >}}
+Unexported fields are ignored, and cannot be automatically inferred as part of the schema.
+Fields of type func, channel, unsafe.Pointer, or uintptr will be ignored by inference.
+Fields of interface types are ignored, unless a schema provider
+is registered for them.
+{{< /paragraph >}}
+
+{{< paragraph class="language-go" >}}
+By default, schema field names will match the exported struct field names.
+In the above example, "Bank" and "PurchaseAmount" are the schema field names.
+A schema field name can be overridden with a struct tag for the field.
+{{< /paragraph >}}
+
+{{< highlight go >}}
+type Transaction struct{
+  Bank           string  `beam:"bank"`
+  PurchaseAmount float64 `beam:"purchase_amount"`
+}
+{{< /highlight >}}
+
+{{< paragraph class="language-go" >}}
+Overriding schema field names is useful for compatibility with cross-language transforms,
+as schema fields may have different requirements or restrictions from Go exported fields.
+{{< /paragraph >}}
 
 ### 6.6. Using Schema Transforms {#using-schemas}
 
@@ -3198,6 +3318,10 @@ A schema on a `PCollection` enables a rich variety of relational transforms. The
 named fields allows for simple and readable aggregations that reference fields by name, similar to the aggregations in
 a SQL expression.
 
+{{< paragraph class="language-go" >}}
+Beam does not yet support Schema transforms natively in Go. However, they will be implemented with the following behavior.
+{{< /paragraph >}}
+
 #### 6.6.1. Field selection syntax
 
 The advantage of schemas is that they allow referencing of element fields by name. Beam provides a selection syntax for
@@ -3723,6 +3847,10 @@ A `PCollection` with a schema can apply a `ParDo`, just like any other `PCollect
 
 ##### **Input conversion**
 
+{{< paragraph class="language-go" >}}
+Beam does not yet support input conversion in Go.
+{{< /paragraph >}}
+
 Since Beam knows the schema of the source `PCollection`, it can automatically convert the elements to any Java type for
 which a matching schema is known. For example, using the above-mentioned Transaction schema, say we have the following
 `PCollection`:
@@ -3789,6 +3917,8 @@ automatically convert to any matching schema type, just like when reading the en
 
 ## 7. Data encoding and type safety {#data-encoding-and-type-safety}
 
+{{< language-switcher java py go >}}
+
 When Beam runners execute your pipeline, they often need to materialize the
 intermediate data in your `PCollection`s, which requires converting elements to
 and from byte strings. The Beam SDKs use objects called `Coder`s to describe how
@@ -3817,6 +3947,15 @@ Coder subclasses in the
 package.
 {{< /paragraph >}}
 
+{{< paragraph class="language-go" >}}
+Standard Go types like `int`, `int64`, `float64`, `[]byte`, `string`, and more are coded using builtin coders.
+Structs and pointers to structs default to using Beam Schema Row encoding.
+However, users can build and register custom coders with `beam.RegisterCoder`.
+You can find available Coder functions in the
+[coder](https://pkg.go.dev/github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder)
+package.
+{{< /paragraph >}}
+
 > Note that coders do not necessarily have a 1:1 relationship with types. For
 > example, the Integer type can have multiple valid coders, and input and output
 > data can use different Integer coders. A transform might have Integer-typed
@@ -3858,6 +3997,11 @@ Python types to the default coder that should be used for `PCollection`s of each
 type.
 {{< /paragraph >}}
 
+{{< paragraph class="language-go" >}}
+The Beam SDK for Go allows users to register default coder
+implementations with `beam.RegisterCoder`.
+{{< /paragraph >}}
+
 {{< paragraph class="language-java" >}}
 By default, the Beam SDK for Java automatically infers the `Coder` for the
 elements of a `PCollection` produced by a `PTransform` using the type parameter
@@ -3880,12 +4024,24 @@ Python will automatically infer the default `Coder` for the output `PCollection`
 (in the default pipeline `CoderRegistry`, this is `BytesCoder`).
 {{< /paragraph >}}
 
-> NOTE: If you create your `PCollection` from in-memory data by using the
+{{< paragraph class="language-go" >}}
+By default, the Beam SDK for Go automatically infers the `Coder` for the elements of an output `PCollection` from the output type of the transform's function object, such as a `DoFn`.
+In the case of `ParDo`, for example, a `DoFn`
+with the parameters of `v int, emit func(string)` accepts an input element of type `int`
+and produces an output element of type `string`.
+In such a case, the Beam SDK for Go will automatically infer the default `Coder` for the output `PCollection` to be the `string_utf8` coder.
+{{< /paragraph >}}
+
+<span class="language-java">
+
+> **Note:** If you create your `PCollection` from in-memory data by using the
 > `Create` transform, you cannot rely on coder inference and default coders.
 > `Create` does not have access to any typing information for its arguments, and
 > may not be able to infer a coder if the argument list contains a value whose
 > exact run-time class doesn't have a default coder registered.
 
+</span>
+
 {{< paragraph class="language-java" >}}
 When using `Create`, the simplest way to ensure that you have the correct coder
 is by invoking `withCoder` when you apply the `Create` transform.
@@ -4019,8 +4175,13 @@ for a Python type. You can use `coders.registry` to access the `CoderRegistry`.
 This allows you to determine (or set) the default Coder for a Python type.
 {{< /paragraph >}}
 
+{{< paragraph class="language-go" >}}
+You can use the `beam.NewCoder` function to determine the default Coder for a Go type.
+{{< /paragraph >}}
+
 #### 7.2.2. Setting the default coder for a type {#setting-default-coder}
 
+{{< paragraph class="language-java language-py" >}}
 To set the default Coder for a
 <span class="language-java">Java</span><span class="language-py">Python</span>
 type for a particular pipeline, you obtain and modify the pipeline's
@@ -4031,11 +4192,23 @@ to get the `CoderRegistry` object, and then use the method
 <span class="language-java">`CoderRegistry.registerCoder`</span>
 <span class="language-py">`CoderRegistry.register_coder`</span>
 to register a new `Coder` for the target type.
+{{< /paragraph >}}
+
+{{< paragraph class="language-go" >}}
+To set the default Coder for a Go type, use the function `beam.RegisterCoder` to register encoder and decoder functions for the target type.
+However, built-in types like `int`, `string`, `float64`, etc. cannot have their coders overridden.
+{{< /paragraph >}}
 
+{{< paragraph class="language-java language-py" >}}
 The following example code demonstrates how to set a default Coder, in this case
 `BigEndianIntegerCoder`, for
 <span class="language-java">Integer</span><span class="language-py">int</span>
 values for a pipeline.
+{{< /paragraph >}}
+
+{{< paragraph class="language-go" >}}
+The following example code demonstrates how to set a custom Coder for `MyCustomType` elements.
+{{< /paragraph >}}
 
 {{< highlight java >}}
 PipelineOptions options = PipelineOptionsFactory.create();
@@ -4049,9 +4222,26 @@ cr.registerCoder(Integer.class, BigEndianIntegerCoder.class);
 apache_beam.coders.registry.register_coder(int, BigEndianIntegerCoder)
 {{< /highlight >}}
 
+{{< highlight go >}}
+type MyCustomType struct{
+  ...
+}
+
+// See documentation on beam.RegisterCoder for other supported coder forms.
+
+func encode(MyCustomType) []byte { ... }
+
+func decode(b []byte) MyCustomType { ... }
+
+func init() {
+  beam.RegisterCoder(reflect.TypeOf((*MyCustomType)(nil)).Elem(), encode, decode)
+}
+{{< /highlight >}}
+
 #### 7.2.3. Annotating a custom data type with a default coder {#annotating-custom-type-default-coder}
 
-{{< paragraph class="language-java" >}}
+<span class="language-java">
+
 If your pipeline program defines a custom data type, you can use the
 `@DefaultCoder` annotation to specify the coder to use with that type.
 By default, Beam will use `SerializableCoder` which uses Java serialization,
@@ -4064,10 +4254,11 @@ but it has drawbacks:
 
    For key/value pairs, the correctness of key-based operations
    (GroupByKey, Combine) and per-key State depends on having a deterministic
-   coder for the key.
+   coder for the key
 
 You can use the `@DefaultCoder` annotation to set a new default as follows:
-{{< /paragraph >}}
+
+</span>
 
 {{< highlight java >}}
 @DefaultCoder(AvroCoder.class)
@@ -4094,9 +4285,10 @@ public class MyCustomDataType {
 }
 {{< /highlight >}}
 
-{{< paragraph class="language-py" >}}
-The Beam SDK for Python does not support annotating data types with a default
-coder. If you would like to set a default coder, use the method described in the
+{{< paragraph class="language-py language-go" >}}
+The Beam SDK for <span class="language-py">Python</span><span class="language-go">Go</span>
+does not support annotating data types with a default coder.
+If you would like to set a default coder, use the method described in the
 previous section, *Setting the default coder for a type*.
 {{< /paragraph >}}