You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2022/08/18 15:07:39 UTC

[beam] branch master updated: Go stateful DoFns user side changes (#22761)

This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 75eb0b1431c Go stateful DoFns user side changes (#22761)
75eb0b1431c is described below

commit 75eb0b1431c84c98f2e16a9f535b0e11b0160d43
Author: Danny McCormick <da...@google.com>
AuthorDate: Thu Aug 18 08:07:30 2022 -0700

    Go stateful DoFns user side changes (#22761)
    
    * Go stateful DoFns user side changes
    
    * Fix static check violation
    
    * Small cleanup
    
    * Doc comments
---
 sdks/go/pkg/beam/core/funcx/fn.go                 |  45 +++-
 sdks/go/pkg/beam/core/funcx/fn_test.go            |  16 ++
 sdks/go/pkg/beam/core/graph/edge.go               |  17 +-
 sdks/go/pkg/beam/core/graph/fn.go                 |  75 +++++-
 sdks/go/pkg/beam/core/graph/fn_test.go            |  89 +++++--
 sdks/go/pkg/beam/core/runtime/exec/fn.go          |  32 ++-
 sdks/go/pkg/beam/core/runtime/exec/translate.go   |  16 ++
 sdks/go/pkg/beam/core/runtime/graphx/serialize.go |   5 +
 sdks/go/pkg/beam/core/runtime/graphx/translate.go |  24 ++
 sdks/go/pkg/beam/core/runtime/graphx/v1/v1.pb.go  | 300 +++++++++++-----------
 sdks/go/pkg/beam/core/runtime/graphx/v1/v1.proto  |   2 +
 sdks/go/pkg/beam/core/state/state.go              | 120 +++++++++
 sdks/go/pkg/beam/core/state/state_test.go         | 140 ++++++++++
 sdks/go/pkg/beam/pardo.go                         |  13 +
 14 files changed, 715 insertions(+), 179 deletions(-)

diff --git a/sdks/go/pkg/beam/core/funcx/fn.go b/sdks/go/pkg/beam/core/funcx/fn.go
index 7d39aa1ce88..788d6d52df6 100644
--- a/sdks/go/pkg/beam/core/funcx/fn.go
+++ b/sdks/go/pkg/beam/core/funcx/fn.go
@@ -20,6 +20,7 @@ import (
 	"reflect"
 
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/state"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
@@ -82,6 +83,8 @@ const (
 	FnBundleFinalization FnParamKind = 0x800
 	// FnWatermarkEstimator indicates a function input parameter that implements sdf.WatermarkEstimator
 	FnWatermarkEstimator FnParamKind = 0x1000
+	// FnStateProvider indicates a function input parameter that implements state.Provider
+	FnStateProvider FnParamKind = 0x2000
 )
 
 func (k FnParamKind) String() string {
@@ -112,6 +115,8 @@ func (k FnParamKind) String() string {
 		return "BundleFinalization"
 	case FnWatermarkEstimator:
 		return "WatermarkEstimator"
+	case FnStateProvider:
+		return "StateProvider"
 	default:
 		return fmt.Sprintf("%v", int(k))
 	}
@@ -289,6 +294,17 @@ func (u *Fn) BundleFinalization() (pos int, exists bool) {
 	return -1, false
 }
 
+// StateProvider returns (index, true) iff the function expects a
+// parameter that implements state.Provider.
+func (u *Fn) StateProvider() (pos int, exists bool) {
+	for i, p := range u.Param {
+		if p.Kind == FnStateProvider {
+			return i, true
+		}
+	}
+	return -1, false
+}
+
 // WatermarkEstimator returns (index, true) iff the function expects a
 // parameter that implements sdf.WatermarkEstimator.
 func (u *Fn) WatermarkEstimator() (pos int, exists bool) {
@@ -374,6 +390,8 @@ func New(fn reflectx.Func) (*Fn, error) {
 			kind = FnWindow
 		case t == typex.BundleFinalizationType:
 			kind = FnBundleFinalization
+		case t == state.ProviderType:
+			kind = FnStateProvider
 		case t == reflectx.Type:
 			kind = FnType
 		case t.Implements(reflect.TypeOf((*sdf.RTracker)(nil)).Elem()):
@@ -464,7 +482,7 @@ func SubReturns(list []ReturnParam, indices ...int) []ReturnParam {
 }
 
 // The order of present parameters and return values must be as follows:
-// func(FnContext?, FnPane?, FnWindow?, FnEventTime?, FnWatermarkEstimator?, FnType?, FnBundleFinalization?, FnRTracker?, (FnValue, SideInput*)?, FnEmit*) (RetEventTime?, RetOutput?, RetError?)
+// func(FnContext?, FnPane?, FnWindow?, FnEventTime?, FnWatermarkEstimator?, FnType?, FnBundleFinalization?, FnRTracker?, FnStateProvider?, (FnValue, SideInput*)?, FnEmit*) (RetEventTime?, RetOutput?, RetError?)
 //     where ? indicates 0 or 1, and * indicates any number.
 //     and  a SideInput is one of FnValue or FnIter or FnReIter
 // Note: Fns with inputs must have at least one FnValue as the main input.
@@ -496,6 +514,7 @@ var (
 	errReflectTypePrecedence             = errors.New("may only have a single reflect.Type parameter and it must precede the main input parameter")
 	errRTrackerPrecedence                = errors.New("may only have a single sdf.RTracker parameter and it must precede the main input parameter")
 	errBundleFinalizationPrecedence      = errors.New("may only have a single BundleFinalization parameter and it must precede the main input parameter")
+	errStateProviderPrecedence           = errors.New("may only have a single state.Provider parameter and it must precede the main input parameter")
 	errInputPrecedence                   = errors.New("inputs parameters must precede emit function parameters")
 )
 
@@ -513,6 +532,7 @@ const (
 	psOutput
 	psRTracker
 	psBundleFinalization
+	psStateProvider
 )
 
 func nextParamState(cur paramState, transition FnParamKind) (paramState, error) {
@@ -535,6 +555,8 @@ func nextParamState(cur paramState, transition FnParamKind) (paramState, error)
 			return psBundleFinalization, nil
 		case FnRTracker:
 			return psRTracker, nil
+		case FnStateProvider:
+			return psStateProvider, nil
 		}
 	case psContext:
 		switch transition {
@@ -552,6 +574,8 @@ func nextParamState(cur paramState, transition FnParamKind) (paramState, error)
 			return psBundleFinalization, nil
 		case FnRTracker:
 			return psRTracker, nil
+		case FnStateProvider:
+			return psStateProvider, nil
 		}
 	case psPane:
 		switch transition {
@@ -567,6 +591,8 @@ func nextParamState(cur paramState, transition FnParamKind) (paramState, error)
 			return psBundleFinalization, nil
 		case FnRTracker:
 			return psRTracker, nil
+		case FnStateProvider:
+			return psStateProvider, nil
 		}
 	case psWindow:
 		switch transition {
@@ -580,6 +606,8 @@ func nextParamState(cur paramState, transition FnParamKind) (paramState, error)
 			return psBundleFinalization, nil
 		case FnRTracker:
 			return psRTracker, nil
+		case FnStateProvider:
+			return psStateProvider, nil
 		}
 	case psEventTime:
 		switch transition {
@@ -591,6 +619,8 @@ func nextParamState(cur paramState, transition FnParamKind) (paramState, error)
 			return psBundleFinalization, nil
 		case FnRTracker:
 			return psRTracker, nil
+		case FnStateProvider:
+			return psStateProvider, nil
 		}
 	case psWatermarkEstimator:
 		switch transition {
@@ -600,6 +630,8 @@ func nextParamState(cur paramState, transition FnParamKind) (paramState, error)
 			return psBundleFinalization, nil
 		case FnRTracker:
 			return psRTracker, nil
+		case FnStateProvider:
+			return psStateProvider, nil
 		}
 	case psType:
 		switch transition {
@@ -607,13 +639,22 @@ func nextParamState(cur paramState, transition FnParamKind) (paramState, error)
 			return psBundleFinalization, nil
 		case FnRTracker:
 			return psRTracker, nil
+		case FnStateProvider:
+			return psStateProvider, nil
 		}
 	case psBundleFinalization:
 		switch transition {
 		case FnRTracker:
 			return psRTracker, nil
+		case FnStateProvider:
+			return psStateProvider, nil
 		}
 	case psRTracker:
+		switch transition {
+		case FnStateProvider:
+			return psStateProvider, nil
+		}
+	case psStateProvider:
 		// Completely handled by the default clause
 	case psInput:
 		switch transition {
@@ -644,6 +685,8 @@ func nextParamState(cur paramState, transition FnParamKind) (paramState, error)
 		return -1, errBundleFinalizationPrecedence
 	case FnRTracker:
 		return -1, errRTrackerPrecedence
+	case FnStateProvider:
+		return -1, errStateProviderPrecedence
 	case FnIter, FnReIter, FnValue, FnMultiMap:
 		return psInput, nil
 	case FnEmit:
diff --git a/sdks/go/pkg/beam/core/funcx/fn_test.go b/sdks/go/pkg/beam/core/funcx/fn_test.go
index 202de64b19d..9eee359c1e5 100644
--- a/sdks/go/pkg/beam/core/funcx/fn_test.go
+++ b/sdks/go/pkg/beam/core/funcx/fn_test.go
@@ -26,6 +26,7 @@ import (
 
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/state"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
 )
@@ -108,6 +109,11 @@ func TestNew(t *testing.T) {
 			Fn:    func(typex.PaneInfo, typex.Window, typex.EventTime, sdf.WatermarkEstimator, reflect.Type, []byte) {},
 			Param: []FnParamKind{FnPane, FnWindow, FnEventTime, FnWatermarkEstimator, FnType, FnValue},
 		},
+		{
+			Name:  "good10",
+			Fn:    func(sdf.RTracker, state.Provider, []byte) {},
+			Param: []FnParamKind{FnRTracker, FnStateProvider, FnValue},
+		},
 		{
 			Name:  "good-method",
 			Fn:    foo{1}.Do,
@@ -211,6 +217,16 @@ func TestNew(t *testing.T) {
 			Fn:   func(int, func(int), func() func(*int) bool) {},
 			Err:  errInputPrecedence,
 		},
+		{
+			Name: "errInputPrecedence- StateProvider before RTracker",
+			Fn:   func(state.Provider, sdf.RTracker, int) {},
+			Err:  errRTrackerPrecedence,
+		},
+		{
+			Name: "errInputPrecedence- StateProvider after output",
+			Fn:   func(int, state.Provider) {},
+			Err:  errStateProviderPrecedence,
+		},
 		{
 			Name: "errInputPrecedence- input after output",
 			Fn:   func(int, func(int), int) {},
diff --git a/sdks/go/pkg/beam/core/graph/edge.go b/sdks/go/pkg/beam/core/graph/edge.go
index 5e18e7559ec..a9f1c8a092b 100644
--- a/sdks/go/pkg/beam/core/graph/edge.go
+++ b/sdks/go/pkg/beam/core/graph/edge.go
@@ -153,14 +153,15 @@ type MultiEdge struct {
 	parent *Scope
 
 	Op               Opcode
-	DoFn             *DoFn              // ParDo
-	RestrictionCoder *coder.Coder       // SplittableParDo
-	CombineFn        *CombineFn         // Combine
-	AccumCoder       *coder.Coder       // Combine
-	Value            []byte             // Impulse
-	External         *ExternalTransform // Current External Transforms API
-	Payload          *Payload           // Legacy External Transforms API
-	WindowFn         *window.Fn         // WindowInto
+	DoFn             *DoFn                   // ParDo
+	RestrictionCoder *coder.Coder            // SplittableParDo
+	StateCoders      map[string]*coder.Coder // Stateful ParDo
+	CombineFn        *CombineFn              // Combine
+	AccumCoder       *coder.Coder            // Combine
+	Value            []byte                  // Impulse
+	External         *ExternalTransform      // Current External Transforms API
+	Payload          *Payload                // Legacy External Transforms API
+	WindowFn         *window.Fn              // WindowInto
 
 	Input  []*Inbound
 	Output []*Outbound
diff --git a/sdks/go/pkg/beam/core/graph/fn.go b/sdks/go/pkg/beam/core/graph/fn.go
index 6b3656c1c25..2034349cdf2 100644
--- a/sdks/go/pkg/beam/core/graph/fn.go
+++ b/sdks/go/pkg/beam/core/graph/fn.go
@@ -21,6 +21,7 @@ import (
 
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/funcx"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/state"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
@@ -282,6 +283,27 @@ func (f *DoFn) IsSplittable() bool {
 	return ok
 }
 
+// PipelineState returns a list of PipelineState objects used to access/mutate global pipeline state (if any).
+func (f *DoFn) PipelineState() []state.PipelineState {
+	var s []state.PipelineState
+	if f.Recv == nil {
+		return s
+	}
+
+	v := reflect.Indirect(reflect.ValueOf(f.Recv))
+
+	for i := 0; i < v.NumField(); i++ {
+		f := v.Field(i)
+		if f.CanInterface() {
+			if ps, ok := f.Interface().(state.PipelineState); ok {
+				s = append(s, ps)
+			}
+		}
+	}
+
+	return s
+}
+
 // SplittableDoFn represents a DoFn implementing SDF methods.
 type SplittableDoFn DoFn
 
@@ -561,7 +583,14 @@ func AsDoFn(fn *Fn, numMainIn mainInputs) (*DoFn, error) {
 		}
 	}
 
-	return (*DoFn)(fn), nil
+	doFn := (*DoFn)(fn)
+
+	err = validateState(doFn, numMainIn)
+	if err != nil {
+		return nil, addContext(err, fn)
+	}
+
+	return doFn, nil
 }
 
 // validateMainInputs checks that a method has the given number of main inputs
@@ -1221,6 +1250,50 @@ func validateStatefulWatermarkSig(fn *Fn, numMainIn int) error {
 	return nil
 }
 
+func validateState(fn *DoFn, numIn mainInputs) error {
+	ps := fn.PipelineState()
+
+	if _, hasSp := fn.methods[processElementName].StateProvider(); hasSp {
+		if numIn == MainSingle {
+			err := errors.Errorf("ProcessElement uses a StateProvider, but is not keyed")
+			return errors.SetTopLevelMsgf(err, "ProcessElement uses a StateProvider, but is not keyed. "+
+				"All stateful DoFns must take a key/value pair as an input.")
+		}
+		if len(ps) == 0 {
+			err := errors.Errorf("ProcessElement uses a StateProvider, but noState structs are attached to the DoFn")
+			return errors.SetTopLevelMsgf(err, "ProcessElement uses a StateProvider, but no State structs are "+
+				"attached to the DoFn. Ensure that you are including the State structs you're using to read/write"+
+				"global state as public uppercase member variables.")
+		}
+		stateKeys := make(map[string]state.PipelineState)
+		for _, s := range ps {
+			k := s.StateKey()
+			if orig, ok := stateKeys[k]; ok {
+				err := errors.Errorf("Duplicate state key %v", k)
+				return errors.SetTopLevelMsgf(err, "Duplicate state key %v used by %v and %v. Ensure that state keys are"+
+					"unique per DoFn", k, orig, s)
+			} else {
+				stateKeys[k] = s
+			}
+		}
+
+		// TODO(#22736) - Remove this once state is fully supported
+		err := errors.Errorf("ProcessElement uses a StateProvider, but state is not supported in this release.")
+		return errors.SetTopLevelMsgf(err, "ProcessElement uses a StateProvider, but state is not supported in this release. "+
+			"Please try upgrading to a newer release if one exists or wait for state support to be released.")
+	} else {
+		if len(ps) > 0 {
+			err := errors.Errorf("ProcessElement doesn't use a StateProvider, but State structs are attached to "+
+				"the DoFn: %v", ps)
+			return errors.SetTopLevelMsgf(err, "ProcessElement doesn't use a StateProvider, but State structs are "+
+				"attached to the DoFn: %v\nEnsure that you are using the StateProvider to perform any reads or writes"+
+				"of pipeline state.", ps)
+		}
+	}
+
+	return nil
+}
+
 // CombineFn represents a CombineFn.
 type CombineFn Fn
 
diff --git a/sdks/go/pkg/beam/core/graph/fn_test.go b/sdks/go/pkg/beam/core/graph/fn_test.go
index a1702175f64..2117c735dce 100644
--- a/sdks/go/pkg/beam/core/graph/fn_test.go
+++ b/sdks/go/pkg/beam/core/graph/fn_test.go
@@ -25,6 +25,7 @@ import (
 	"time"
 
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/state"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
 )
@@ -52,6 +53,8 @@ func TestNewDoFn(t *testing.T) {
 			{dfn: &GoodDoFnCoGbk2{}, opt: CoGBKMainInput(3)},
 			{dfn: &GoodDoFnCoGbk7{}, opt: CoGBKMainInput(8)},
 			{dfn: &GoodDoFnCoGbk1wSide{}, opt: NumMainInputs(MainKv)},
+			// TODO(#22736) - Enable this once stateful dofns are fully supported
+			// {dfn: &GoodStatefulDoFn{State1: state.Value[int](state.MakeValueState[int]("state1"))}, opt: NumMainInputs(MainKv)},
 		}
 
 		for _, test := range tests {
@@ -70,7 +73,8 @@ func TestNewDoFn(t *testing.T) {
 	})
 	t.Run("invalid", func(t *testing.T) {
 		tests := []struct {
-			dfn interface{}
+			dfn       interface{}
+			numInputs int
 		}{
 			// Validate main inputs.
 			{dfn: func() int { return 0 }}, // No inputs.
@@ -94,26 +98,44 @@ func TestNewDoFn(t *testing.T) {
 			{dfn: &BadDoFnReturnValuesInFinishBundle{}},
 			{dfn: &BadDoFnReturnValuesInSetup{}},
 			{dfn: &BadDoFnReturnValuesInTeardown{}},
+			{dfn: &BadStatefulDoFnNoStateProvider{State1: state.Value[int](state.MakeValueState[int]("state1"))}},
+			{dfn: &BadStatefulDoFnNoStateFields{}},
+			{dfn: &BadStatefulDoFnNoKV{State1: state.Value[int](state.MakeValueState[int]("state1"))}, numInputs: 1},
 		}
 		for _, test := range tests {
 			t.Run(reflect.TypeOf(test.dfn).String(), func(t *testing.T) {
-				if cfn, err := NewDoFn(test.dfn); err != nil {
-					t.Logf("NewDoFn failed as expected:\n%v", err)
-				} else {
-					t.Errorf("NewDoFn(%v) = %v, want failure", cfn.Name(), cfn)
-				}
-				// If validation fails with unknown main inputs, then it should
-				// always fail for any known number of main inputs, so test them
-				// all. Error messages won't necessarily match.
-				if cfn, err := NewDoFn(test.dfn, NumMainInputs(MainSingle)); err != nil {
-					t.Logf("NewDoFn failed as expected:\n%v", err)
-				} else {
-					t.Errorf("NewDoFn(%v, NumMainInputs(MainSingle)) = %v, want failure", cfn.Name(), cfn)
-				}
-				if cfn, err := NewDoFn(test.dfn, NumMainInputs(MainKv)); err != nil {
-					t.Logf("NewDoFn failed as expected:\n%v", err)
-				} else {
-					t.Errorf("NewDoFn(%v, NumMainInputs(MainKv)) = %v, want failure", cfn.Name(), cfn)
+				switch test.numInputs {
+				case 1:
+					if cfn, err := NewDoFn(test.dfn, NumMainInputs(MainSingle)); err != nil {
+						t.Logf("NewDoFn failed as expected:\n%v", err)
+					} else {
+						t.Errorf("NewDoFn(%v, NumMainInputs(MainSingle)) = %v, want failure", cfn.Name(), cfn)
+					}
+				case 2:
+					if cfn, err := NewDoFn(test.dfn, NumMainInputs(MainKv)); err != nil {
+						t.Logf("NewDoFn failed as expected:\n%v", err)
+					} else {
+						t.Errorf("NewDoFn(%v, NumMainInputs(MainSingle)) = %v, want failure", cfn.Name(), cfn)
+					}
+				default:
+					if cfn, err := NewDoFn(test.dfn); err != nil {
+						t.Logf("NewDoFn failed as expected:\n%v", err)
+					} else {
+						t.Errorf("NewDoFn(%v) = %v, want failure", cfn.Name(), cfn)
+					}
+					// If validation fails with unknown main inputs, then it should
+					// always fail for any known number of main inputs, so test them
+					// all. Error messages won't necessarily match.
+					if cfn, err := NewDoFn(test.dfn, NumMainInputs(MainSingle)); err != nil {
+						t.Logf("NewDoFn failed as expected:\n%v", err)
+					} else {
+						t.Errorf("NewDoFn(%v, NumMainInputs(MainSingle)) = %v, want failure", cfn.Name(), cfn)
+					}
+					if cfn, err := NewDoFn(test.dfn, NumMainInputs(MainKv)); err != nil {
+						t.Logf("NewDoFn failed as expected:\n%v", err)
+					} else {
+						t.Errorf("NewDoFn(%v, NumMainInputs(MainKv)) = %v, want failure", cfn.Name(), cfn)
+					}
 				}
 			})
 		}
@@ -1058,6 +1080,14 @@ func (fn *GoodStatefulWatermarkEstimatingKv) WatermarkEstimatorState(estimator *
 	return 0
 }
 
+type GoodStatefulDoFn struct {
+	State1 state.Value[int]
+}
+
+func (fn *GoodStatefulDoFn) ProcessElement(state.Provider, int, int) int {
+	return 0
+}
+
 // Examples of incorrect SDF signatures.
 // Examples with missing methods.
 
@@ -1422,6 +1452,29 @@ func (fn *BadManualWatermarkEstimatorMismatched) CreateWatermarkEstimator() *Wat
 	return &WatermarkEstimatorT{}
 }
 
+type BadStatefulDoFnNoStateProvider struct {
+	State1 state.Value[int]
+}
+
+func (fn *BadStatefulDoFnNoStateProvider) ProcessElement(int, int) int {
+	return 0
+}
+
+type BadStatefulDoFnNoStateFields struct {
+}
+
+func (fn *BadStatefulDoFnNoStateFields) ProcessElement(state.Provider, int) int {
+	return 0
+}
+
+type BadStatefulDoFnNoKV struct {
+	State1 state.Value[int]
+}
+
+func (fn *BadStatefulDoFnNoKV) ProcessElement(state.Provider, int, int) int {
+	return 0
+}
+
 // Examples of correct CombineFn signatures
 
 type MyAccum struct{}
diff --git a/sdks/go/pkg/beam/core/runtime/exec/fn.go b/sdks/go/pkg/beam/core/runtime/exec/fn.go
index 48387f78c34..51f3296ca44 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/fn.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/fn.go
@@ -26,6 +26,7 @@ import (
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/state"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
 )
@@ -68,6 +69,21 @@ func (bf *bundleFinalizer) RegisterCallback(t time.Duration, cb func() error) {
 	}
 }
 
+type stateProvider struct {
+}
+
+// ReadValueState reads a value state from the State API
+func (s *stateProvider) ReadValueState(userStateId string) (interface{}, []state.Transaction, error) {
+	// TODO(#22736) - read from the state api.
+	return nil, nil, errors.New("Stateful DoFns are not supported yet.")
+}
+
+// WriteValueState writes a value state to the State API
+func (s *stateProvider) WriteValueState(val state.Transaction) error {
+	// TODO(#22736) - write to the state api.
+	return errors.New("Stateful DoFns are not supported yet.")
+}
+
 // Invoke invokes the fn with the given values. The extra values must match the non-main
 // side input and emitters. It returns the direct output, if any.
 func Invoke(ctx context.Context, pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime, fn *funcx.Fn, opt *MainInput, bf *bundleFinalizer, we sdf.WatermarkEstimator, extra ...interface{}) (*FullValue, error) {
@@ -92,10 +108,11 @@ func InvokeWithoutEventTime(ctx context.Context, fn *funcx.Fn, opt *MainInput, b
 type invoker struct {
 	fn   *funcx.Fn
 	args []interface{}
+	sp   *stateProvider
 	// TODO(lostluck):  2018/07/06 consider replacing with a slice of functions to run over the args slice, as an improvement.
-	ctxIdx, pnIdx, wndIdx, etIdx, bfIdx, weIdx int   // specialized input indexes
-	outEtIdx, outPcIdx, outErrIdx              int   // specialized output indexes
-	in, out                                    []int // general indexes
+	ctxIdx, pnIdx, wndIdx, etIdx, bfIdx, weIdx, spIdx int   // specialized input indexes
+	outEtIdx, outPcIdx, outErrIdx                     int   // specialized output indexes
+	in, out                                           []int // general indexes
 
 	ret                     FullValue                     // ret is a cached allocation for passing to the next Unit. Units never modify the passed in FullValue.
 	elmConvert, elm2Convert func(interface{}) interface{} // Cached conversion functions, which assums this invoker is always used with the same parameter types.
@@ -125,6 +142,9 @@ func newInvoker(fn *funcx.Fn) *invoker {
 	if n.weIdx, ok = fn.WatermarkEstimator(); !ok {
 		n.weIdx = -1
 	}
+	if n.spIdx, ok = fn.StateProvider(); !ok {
+		n.spIdx = -1
+	}
 	if n.outEtIdx, ok = fn.OutEventTime(); !ok {
 		n.outEtIdx = -1
 	}
@@ -188,6 +208,12 @@ func (n *invoker) Invoke(ctx context.Context, pn typex.PaneInfo, ws []typex.Wind
 		args[n.weIdx] = we
 	}
 
+	if n.spIdx >= 0 {
+		// TODO(#22736) - provide this with the variable access it needs to talk to the state api.
+		n.sp = &stateProvider{}
+		args[n.spIdx] = n.sp
+	}
+
 	// (2) Main input from value, if any.
 	i := 0
 	if opt != nil {
diff --git a/sdks/go/pkg/beam/core/runtime/exec/translate.go b/sdks/go/pkg/beam/core/runtime/exec/translate.go
index e5b9976cb34..53ddf7843c2 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/translate.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/translate.go
@@ -404,6 +404,7 @@ func (b *builder) makeLink(from string, id linkID) (Node, error) {
 		urnTruncateSizedRestrictions:
 		var data string
 		var sides map[string]*pipepb.SideInput
+		var userState map[string]*pipepb.StateSpec
 		switch urn {
 		case graphx.URNParDo,
 			urnPairWithRestriction,
@@ -416,6 +417,7 @@ func (b *builder) makeLink(from string, id linkID) (Node, error) {
 			}
 			data = string(pardo.GetDoFn().GetPayload())
 			sides = pardo.GetSideInputs()
+			userState = pardo.GetStateSpecs()
 		case urnPerKeyCombinePre, urnPerKeyCombineMerge, urnPerKeyCombineExtract, urnPerKeyCombineConvert:
 			var cmb pipepb.CombinePayload
 			if err := proto.Unmarshal(payload, &cmb); err != nil {
@@ -462,6 +464,20 @@ func (b *builder) makeLink(from string, id linkID) (Node, error) {
 					n.PID = transform.GetUniqueName()
 
 					input := unmarshalKeyedValues(transform.GetInputs())
+
+					if len(userState) > 0 {
+						stateIdToCoder := make(map[string]*coder.Coder)
+						for key, spec := range userState {
+							// TODO(#22736) - this will eventually need to be aware of which type of state it's modifying to support non-Value state types.
+							cID := spec.GetReadModifyWriteSpec().CoderId
+							c, err := b.coders.Coder(cID)
+							if err != nil {
+								return nil, err
+							}
+							stateIdToCoder[key] = c
+						}
+					}
+
 					for i := 1; i < len(input); i++ {
 						// TODO(https://github.com/apache/beam/issues/18602) Handle ViewFns for side inputs
 
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/serialize.go b/sdks/go/pkg/beam/core/runtime/graphx/serialize.go
index 75ee58484fd..4fe1a9e1616 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/serialize.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/serialize.go
@@ -27,6 +27,7 @@ import (
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime"
 	v1pb "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx/v1"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/state"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/jsonx"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
@@ -517,6 +518,8 @@ func tryEncodeSpecial(t reflect.Type) (v1pb.Type_Special, bool) {
 		return v1pb.Type_WINDOW, true
 	case typex.BundleFinalizationType:
 		return v1pb.Type_BUNDLEFINALIZATION, true
+	case state.ProviderType:
+		return v1pb.Type_STATEPROVIDER, true
 	case typex.KVType:
 		return v1pb.Type_KV, true
 	case typex.CoGBKType:
@@ -681,6 +684,8 @@ func decodeSpecial(s v1pb.Type_Special) (reflect.Type, error) {
 		return typex.WindowType, nil
 	case v1pb.Type_BUNDLEFINALIZATION:
 		return typex.BundleFinalizationType, nil
+	case v1pb.Type_STATEPROVIDER:
+		return state.ProviderType, nil
 	case v1pb.Type_KV:
 		return typex.KVType, nil
 	case v1pb.Type_COGBK:
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate.go b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
index 77797d756f1..653dacce7a3 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/translate.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
@@ -70,6 +70,7 @@ const (
 
 	URNRequiresSplittableDoFn     = "beam:requirement:pardo:splittable_dofn:v1"
 	URNRequiresBundleFinalization = "beam:requirement:pardo:finalization:v1"
+	URNRequiresStatefulProcessing = "beam:requirement:pardo:stateful:v1"
 	URNTruncate                   = "beam:transform:sdf_truncate_sized_restrictions:v1"
 
 	// Deprecated: Determine worker binary based on GoWorkerBinary Role instead.
@@ -457,6 +458,29 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) ([]string, error) {
 		if _, ok := edge.Edge.DoFn.ProcessElementFn().BundleFinalization(); ok {
 			m.requirements[URNRequiresBundleFinalization] = true
 		}
+		if _, ok := edge.Edge.DoFn.ProcessElementFn().StateProvider(); ok {
+			m.requirements[URNRequiresStatefulProcessing] = true
+			stateSpecs := make(map[string]*pipepb.StateSpec)
+			for _, ps := range edge.Edge.DoFn.PipelineState() {
+				coderId, err := m.coders.Add(edge.Edge.StateCoders[ps.StateKey()])
+				if err != nil {
+					return handleErr(err)
+				}
+				stateSpecs[ps.StateKey()] = &pipepb.StateSpec{
+					// TODO (#22736) - make spec type and protocol conditional on type of State. Right now, assumes ValueState.
+					// See https://github.com/apache/beam/blob/54b0784da7ccba738deff22bd83fbc374ad21d2e/sdks/go/pkg/beam/model/pipeline_v1/beam_runner_api.pb.go#L2635
+					Spec: &pipepb.StateSpec_ReadModifyWriteSpec{
+						ReadModifyWriteSpec: &pipepb.ReadModifyWriteStateSpec{
+							CoderId: coderId,
+						},
+					},
+					Protocol: &pipepb.FunctionSpec{
+						Urn: "beam:user_state:bag:v1",
+					},
+				}
+			}
+			payload.StateSpecs = stateSpecs
+		}
 		spec = &pipepb.FunctionSpec{Urn: URNParDo, Payload: protox.MustEncode(payload)}
 		annotations = edge.Edge.DoFn.Annotations()
 
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/v1/v1.pb.go b/sdks/go/pkg/beam/core/runtime/graphx/v1/v1.pb.go
index 413aa0f4d8c..e2e1e78c061 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/v1/v1.pb.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/v1/v1.pb.go
@@ -24,7 +24,7 @@
 // Code generated by protoc-gen-go. DO NOT EDIT.
 // versions:
 // 	protoc-gen-go v1.27.1
-// 	protoc        v3.19.4
+// 	protoc        v3.14.0
 // source: go/pkg/beam/core/runtime/graphx/v1/v1.proto
 
 package v1
@@ -218,6 +218,7 @@ const (
 	Type_COGBK              Type_Special = 13
 	Type_WINDOWEDVALUE      Type_Special = 14
 	Type_BUNDLEFINALIZATION Type_Special = 23
+	Type_STATEPROVIDER      Type_Special = 24
 	Type_T                  Type_Special = 15
 	Type_U                  Type_Special = 16
 	Type_V                  Type_Special = 17
@@ -240,6 +241,7 @@ var (
 		13: "COGBK",
 		14: "WINDOWEDVALUE",
 		23: "BUNDLEFINALIZATION",
+		24: "STATEPROVIDER",
 		15: "T",
 		16: "U",
 		17: "V",
@@ -259,6 +261,7 @@ var (
 		"COGBK":              13,
 		"WINDOWEDVALUE":      14,
 		"BUNDLEFINALIZATION": 23,
+		"STATEPROVIDER":      24,
 		"T":                  15,
 		"U":                  16,
 		"V":                  17,
@@ -1372,7 +1375,7 @@ var file_go_pkg_beam_core_runtime_graphx_v1_v1_proto_rawDesc = []byte{
 	0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73,
 	0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e,
 	0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61,
-	0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x22, 0xcb, 0x0b, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65, 0x12,
+	0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x22, 0xde, 0x0b, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65, 0x12,
 	0x56, 0x0a, 0x04, 0x6b, 0x69, 0x6e, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x42, 0x2e,
 	0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e,
 	0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d,
@@ -1453,7 +1456,7 @@ var file_go_pkg_beam_core_runtime_graphx_v1_v1_proto_rawDesc = []byte{
 	0x52, 0x4e, 0x41, 0x4c, 0x10, 0x1a, 0x22, 0x27, 0x0a, 0x07, 0x43, 0x68, 0x61, 0x6e, 0x44, 0x69,
 	0x72, 0x12, 0x08, 0x0a, 0x04, 0x52, 0x45, 0x43, 0x56, 0x10, 0x00, 0x12, 0x08, 0x0a, 0x04, 0x53,
 	0x45, 0x4e, 0x44, 0x10, 0x01, 0x12, 0x08, 0x0a, 0x04, 0x42, 0x4f, 0x54, 0x48, 0x10, 0x02, 0x22,
-	0xc2, 0x01, 0x0a, 0x07, 0x53, 0x70, 0x65, 0x63, 0x69, 0x61, 0x6c, 0x12, 0x0b, 0x0a, 0x07, 0x49,
+	0xd5, 0x01, 0x0a, 0x07, 0x53, 0x70, 0x65, 0x63, 0x69, 0x61, 0x6c, 0x12, 0x0b, 0x0a, 0x07, 0x49,
 	0x4c, 0x4c, 0x45, 0x47, 0x41, 0x4c, 0x10, 0x00, 0x12, 0x09, 0x0a, 0x05, 0x45, 0x52, 0x52, 0x4f,
 	0x52, 0x10, 0x01, 0x12, 0x0b, 0x0a, 0x07, 0x43, 0x4f, 0x4e, 0x54, 0x45, 0x58, 0x54, 0x10, 0x02,
 	0x12, 0x08, 0x0a, 0x04, 0x54, 0x59, 0x50, 0x45, 0x10, 0x03, 0x12, 0x0d, 0x0a, 0x09, 0x45, 0x56,
@@ -1462,175 +1465,176 @@ var file_go_pkg_beam_core_runtime_graphx_v1_v1_proto_rawDesc = []byte{
 	0x05, 0x43, 0x4f, 0x47, 0x42, 0x4b, 0x10, 0x0d, 0x12, 0x11, 0x0a, 0x0d, 0x57, 0x49, 0x4e, 0x44,
 	0x4f, 0x57, 0x45, 0x44, 0x56, 0x41, 0x4c, 0x55, 0x45, 0x10, 0x0e, 0x12, 0x16, 0x0a, 0x12, 0x42,
 	0x55, 0x4e, 0x44, 0x4c, 0x45, 0x46, 0x49, 0x4e, 0x41, 0x4c, 0x49, 0x5a, 0x41, 0x54, 0x49, 0x4f,
-	0x4e, 0x10, 0x17, 0x12, 0x05, 0x0a, 0x01, 0x54, 0x10, 0x0f, 0x12, 0x05, 0x0a, 0x01, 0x55, 0x10,
-	0x10, 0x12, 0x05, 0x0a, 0x01, 0x56, 0x10, 0x11, 0x12, 0x05, 0x0a, 0x01, 0x57, 0x10, 0x12, 0x12,
-	0x05, 0x0a, 0x01, 0x58, 0x10, 0x13, 0x12, 0x05, 0x0a, 0x01, 0x59, 0x10, 0x14, 0x12, 0x05, 0x0a,
-	0x01, 0x5a, 0x10, 0x15, 0x22, 0xc0, 0x01, 0x0a, 0x08, 0x46, 0x75, 0x6c, 0x6c, 0x54, 0x79, 0x70,
-	0x65, 0x12, 0x51, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32,
-	0x3d, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61,
-	0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65,
-	0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e,
-	0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04,
-	0x74, 0x79, 0x70, 0x65, 0x12, 0x61, 0x0a, 0x0a, 0x63, 0x6f, 0x6d, 0x70, 0x6f, 0x6e, 0x65, 0x6e,
-	0x74, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x41, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61,
+	0x4e, 0x10, 0x17, 0x12, 0x11, 0x0a, 0x0d, 0x53, 0x54, 0x41, 0x54, 0x45, 0x50, 0x52, 0x4f, 0x56,
+	0x49, 0x44, 0x45, 0x52, 0x10, 0x18, 0x12, 0x05, 0x0a, 0x01, 0x54, 0x10, 0x0f, 0x12, 0x05, 0x0a,
+	0x01, 0x55, 0x10, 0x10, 0x12, 0x05, 0x0a, 0x01, 0x56, 0x10, 0x11, 0x12, 0x05, 0x0a, 0x01, 0x57,
+	0x10, 0x12, 0x12, 0x05, 0x0a, 0x01, 0x58, 0x10, 0x13, 0x12, 0x05, 0x0a, 0x01, 0x59, 0x10, 0x14,
+	0x12, 0x05, 0x0a, 0x01, 0x5a, 0x10, 0x15, 0x22, 0xc0, 0x01, 0x0a, 0x08, 0x46, 0x75, 0x6c, 0x6c,
+	0x54, 0x79, 0x70, 0x65, 0x12, 0x51, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01,
+	0x28, 0x0b, 0x32, 0x3d, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e,
+	0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b, 0x67,
+	0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69,
+	0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, 0x54, 0x79, 0x70,
+	0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x61, 0x0a, 0x0a, 0x63, 0x6f, 0x6d, 0x70, 0x6f,
+	0x6e, 0x65, 0x6e, 0x74, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x41, 0x2e, 0x6f, 0x72,
+	0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64,
+	0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63,
+	0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70,
+	0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, 0x46, 0x75, 0x6c, 0x6c, 0x54, 0x79, 0x70, 0x65, 0x52, 0x0a,
+	0x63, 0x6f, 0x6d, 0x70, 0x6f, 0x6e, 0x65, 0x6e, 0x74, 0x73, 0x22, 0x6f, 0x0a, 0x06, 0x55, 0x73,
+	0x65, 0x72, 0x46, 0x6e, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01,
+	0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x51, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65,
+	0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x3d, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61,
+	0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f,
+	0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72,
+	0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31,
+	0x2e, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x22, 0x94, 0x01, 0x0a, 0x05,
+	0x44, 0x79, 0x6e, 0x46, 0x6e, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20,
+	0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x51, 0x0a, 0x04, 0x74, 0x79, 0x70,
+	0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x3d, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70,
+	0x61, 0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67,
+	0x6f, 0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e,
+	0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76,
+	0x31, 0x2e, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x12, 0x0a, 0x04,
+	0x64, 0x61, 0x74, 0x61, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61,
+	0x12, 0x10, 0x0a, 0x03, 0x67, 0x65, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x67,
+	0x65, 0x6e, 0x22, 0x90, 0x02, 0x0a, 0x02, 0x46, 0x6e, 0x12, 0x4f, 0x0a, 0x02, 0x66, 0x6e, 0x18,
+	0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x3f, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63,
+	0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e,
+	0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75,
+	0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e,
+	0x55, 0x73, 0x65, 0x72, 0x46, 0x6e, 0x52, 0x02, 0x66, 0x6e, 0x12, 0x51, 0x0a, 0x04, 0x74, 0x79,
+	0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x3d, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61,
 	0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e,
 	0x67, 0x6f, 0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65,
 	0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e,
-	0x76, 0x31, 0x2e, 0x46, 0x75, 0x6c, 0x6c, 0x54, 0x79, 0x70, 0x65, 0x52, 0x0a, 0x63, 0x6f, 0x6d,
-	0x70, 0x6f, 0x6e, 0x65, 0x6e, 0x74, 0x73, 0x22, 0x6f, 0x0a, 0x06, 0x55, 0x73, 0x65, 0x72, 0x46,
-	0x6e, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52,
-	0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x51, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20,
-	0x01, 0x28, 0x0b, 0x32, 0x3d, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65,
-	0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b,
-	0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75, 0x6e, 0x74,
-	0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, 0x54, 0x79,
-	0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x22, 0x94, 0x01, 0x0a, 0x05, 0x44, 0x79, 0x6e,
-	0x46, 0x6e, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09,
+	0x76, 0x31, 0x2e, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x10, 0x0a,
+	0x03, 0x6f, 0x70, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6f, 0x70, 0x74, 0x12,
+	0x54, 0x0a, 0x05, 0x64, 0x79, 0x6e, 0x66, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x3e,
+	0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d,
+	0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61,
+	0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67,
+	0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, 0x44, 0x79, 0x6e, 0x46, 0x6e, 0x52, 0x05,
+	0x64, 0x79, 0x6e, 0x66, 0x6e, 0x22, 0x6b, 0x0a, 0x08, 0x57, 0x69, 0x6e, 0x64, 0x6f, 0x77, 0x46,
+	0x6e, 0x12, 0x12, 0x0a, 0x04, 0x6b, 0x69, 0x6e, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52,
+	0x04, 0x6b, 0x69, 0x6e, 0x64, 0x12, 0x17, 0x0a, 0x07, 0x73, 0x69, 0x7a, 0x65, 0x5f, 0x6d, 0x73,
+	0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x06, 0x73, 0x69, 0x7a, 0x65, 0x4d, 0x73, 0x12, 0x1b,
+	0x0a, 0x09, 0x70, 0x65, 0x72, 0x69, 0x6f, 0x64, 0x5f, 0x6d, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28,
+	0x03, 0x52, 0x08, 0x70, 0x65, 0x72, 0x69, 0x6f, 0x64, 0x4d, 0x73, 0x12, 0x15, 0x0a, 0x06, 0x67,
+	0x61, 0x70, 0x5f, 0x6d, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x03, 0x52, 0x05, 0x67, 0x61, 0x70,
+	0x4d, 0x73, 0x22, 0x9a, 0x02, 0x0a, 0x0b, 0x43, 0x75, 0x73, 0x74, 0x6f, 0x6d, 0x43, 0x6f, 0x64,
+	0x65, 0x72, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09,
 	0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x51, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02,
 	0x20, 0x01, 0x28, 0x0b, 0x32, 0x3d, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68,
 	0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70,
 	0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75, 0x6e,
 	0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, 0x54,
-	0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74,
-	0x61, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x12, 0x10, 0x0a,
-	0x03, 0x67, 0x65, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x67, 0x65, 0x6e, 0x22,
-	0x90, 0x02, 0x0a, 0x02, 0x46, 0x6e, 0x12, 0x4f, 0x0a, 0x02, 0x66, 0x6e, 0x18, 0x01, 0x20, 0x01,
-	0x28, 0x0b, 0x32, 0x3f, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e,
-	0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b, 0x67,
-	0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69,
-	0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, 0x55, 0x73, 0x65,
-	0x72, 0x46, 0x6e, 0x52, 0x02, 0x66, 0x6e, 0x12, 0x51, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18,
-	0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x3d, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63,
+	0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x51, 0x0a, 0x03, 0x65, 0x6e, 0x63,
+	0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x3f, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61,
+	0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f,
+	0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72,
+	0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31,
+	0x2e, 0x55, 0x73, 0x65, 0x72, 0x46, 0x6e, 0x52, 0x03, 0x65, 0x6e, 0x63, 0x12, 0x51, 0x0a, 0x03,
+	0x64, 0x65, 0x63, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x3f, 0x2e, 0x6f, 0x72, 0x67, 0x2e,
+	0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73,
+	0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72,
+	0x65, 0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78,
+	0x2e, 0x76, 0x31, 0x2e, 0x55, 0x73, 0x65, 0x72, 0x46, 0x6e, 0x52, 0x03, 0x64, 0x65, 0x63, 0x22,
+	0xba, 0x06, 0x0a, 0x09, 0x4d, 0x75, 0x6c, 0x74, 0x69, 0x45, 0x64, 0x67, 0x65, 0x12, 0x4b, 0x0a,
+	0x02, 0x66, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x3b, 0x2e, 0x6f, 0x72, 0x67, 0x2e,
+	0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73,
+	0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72,
+	0x65, 0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78,
+	0x2e, 0x76, 0x31, 0x2e, 0x46, 0x6e, 0x52, 0x02, 0x66, 0x6e, 0x12, 0x16, 0x0a, 0x06, 0x6f, 0x70,
+	0x63, 0x6f, 0x64, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x6f, 0x70, 0x63, 0x6f,
+	0x64, 0x65, 0x12, 0x5e, 0x0a, 0x09, 0x77, 0x69, 0x6e, 0x64, 0x6f, 0x77, 0x5f, 0x66, 0x6e, 0x18,
+	0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x41, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63,
 	0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e,
 	0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75,
 	0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e,
-	0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x6f, 0x70,
-	0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6f, 0x70, 0x74, 0x12, 0x54, 0x0a, 0x05,
-	0x64, 0x79, 0x6e, 0x66, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x3e, 0x2e, 0x6f, 0x72,
+	0x57, 0x69, 0x6e, 0x64, 0x6f, 0x77, 0x46, 0x6e, 0x52, 0x08, 0x77, 0x69, 0x6e, 0x64, 0x6f, 0x77,
+	0x46, 0x6e, 0x12, 0x64, 0x0a, 0x07, 0x69, 0x6e, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x18, 0x02, 0x20,
+	0x03, 0x28, 0x0b, 0x32, 0x4a, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65,
+	0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b,
+	0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75, 0x6e, 0x74,
+	0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, 0x4d, 0x75,
+	0x6c, 0x74, 0x69, 0x45, 0x64, 0x67, 0x65, 0x2e, 0x49, 0x6e, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x52,
+	0x07, 0x69, 0x6e, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x12, 0x67, 0x0a, 0x08, 0x6f, 0x75, 0x74, 0x62,
+	0x6f, 0x75, 0x6e, 0x64, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x4b, 0x2e, 0x6f, 0x72, 0x67,
+	0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b,
+	0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f,
+	0x72, 0x65, 0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68,
+	0x78, 0x2e, 0x76, 0x31, 0x2e, 0x4d, 0x75, 0x6c, 0x74, 0x69, 0x45, 0x64, 0x67, 0x65, 0x2e, 0x4f,
+	0x75, 0x74, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x52, 0x08, 0x6f, 0x75, 0x74, 0x62, 0x6f, 0x75, 0x6e,
+	0x64, 0x1a, 0xb5, 0x02, 0x0a, 0x07, 0x49, 0x6e, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x12, 0x68, 0x0a,
+	0x04, 0x6b, 0x69, 0x6e, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x54, 0x2e, 0x6f, 0x72,
 	0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64,
 	0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63,
 	0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70,
-	0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, 0x44, 0x79, 0x6e, 0x46, 0x6e, 0x52, 0x05, 0x64, 0x79, 0x6e,
-	0x66, 0x6e, 0x22, 0x6b, 0x0a, 0x08, 0x57, 0x69, 0x6e, 0x64, 0x6f, 0x77, 0x46, 0x6e, 0x12, 0x12,
-	0x0a, 0x04, 0x6b, 0x69, 0x6e, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6b, 0x69,
-	0x6e, 0x64, 0x12, 0x17, 0x0a, 0x07, 0x73, 0x69, 0x7a, 0x65, 0x5f, 0x6d, 0x73, 0x18, 0x02, 0x20,
-	0x01, 0x28, 0x03, 0x52, 0x06, 0x73, 0x69, 0x7a, 0x65, 0x4d, 0x73, 0x12, 0x1b, 0x0a, 0x09, 0x70,
-	0x65, 0x72, 0x69, 0x6f, 0x64, 0x5f, 0x6d, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x08,
-	0x70, 0x65, 0x72, 0x69, 0x6f, 0x64, 0x4d, 0x73, 0x12, 0x15, 0x0a, 0x06, 0x67, 0x61, 0x70, 0x5f,
-	0x6d, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x03, 0x52, 0x05, 0x67, 0x61, 0x70, 0x4d, 0x73, 0x22,
-	0x9a, 0x02, 0x0a, 0x0b, 0x43, 0x75, 0x73, 0x74, 0x6f, 0x6d, 0x43, 0x6f, 0x64, 0x65, 0x72, 0x12,
-	0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e,
-	0x61, 0x6d, 0x65, 0x12, 0x51, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28,
-	0x0b, 0x32, 0x3d, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x62,
-	0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b, 0x67, 0x2e,
-	0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d,
-	0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, 0x54, 0x79, 0x70, 0x65,
-	0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x51, 0x0a, 0x03, 0x65, 0x6e, 0x63, 0x18, 0x03, 0x20,
-	0x01, 0x28, 0x0b, 0x32, 0x3f, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65,
+	0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, 0x4d, 0x75, 0x6c, 0x74, 0x69, 0x45, 0x64, 0x67, 0x65, 0x2e,
+	0x49, 0x6e, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x2e, 0x49, 0x6e, 0x70, 0x75, 0x74, 0x4b, 0x69, 0x6e,
+	0x64, 0x52, 0x04, 0x6b, 0x69, 0x6e, 0x64, 0x12, 0x55, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18,
+	0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x41, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63,
+	0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e,
+	0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75,
+	0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e,
+	0x46, 0x75, 0x6c, 0x6c, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x22, 0x69,
+	0x0a, 0x09, 0x49, 0x6e, 0x70, 0x75, 0x74, 0x4b, 0x69, 0x6e, 0x64, 0x12, 0x0b, 0x0a, 0x07, 0x49,
+	0x4e, 0x56, 0x41, 0x4c, 0x49, 0x44, 0x10, 0x00, 0x12, 0x08, 0x0a, 0x04, 0x4d, 0x41, 0x49, 0x4e,
+	0x10, 0x01, 0x12, 0x0d, 0x0a, 0x09, 0x53, 0x49, 0x4e, 0x47, 0x4c, 0x45, 0x54, 0x4f, 0x4e, 0x10,
+	0x02, 0x12, 0x09, 0x0a, 0x05, 0x53, 0x4c, 0x49, 0x43, 0x45, 0x10, 0x03, 0x12, 0x07, 0x0a, 0x03,
+	0x4d, 0x41, 0x50, 0x10, 0x04, 0x12, 0x0c, 0x0a, 0x08, 0x4d, 0x55, 0x4c, 0x54, 0x49, 0x4d, 0x41,
+	0x50, 0x10, 0x05, 0x12, 0x08, 0x0a, 0x04, 0x49, 0x54, 0x45, 0x52, 0x10, 0x06, 0x12, 0x0a, 0x0a,
+	0x06, 0x52, 0x45, 0x49, 0x54, 0x45, 0x52, 0x10, 0x07, 0x1a, 0x61, 0x0a, 0x08, 0x4f, 0x75, 0x74,
+	0x62, 0x6f, 0x75, 0x6e, 0x64, 0x12, 0x55, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20,
+	0x01, 0x28, 0x0b, 0x32, 0x41, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65,
 	0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b,
 	0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75, 0x6e, 0x74,
-	0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, 0x55, 0x73,
-	0x65, 0x72, 0x46, 0x6e, 0x52, 0x03, 0x65, 0x6e, 0x63, 0x12, 0x51, 0x0a, 0x03, 0x64, 0x65, 0x63,
-	0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x3f, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61,
-	0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f,
-	0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72,
-	0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31,
-	0x2e, 0x55, 0x73, 0x65, 0x72, 0x46, 0x6e, 0x52, 0x03, 0x64, 0x65, 0x63, 0x22, 0xba, 0x06, 0x0a,
-	0x09, 0x4d, 0x75, 0x6c, 0x74, 0x69, 0x45, 0x64, 0x67, 0x65, 0x12, 0x4b, 0x0a, 0x02, 0x66, 0x6e,
-	0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x3b, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61,
-	0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f,
-	0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72,
-	0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31,
-	0x2e, 0x46, 0x6e, 0x52, 0x02, 0x66, 0x6e, 0x12, 0x16, 0x0a, 0x06, 0x6f, 0x70, 0x63, 0x6f, 0x64,
-	0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x6f, 0x70, 0x63, 0x6f, 0x64, 0x65, 0x12,
-	0x5e, 0x0a, 0x09, 0x77, 0x69, 0x6e, 0x64, 0x6f, 0x77, 0x5f, 0x66, 0x6e, 0x18, 0x05, 0x20, 0x01,
-	0x28, 0x0b, 0x32, 0x41, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e,
-	0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b, 0x67,
-	0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69,
-	0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, 0x57, 0x69, 0x6e,
-	0x64, 0x6f, 0x77, 0x46, 0x6e, 0x52, 0x08, 0x77, 0x69, 0x6e, 0x64, 0x6f, 0x77, 0x46, 0x6e, 0x12,
-	0x64, 0x0a, 0x07, 0x69, 0x6e, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b,
-	0x32, 0x4a, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x62, 0x65,
-	0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62,
-	0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65,
-	0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, 0x4d, 0x75, 0x6c, 0x74, 0x69,
-	0x45, 0x64, 0x67, 0x65, 0x2e, 0x49, 0x6e, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x52, 0x07, 0x69, 0x6e,
-	0x62, 0x6f, 0x75, 0x6e, 0x64, 0x12, 0x67, 0x0a, 0x08, 0x6f, 0x75, 0x74, 0x62, 0x6f, 0x75, 0x6e,
-	0x64, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x4b, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70,
-	0x61, 0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67,
-	0x6f, 0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e,
-	0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76,
-	0x31, 0x2e, 0x4d, 0x75, 0x6c, 0x74, 0x69, 0x45, 0x64, 0x67, 0x65, 0x2e, 0x4f, 0x75, 0x74, 0x62,
-	0x6f, 0x75, 0x6e, 0x64, 0x52, 0x08, 0x6f, 0x75, 0x74, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x1a, 0xb5,
-	0x02, 0x0a, 0x07, 0x49, 0x6e, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x12, 0x68, 0x0a, 0x04, 0x6b, 0x69,
-	0x6e, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x54, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61,
+	0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, 0x46, 0x75,
+	0x6c, 0x6c, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x22, 0x1d, 0x0a, 0x0d,
+	0x49, 0x6e, 0x6a, 0x65, 0x63, 0x74, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x0c, 0x0a,
+	0x01, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x01, 0x6e, 0x22, 0xf5, 0x01, 0x0a, 0x10,
+	0x52, 0x65, 0x73, 0x68, 0x75, 0x66, 0x66, 0x6c, 0x65, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64,
+	0x12, 0x19, 0x0a, 0x08, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01,
+	0x28, 0x09, 0x52, 0x07, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x49, 0x64, 0x12, 0x83, 0x01, 0x0a, 0x0e,
+	0x63, 0x6f, 0x64, 0x65, 0x72, 0x5f, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x73, 0x18, 0x02,
+	0x20, 0x03, 0x28, 0x0b, 0x32, 0x5c, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68,
+	0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70,
+	0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75, 0x6e,
+	0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, 0x52,
+	0x65, 0x73, 0x68, 0x75, 0x66, 0x66, 0x6c, 0x65, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x2e,
+	0x43, 0x6f, 0x64, 0x65, 0x72, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x73, 0x45, 0x6e, 0x74,
+	0x72, 0x79, 0x52, 0x0d, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64,
+	0x73, 0x1a, 0x40, 0x0a, 0x12, 0x43, 0x6f, 0x64, 0x65, 0x72, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61,
+	0x64, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01,
+	0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c,
+	0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a,
+	0x02, 0x38, 0x01, 0x22, 0xc5, 0x02, 0x0a, 0x10, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x6f, 0x72,
+	0x6d, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x10, 0x0a, 0x03, 0x75, 0x72, 0x6e, 0x18,
+	0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x75, 0x72, 0x6e, 0x12, 0x56, 0x0a, 0x04, 0x65, 0x64,
+	0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x42, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61,
 	0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e,
 	0x67, 0x6f, 0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65,
 	0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e,
-	0x76, 0x31, 0x2e, 0x4d, 0x75, 0x6c, 0x74, 0x69, 0x45, 0x64, 0x67, 0x65, 0x2e, 0x49, 0x6e, 0x62,
-	0x6f, 0x75, 0x6e, 0x64, 0x2e, 0x49, 0x6e, 0x70, 0x75, 0x74, 0x4b, 0x69, 0x6e, 0x64, 0x52, 0x04,
-	0x6b, 0x69, 0x6e, 0x64, 0x12, 0x55, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01,
-	0x28, 0x0b, 0x32, 0x41, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e,
+	0x76, 0x31, 0x2e, 0x4d, 0x75, 0x6c, 0x74, 0x69, 0x45, 0x64, 0x67, 0x65, 0x52, 0x04, 0x65, 0x64,
+	0x67, 0x65, 0x12, 0x5e, 0x0a, 0x06, 0x69, 0x6e, 0x6a, 0x65, 0x63, 0x74, 0x18, 0x03, 0x20, 0x01,
+	0x28, 0x0b, 0x32, 0x46, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e,
 	0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b, 0x67,
 	0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69,
-	0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, 0x46, 0x75, 0x6c,
-	0x6c, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x22, 0x69, 0x0a, 0x09, 0x49,
-	0x6e, 0x70, 0x75, 0x74, 0x4b, 0x69, 0x6e, 0x64, 0x12, 0x0b, 0x0a, 0x07, 0x49, 0x4e, 0x56, 0x41,
-	0x4c, 0x49, 0x44, 0x10, 0x00, 0x12, 0x08, 0x0a, 0x04, 0x4d, 0x41, 0x49, 0x4e, 0x10, 0x01, 0x12,
-	0x0d, 0x0a, 0x09, 0x53, 0x49, 0x4e, 0x47, 0x4c, 0x45, 0x54, 0x4f, 0x4e, 0x10, 0x02, 0x12, 0x09,
-	0x0a, 0x05, 0x53, 0x4c, 0x49, 0x43, 0x45, 0x10, 0x03, 0x12, 0x07, 0x0a, 0x03, 0x4d, 0x41, 0x50,
-	0x10, 0x04, 0x12, 0x0c, 0x0a, 0x08, 0x4d, 0x55, 0x4c, 0x54, 0x49, 0x4d, 0x41, 0x50, 0x10, 0x05,
-	0x12, 0x08, 0x0a, 0x04, 0x49, 0x54, 0x45, 0x52, 0x10, 0x06, 0x12, 0x0a, 0x0a, 0x06, 0x52, 0x45,
-	0x49, 0x54, 0x45, 0x52, 0x10, 0x07, 0x1a, 0x61, 0x0a, 0x08, 0x4f, 0x75, 0x74, 0x62, 0x6f, 0x75,
-	0x6e, 0x64, 0x12, 0x55, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b,
-	0x32, 0x41, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x62, 0x65,
-	0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62,
-	0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65,
-	0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, 0x46, 0x75, 0x6c, 0x6c, 0x54,
-	0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x22, 0x1d, 0x0a, 0x0d, 0x49, 0x6e, 0x6a,
-	0x65, 0x63, 0x74, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x0c, 0x0a, 0x01, 0x6e, 0x18,
-	0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x01, 0x6e, 0x22, 0xf5, 0x01, 0x0a, 0x10, 0x52, 0x65, 0x73,
-	0x68, 0x75, 0x66, 0x66, 0x6c, 0x65, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x19, 0x0a,
-	0x08, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52,
-	0x07, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x49, 0x64, 0x12, 0x83, 0x01, 0x0a, 0x0e, 0x63, 0x6f, 0x64,
-	0x65, 0x72, 0x5f, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28,
-	0x0b, 0x32, 0x5c, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x62,
-	0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b, 0x67, 0x2e,
-	0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d,
-	0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x73, 0x68,
-	0x75, 0x66, 0x66, 0x6c, 0x65, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x2e, 0x43, 0x6f, 0x64,
-	0x65, 0x72, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52,
-	0x0d, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x73, 0x1a, 0x40,
-	0x0a, 0x12, 0x43, 0x6f, 0x64, 0x65, 0x72, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x73, 0x45,
-	0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28,
-	0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18,
-	0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01,
-	0x22, 0xc5, 0x02, 0x0a, 0x10, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, 0x50, 0x61,
-	0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x10, 0x0a, 0x03, 0x75, 0x72, 0x6e, 0x18, 0x01, 0x20, 0x01,
-	0x28, 0x09, 0x52, 0x03, 0x75, 0x72, 0x6e, 0x12, 0x56, 0x0a, 0x04, 0x65, 0x64, 0x67, 0x65, 0x18,
-	0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x42, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63,
+	0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, 0x49, 0x6e, 0x6a,
+	0x65, 0x63, 0x74, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x52, 0x06, 0x69, 0x6e, 0x6a, 0x65,
+	0x63, 0x74, 0x12, 0x67, 0x0a, 0x09, 0x72, 0x65, 0x73, 0x68, 0x75, 0x66, 0x66, 0x6c, 0x65, 0x18,
+	0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x49, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63,
 	0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e,
 	0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75,
 	0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e,
-	0x4d, 0x75, 0x6c, 0x74, 0x69, 0x45, 0x64, 0x67, 0x65, 0x52, 0x04, 0x65, 0x64, 0x67, 0x65, 0x12,
-	0x5e, 0x0a, 0x06, 0x69, 0x6e, 0x6a, 0x65, 0x63, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32,
-	0x46, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61,
-	0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65,
-	0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e,
-	0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, 0x49, 0x6e, 0x6a, 0x65, 0x63, 0x74,
-	0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x52, 0x06, 0x69, 0x6e, 0x6a, 0x65, 0x63, 0x74, 0x12,
-	0x67, 0x0a, 0x09, 0x72, 0x65, 0x73, 0x68, 0x75, 0x66, 0x66, 0x6c, 0x65, 0x18, 0x04, 0x20, 0x01,
-	0x28, 0x0b, 0x32, 0x49, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e,
-	0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b, 0x67,
-	0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69,
-	0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x73,
-	0x68, 0x75, 0x66, 0x66, 0x6c, 0x65, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x52, 0x09, 0x72,
-	0x65, 0x73, 0x68, 0x75, 0x66, 0x66, 0x6c, 0x65, 0x42, 0x46, 0x5a, 0x44, 0x67, 0x69, 0x74, 0x68,
-	0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2f, 0x62, 0x65,
-	0x61, 0x6d, 0x2f, 0x73, 0x64, 0x6b, 0x73, 0x2f, 0x76, 0x32, 0x2f, 0x67, 0x6f, 0x2f, 0x70, 0x6b,
-	0x67, 0x2f, 0x62, 0x65, 0x61, 0x6d, 0x2f, 0x63, 0x6f, 0x72, 0x65, 0x2f, 0x72, 0x75, 0x6e, 0x74,
-	0x69, 0x6d, 0x65, 0x2f, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2f, 0x76, 0x31, 0x3b, 0x76, 0x31,
-	0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
+	0x52, 0x65, 0x73, 0x68, 0x75, 0x66, 0x66, 0x6c, 0x65, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64,
+	0x52, 0x09, 0x72, 0x65, 0x73, 0x68, 0x75, 0x66, 0x66, 0x6c, 0x65, 0x42, 0x46, 0x5a, 0x44, 0x67,
+	0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65,
+	0x2f, 0x62, 0x65, 0x61, 0x6d, 0x2f, 0x73, 0x64, 0x6b, 0x73, 0x2f, 0x76, 0x32, 0x2f, 0x67, 0x6f,
+	0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x62, 0x65, 0x61, 0x6d, 0x2f, 0x63, 0x6f, 0x72, 0x65, 0x2f, 0x72,
+	0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2f, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2f, 0x76, 0x31,
+	0x3b, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
 }
 
 var (
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/v1/v1.proto b/sdks/go/pkg/beam/core/runtime/graphx/v1/v1.proto
index ef84c20887a..5046994707b 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/v1/v1.proto
+++ b/sdks/go/pkg/beam/core/runtime/graphx/v1/v1.proto
@@ -114,6 +114,8 @@ message Type {
 
         BUNDLEFINALIZATION = 23;
 
+        STATEPROVIDER = 24;
+
         T = 15;
         U = 16;
         V = 17;
diff --git a/sdks/go/pkg/beam/core/state/state.go b/sdks/go/pkg/beam/core/state/state.go
new file mode 100644
index 00000000000..62fd4509936
--- /dev/null
+++ b/sdks/go/pkg/beam/core/state/state.go
@@ -0,0 +1,120 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package state contains structs for reading and manipulating pipeline state.
+package state
+
+import (
+	"reflect"
+)
+
+type TransactionType_Enum int32
+
+const (
+	TransactionType_Set   TransactionType_Enum = 0
+	TransactionType_Clear TransactionType_Enum = 1
+)
+
+var (
+	ProviderType = reflect.TypeOf((*Provider)(nil)).Elem()
+)
+
+// TODO(#20510) - add other forms of state (MapState, BagState, CombiningState), prefetch, and clear.
+
+// Transaction is used to represent a pending state transaction. This should not be manipulated directly;
+// it is primarily used for implementations of the Provider interface to talk to the various State objects.
+type Transaction struct {
+	Key  string
+	Type TransactionType_Enum
+	Val  interface{}
+}
+
+// Provider represents the DoFn parameter used to get and manipulate pipeline state
+// stored as key value pairs (https://beam.apache.org/documentation/programming-guide/#state-and-timers).
+// This should not be manipulated directly. Instead it should be used as a parameter
+// to functions on State objects like state.Value.
+type Provider interface {
+	ReadValueState(id string) (interface{}, []Transaction, error)
+	WriteValueState(val Transaction) error
+}
+
+// PipelineState is an interface representing different kinds of PipelineState (currently just state.Value).
+// It is primarily meant for Beam packages to use and is probably not useful for most pipeline authors.
+type PipelineState interface {
+	StateKey() string
+	CoderType() reflect.Type
+}
+
+// Value is used to read and write global pipeline state representing a single value of type T.
+// Key represents the key used to look up this state.
+type Value[T any] struct {
+	Key string
+}
+
+// Write sends a Set transaction carrying this state's Key and val to p and returns p's error.
+func (s *Value[T]) Write(p Provider, val T) error {
+	return p.WriteValueState(Transaction{
+		Key:  s.Key,
+		Type: TransactionType_Set,
+		Val:  val,
+	})
+}
+
+// Read is used to read this instance of global pipeline state representing a single value.
+// When a value is not found, returns the zero value of T and false.
+func (s *Value[T]) Read(p Provider) (T, bool, error) {
+	// This replays the buffered Set/Clear transactions, in order, on top of the value the provider returned.
+	// For more detail, see "State Transactionality" (NOTE(review): that section is not in this file - confirm where it lives).
+	cur, bufferedTransactions, err := p.ReadValueState(s.Key)
+	if err != nil {
+		var val T
+		return val, false, err
+	}
+	for _, t := range bufferedTransactions {
+		switch t.Type {
+		case TransactionType_Set:
+			cur = t.Val
+		case TransactionType_Clear:
+			cur = nil
+		}
+	}
+	if cur == nil {
+		var val T
+		return val, false, nil
+	}
+	return cur.(T), true, nil
+}
+
+// StateKey returns the key for this pipeline state entry. It panics if Key is empty.
+func (s Value[T]) StateKey() string {
+	if s.Key == "" {
+		// TODO(#22736) - infer the state from the member variable name during pipeline construction.
+		panic("Value state exists on struct but has not been initialized with a key.")
+	}
+	return s.Key
+}
+
+// CoderType returns the type of the value state which should be used for a coder.
+func (s Value[T]) CoderType() reflect.Type {
+	// Go through *T: reflect.TypeOf on a zero T is nil whenever T is an interface type.
+	return reflect.TypeOf((*T)(nil)).Elem()
+}
+
+// MakeValueState is a factory function to create an instance of Value with the given key.
+func MakeValueState[T any](k string) Value[T] {
+	return Value[T]{
+		Key: k,
+	}
+}
diff --git a/sdks/go/pkg/beam/core/state/state_test.go b/sdks/go/pkg/beam/core/state/state_test.go
new file mode 100644
index 00000000000..b1297891c1c
--- /dev/null
+++ b/sdks/go/pkg/beam/core/state/state_test.go
@@ -0,0 +1,140 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package state
+
+import (
+	"errors"
+	"testing"
+)
+
+var (
+	errFake = errors.New("fake error")
+)
+
+type fakeProvider struct {
+	initialState map[string]interface{}
+	transactions map[string][]Transaction
+	err          map[string]error
+}
+
+func (s *fakeProvider) ReadValueState(userStateId string) (interface{}, []Transaction, error) {
+	if err, ok := s.err[userStateId]; ok {
+		return nil, nil, err
+	}
+	base := s.initialState[userStateId]
+	trans, ok := s.transactions[userStateId]
+	if !ok {
+		trans = []Transaction{}
+	}
+	return base, trans, nil
+}
+
+func (s *fakeProvider) WriteValueState(val Transaction) error {
+	if transactions, ok := s.transactions[val.Key]; ok {
+		s.transactions[val.Key] = append(transactions, val)
+	} else {
+		s.transactions[val.Key] = []Transaction{val}
+	}
+	return nil
+}
+
+func TestValueRead(t *testing.T) {
+	is := make(map[string]interface{})
+	ts := make(map[string][]Transaction)
+	es := make(map[string]error)
+	is["no_transactions"] = 1
+	ts["no_transactions"] = nil
+	is["basic_set"] = 1
+	ts["basic_set"] = []Transaction{{Key: "basic_set", Type: TransactionType_Set, Val: 3}}
+	is["basic_clear"] = 1
+	ts["basic_clear"] = []Transaction{{Key: "basic_clear", Type: TransactionType_Clear, Val: nil}}
+	is["set_then_clear"] = 1
+	ts["set_then_clear"] = []Transaction{{Key: "set_then_clear", Type: TransactionType_Set, Val: 3}, {Key: "set_then_clear", Type: TransactionType_Clear, Val: nil}}
+	is["set_then_clear_then_set"] = 1
+	ts["set_then_clear_then_set"] = []Transaction{{Key: "set_then_clear_then_set", Type: TransactionType_Set, Val: 3}, {Key: "set_then_clear_then_set", Type: TransactionType_Clear, Val: nil}, {Key: "set_then_clear_then_set", Type: TransactionType_Set, Val: 4}}
+	is["err"] = 1
+	ts["err"] = []Transaction{{Key: "err", Type: TransactionType_Set, Val: 3}}
+	es["err"] = errFake
+
+	f := fakeProvider{
+		initialState: is,
+		transactions: ts,
+		err:          es,
+	}
+
+	var tests = []struct {
+		vs  Value[int]
+		val int
+		ok  bool
+		err error
+	}{
+		{MakeValueState[int]("no_transactions"), 1, true, nil},
+		{MakeValueState[int]("basic_set"), 3, true, nil},
+		{MakeValueState[int]("basic_clear"), 0, false, nil},
+		{MakeValueState[int]("set_then_clear"), 0, false, nil},
+		{MakeValueState[int]("set_then_clear_then_set"), 4, true, nil},
+		{MakeValueState[int]("err"), 0, false, errFake},
+	}
+
+	for _, tt := range tests {
+		val, ok, err := tt.vs.Read(&f)
+		if err != nil && tt.err == nil {
+			t.Errorf("Value.Read() returned error %v for state key %v when it shouldn't have", err, tt.vs.Key)
+		} else if err == nil && tt.err != nil {
+			t.Errorf("Value.Read() returned no error for state key %v when it should have returned %v", tt.vs.Key, tt.err)
+		} else if ok && !tt.ok {
+			t.Errorf("Value.Read() returned a value %v for state key %v when it shouldn't have", val, tt.vs.Key)
+		} else if !ok && tt.ok {
+			t.Errorf("Value.Read() didn't return a value for state key %v when it should have returned %v", tt.vs.Key, tt.val)
+		} else if val != tt.val {
+			t.Errorf("Value.Read()=%v, want %v for state key %v", val, tt.val, tt.vs.Key)
+		}
+	}
+}
+
+// TestValueWrite checks that every Write succeeds and that Read then returns the last value written.
+func TestValueWrite(t *testing.T) {
+	var tests = []struct {
+		writes []int
+		val    int
+		ok     bool
+	}{
+		{[]int{}, 0, false},
+		{[]int{3}, 3, true},
+		{[]int{1, 5}, 5, true},
+	}
+
+	for _, tt := range tests {
+		// The fake only assigns into transactions; its other maps are only read, so nil maps suffice.
+		f := fakeProvider{transactions: make(map[string][]Transaction)}
+		vs := MakeValueState[int]("vs")
+		for _, val := range tt.writes {
+			if err := vs.Write(&f, val); err != nil {
+				t.Errorf("Value.Write() returned error %v when writing %v", err, val)
+			}
+		}
+		val, ok, err := vs.Read(&f)
+		if err != nil {
+			t.Errorf("Value.Read() returned error %v when it shouldn't have after writing: %v", err, tt.writes)
+		} else if ok && !tt.ok {
+			t.Errorf("Value.Read() returned a value %v when it shouldn't have after writing: %v", val, tt.writes)
+		} else if !ok && tt.ok {
+			t.Errorf("Value.Read() didn't return a value when it should have returned %v after writing: %v", tt.val, tt.writes)
+		} else if val != tt.val {
+			t.Errorf("Value.Read()=%v, want %v after writing: %v", val, tt.val, tt.writes)
+		}
+	}
+}
diff --git a/sdks/go/pkg/beam/pardo.go b/sdks/go/pkg/beam/pardo.go
index 66fca00838d..dcdb3e74d1e 100644
--- a/sdks/go/pkg/beam/pardo.go
+++ b/sdks/go/pkg/beam/pardo.go
@@ -92,6 +92,19 @@ func TryParDo(s Scope, dofn interface{}, col PCollection, opts ...Option) ([]PCo
 		return nil, addParDoCtx(err, s)
 	}
 
+	pipelineState := fn.PipelineState()
+	if len(pipelineState) > 0 {
+		edge.StateCoders = make(map[string]*coder.Coder)
+		for _, ps := range pipelineState {
+			sT := typex.New(ps.CoderType())
+			c, err := inferCoder(sT)
+			if err != nil {
+				return nil, addParDoCtx(err, s)
+			}
+			edge.StateCoders[ps.StateKey()] = c
+		}
+	}
+
 	var ret []PCollection
 	for _, out := range edge.Output {
 		c := PCollection{out.To}