You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2022/08/18 15:07:39 UTC
[beam] branch master updated: Go stateful DoFns user side changes (#22761)
This is an automated email from the ASF dual-hosted git repository.
damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 75eb0b1431c Go stateful DoFns user side changes (#22761)
75eb0b1431c is described below
commit 75eb0b1431c84c98f2e16a9f535b0e11b0160d43
Author: Danny McCormick <da...@google.com>
AuthorDate: Thu Aug 18 08:07:30 2022 -0700
Go stateful DoFns user side changes (#22761)
* Go stateful DoFns user side changes
* Fix static check violation
* Small cleanup
* Doc comments
---
sdks/go/pkg/beam/core/funcx/fn.go | 45 +++-
sdks/go/pkg/beam/core/funcx/fn_test.go | 16 ++
sdks/go/pkg/beam/core/graph/edge.go | 17 +-
sdks/go/pkg/beam/core/graph/fn.go | 75 +++++-
sdks/go/pkg/beam/core/graph/fn_test.go | 89 +++++--
sdks/go/pkg/beam/core/runtime/exec/fn.go | 32 ++-
sdks/go/pkg/beam/core/runtime/exec/translate.go | 16 ++
sdks/go/pkg/beam/core/runtime/graphx/serialize.go | 5 +
sdks/go/pkg/beam/core/runtime/graphx/translate.go | 24 ++
sdks/go/pkg/beam/core/runtime/graphx/v1/v1.pb.go | 300 +++++++++++-----------
sdks/go/pkg/beam/core/runtime/graphx/v1/v1.proto | 2 +
sdks/go/pkg/beam/core/state/state.go | 120 +++++++++
sdks/go/pkg/beam/core/state/state_test.go | 140 ++++++++++
sdks/go/pkg/beam/pardo.go | 13 +
14 files changed, 715 insertions(+), 179 deletions(-)
diff --git a/sdks/go/pkg/beam/core/funcx/fn.go b/sdks/go/pkg/beam/core/funcx/fn.go
index 7d39aa1ce88..788d6d52df6 100644
--- a/sdks/go/pkg/beam/core/funcx/fn.go
+++ b/sdks/go/pkg/beam/core/funcx/fn.go
@@ -20,6 +20,7 @@ import (
"reflect"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/state"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
@@ -82,6 +83,8 @@ const (
FnBundleFinalization FnParamKind = 0x800
// FnWatermarkEstimator indicates a function input parameter that implements sdf.WatermarkEstimator
FnWatermarkEstimator FnParamKind = 0x1000
+ // FnState indicates a function input parameter that implements state.Provider
+ FnStateProvider FnParamKind = 0x2000
)
func (k FnParamKind) String() string {
@@ -112,6 +115,8 @@ func (k FnParamKind) String() string {
return "BundleFinalization"
case FnWatermarkEstimator:
return "WatermarkEstimator"
+ case FnStateProvider:
+ return "StateProvider"
default:
return fmt.Sprintf("%v", int(k))
}
@@ -289,6 +294,17 @@ func (u *Fn) BundleFinalization() (pos int, exists bool) {
return -1, false
}
+// StateProvider returns (index, true) iff the function expects a
+// parameter that implements state.Provider.
+func (u *Fn) StateProvider() (pos int, exists bool) {
+ for i, p := range u.Param {
+ if p.Kind == FnStateProvider {
+ return i, true
+ }
+ }
+ return -1, false
+}
+
// WatermarkEstimator returns (index, true) iff the function expects a
// parameter that implements sdf.WatermarkEstimator.
func (u *Fn) WatermarkEstimator() (pos int, exists bool) {
@@ -374,6 +390,8 @@ func New(fn reflectx.Func) (*Fn, error) {
kind = FnWindow
case t == typex.BundleFinalizationType:
kind = FnBundleFinalization
+ case t == state.ProviderType:
+ kind = FnStateProvider
case t == reflectx.Type:
kind = FnType
case t.Implements(reflect.TypeOf((*sdf.RTracker)(nil)).Elem()):
@@ -464,7 +482,7 @@ func SubReturns(list []ReturnParam, indices ...int) []ReturnParam {
}
// The order of present parameters and return values must be as follows:
-// func(FnContext?, FnPane?, FnWindow?, FnEventTime?, FnWatermarkEstimator?, FnType?, FnBundleFinalization?, FnRTracker?, (FnValue, SideInput*)?, FnEmit*) (RetEventTime?, RetOutput?, RetError?)
+// func(FnContext?, FnPane?, FnWindow?, FnEventTime?, FnWatermarkEstimator?, FnType?, FnBundleFinalization?, FnRTracker?, FnStateProvider?, (FnValue, SideInput*)?, FnEmit*) (RetEventTime?, RetOutput?, RetError?)
// where ? indicates 0 or 1, and * indicates any number.
// and a SideInput is one of FnValue or FnIter or FnReIter
// Note: Fns with inputs must have at least one FnValue as the main input.
@@ -496,6 +514,7 @@ var (
errReflectTypePrecedence = errors.New("may only have a single reflect.Type parameter and it must precede the main input parameter")
errRTrackerPrecedence = errors.New("may only have a single sdf.RTracker parameter and it must precede the main input parameter")
errBundleFinalizationPrecedence = errors.New("may only have a single BundleFinalization parameter and it must precede the main input parameter")
+ errStateProviderPrecedence = errors.New("may only have a single state.Provider parameter and it must precede the main input parameter")
errInputPrecedence = errors.New("inputs parameters must precede emit function parameters")
)
@@ -513,6 +532,7 @@ const (
psOutput
psRTracker
psBundleFinalization
+ psStateProvider
)
func nextParamState(cur paramState, transition FnParamKind) (paramState, error) {
@@ -535,6 +555,8 @@ func nextParamState(cur paramState, transition FnParamKind) (paramState, error)
return psBundleFinalization, nil
case FnRTracker:
return psRTracker, nil
+ case FnStateProvider:
+ return psStateProvider, nil
}
case psContext:
switch transition {
@@ -552,6 +574,8 @@ func nextParamState(cur paramState, transition FnParamKind) (paramState, error)
return psBundleFinalization, nil
case FnRTracker:
return psRTracker, nil
+ case FnStateProvider:
+ return psStateProvider, nil
}
case psPane:
switch transition {
@@ -567,6 +591,8 @@ func nextParamState(cur paramState, transition FnParamKind) (paramState, error)
return psBundleFinalization, nil
case FnRTracker:
return psRTracker, nil
+ case FnStateProvider:
+ return psStateProvider, nil
}
case psWindow:
switch transition {
@@ -580,6 +606,8 @@ func nextParamState(cur paramState, transition FnParamKind) (paramState, error)
return psBundleFinalization, nil
case FnRTracker:
return psRTracker, nil
+ case FnStateProvider:
+ return psStateProvider, nil
}
case psEventTime:
switch transition {
@@ -591,6 +619,8 @@ func nextParamState(cur paramState, transition FnParamKind) (paramState, error)
return psBundleFinalization, nil
case FnRTracker:
return psRTracker, nil
+ case FnStateProvider:
+ return psStateProvider, nil
}
case psWatermarkEstimator:
switch transition {
@@ -600,6 +630,8 @@ func nextParamState(cur paramState, transition FnParamKind) (paramState, error)
return psBundleFinalization, nil
case FnRTracker:
return psRTracker, nil
+ case FnStateProvider:
+ return psStateProvider, nil
}
case psType:
switch transition {
@@ -607,13 +639,22 @@ func nextParamState(cur paramState, transition FnParamKind) (paramState, error)
return psBundleFinalization, nil
case FnRTracker:
return psRTracker, nil
+ case FnStateProvider:
+ return psStateProvider, nil
}
case psBundleFinalization:
switch transition {
case FnRTracker:
return psRTracker, nil
+ case FnStateProvider:
+ return psStateProvider, nil
}
case psRTracker:
+ switch transition {
+ case FnStateProvider:
+ return psStateProvider, nil
+ }
+ case psStateProvider:
// Completely handled by the default clause
case psInput:
switch transition {
@@ -644,6 +685,8 @@ func nextParamState(cur paramState, transition FnParamKind) (paramState, error)
return -1, errBundleFinalizationPrecedence
case FnRTracker:
return -1, errRTrackerPrecedence
+ case FnStateProvider:
+ return -1, errStateProviderPrecedence
case FnIter, FnReIter, FnValue, FnMultiMap:
return psInput, nil
case FnEmit:
diff --git a/sdks/go/pkg/beam/core/funcx/fn_test.go b/sdks/go/pkg/beam/core/funcx/fn_test.go
index 202de64b19d..9eee359c1e5 100644
--- a/sdks/go/pkg/beam/core/funcx/fn_test.go
+++ b/sdks/go/pkg/beam/core/funcx/fn_test.go
@@ -26,6 +26,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/state"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
)
@@ -108,6 +109,11 @@ func TestNew(t *testing.T) {
Fn: func(typex.PaneInfo, typex.Window, typex.EventTime, sdf.WatermarkEstimator, reflect.Type, []byte) {},
Param: []FnParamKind{FnPane, FnWindow, FnEventTime, FnWatermarkEstimator, FnType, FnValue},
},
+ {
+ Name: "good10",
+ Fn: func(sdf.RTracker, state.Provider, []byte) {},
+ Param: []FnParamKind{FnRTracker, FnStateProvider, FnValue},
+ },
{
Name: "good-method",
Fn: foo{1}.Do,
@@ -211,6 +217,16 @@ func TestNew(t *testing.T) {
Fn: func(int, func(int), func() func(*int) bool) {},
Err: errInputPrecedence,
},
+ {
+ Name: "errInputPrecedence- StateProvider before RTracker",
+ Fn: func(state.Provider, sdf.RTracker, int) {},
+ Err: errRTrackerPrecedence,
+ },
+ {
+ Name: "errInputPrecedence- StateProvider after output",
+ Fn: func(int, state.Provider) {},
+ Err: errStateProviderPrecedence,
+ },
{
Name: "errInputPrecedence- input after output",
Fn: func(int, func(int), int) {},
diff --git a/sdks/go/pkg/beam/core/graph/edge.go b/sdks/go/pkg/beam/core/graph/edge.go
index 5e18e7559ec..a9f1c8a092b 100644
--- a/sdks/go/pkg/beam/core/graph/edge.go
+++ b/sdks/go/pkg/beam/core/graph/edge.go
@@ -153,14 +153,15 @@ type MultiEdge struct {
parent *Scope
Op Opcode
- DoFn *DoFn // ParDo
- RestrictionCoder *coder.Coder // SplittableParDo
- CombineFn *CombineFn // Combine
- AccumCoder *coder.Coder // Combine
- Value []byte // Impulse
- External *ExternalTransform // Current External Transforms API
- Payload *Payload // Legacy External Transforms API
- WindowFn *window.Fn // WindowInto
+ DoFn *DoFn // ParDo
+ RestrictionCoder *coder.Coder // SplittableParDo
+ StateCoders map[string]*coder.Coder // Stateful ParDo
+ CombineFn *CombineFn // Combine
+ AccumCoder *coder.Coder // Combine
+ Value []byte // Impulse
+ External *ExternalTransform // Current External Transforms API
+ Payload *Payload // Legacy External Transforms API
+ WindowFn *window.Fn // WindowInto
Input []*Inbound
Output []*Outbound
diff --git a/sdks/go/pkg/beam/core/graph/fn.go b/sdks/go/pkg/beam/core/graph/fn.go
index 6b3656c1c25..2034349cdf2 100644
--- a/sdks/go/pkg/beam/core/graph/fn.go
+++ b/sdks/go/pkg/beam/core/graph/fn.go
@@ -21,6 +21,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/funcx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/state"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
@@ -282,6 +283,27 @@ func (f *DoFn) IsSplittable() bool {
return ok
}
+// PipelineState returns a list of PipelineState objects used to access/mutate global pipeline state (if any).
+func (f *DoFn) PipelineState() []state.PipelineState {
+ var s []state.PipelineState
+ if f.Recv == nil {
+ return s
+ }
+
+ v := reflect.Indirect(reflect.ValueOf(f.Recv))
+
+ for i := 0; i < v.NumField(); i++ {
+ f := v.Field(i)
+ if f.CanInterface() {
+ if ps, ok := f.Interface().(state.PipelineState); ok {
+ s = append(s, ps)
+ }
+ }
+ }
+
+ return s
+}
+
// SplittableDoFn represents a DoFn implementing SDF methods.
type SplittableDoFn DoFn
@@ -561,7 +583,14 @@ func AsDoFn(fn *Fn, numMainIn mainInputs) (*DoFn, error) {
}
}
- return (*DoFn)(fn), nil
+ doFn := (*DoFn)(fn)
+
+ err = validateState(doFn, numMainIn)
+ if err != nil {
+ return nil, addContext(err, fn)
+ }
+
+ return doFn, nil
}
// validateMainInputs checks that a method has the given number of main inputs
@@ -1221,6 +1250,50 @@ func validateStatefulWatermarkSig(fn *Fn, numMainIn int) error {
return nil
}
+func validateState(fn *DoFn, numIn mainInputs) error {
+ ps := fn.PipelineState()
+
+ if _, hasSp := fn.methods[processElementName].StateProvider(); hasSp {
+ if numIn == MainSingle {
+ err := errors.Errorf("ProcessElement uses a StateProvider, but is not keyed")
+ return errors.SetTopLevelMsgf(err, "ProcessElement uses a StateProvider, but is not keyed. "+
+ "All stateful DoFns must take a key/value pair as an input.")
+ }
+ if len(ps) == 0 {
+ err := errors.Errorf("ProcessElement uses a StateProvider, but noState structs are attached to the DoFn")
+ return errors.SetTopLevelMsgf(err, "ProcessElement uses a StateProvider, but no State structs are "+
+ "attached to the DoFn. Ensure that you are including the State structs you're using to read/write"+
+ "global state as public uppercase member variables.")
+ }
+ stateKeys := make(map[string]state.PipelineState)
+ for _, s := range ps {
+ k := s.StateKey()
+ if orig, ok := stateKeys[k]; ok {
+ err := errors.Errorf("Duplicate state key %v", k)
+ return errors.SetTopLevelMsgf(err, "Duplicate state key %v used by %v and %v. Ensure that state keys are"+
+ "unique per DoFn", k, orig, s)
+ } else {
+ stateKeys[k] = s
+ }
+ }
+
+ // TODO(#22736) - Remove this once state is fully supported
+ err := errors.Errorf("ProcessElement uses a StateProvider, but state is not supported in this release.")
+ return errors.SetTopLevelMsgf(err, "ProcessElement uses a StateProvider, but state is not supported in this release. "+
+ "Please try upgrading to a newer release if one exists or wait for state support to be released.")
+ } else {
+ if len(ps) > 0 {
+ err := errors.Errorf("ProcessElement doesn't use a StateProvider, but State structs are attached to "+
+ "the DoFn: %v", ps)
+ return errors.SetTopLevelMsgf(err, "ProcessElement doesn't use a StateProvider, but State structs are "+
+ "attached to the DoFn: %v\nEnsure that you are using the StateProvider to perform any reads or writes"+
+ "of pipeline state.", ps)
+ }
+ }
+
+ return nil
+}
+
// CombineFn represents a CombineFn.
type CombineFn Fn
diff --git a/sdks/go/pkg/beam/core/graph/fn_test.go b/sdks/go/pkg/beam/core/graph/fn_test.go
index a1702175f64..2117c735dce 100644
--- a/sdks/go/pkg/beam/core/graph/fn_test.go
+++ b/sdks/go/pkg/beam/core/graph/fn_test.go
@@ -25,6 +25,7 @@ import (
"time"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/state"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
)
@@ -52,6 +53,8 @@ func TestNewDoFn(t *testing.T) {
{dfn: &GoodDoFnCoGbk2{}, opt: CoGBKMainInput(3)},
{dfn: &GoodDoFnCoGbk7{}, opt: CoGBKMainInput(8)},
{dfn: &GoodDoFnCoGbk1wSide{}, opt: NumMainInputs(MainKv)},
+ // TODO(#22736) - Enable this once stateful dofns are fully supported
+ // {dfn: &GoodStatefulDoFn{State1: state.Value[int](state.MakeValueState[int]("state1"))}, opt: NumMainInputs(MainKv)},
}
for _, test := range tests {
@@ -70,7 +73,8 @@ func TestNewDoFn(t *testing.T) {
})
t.Run("invalid", func(t *testing.T) {
tests := []struct {
- dfn interface{}
+ dfn interface{}
+ numInputs int
}{
// Validate main inputs.
{dfn: func() int { return 0 }}, // No inputs.
@@ -94,26 +98,44 @@ func TestNewDoFn(t *testing.T) {
{dfn: &BadDoFnReturnValuesInFinishBundle{}},
{dfn: &BadDoFnReturnValuesInSetup{}},
{dfn: &BadDoFnReturnValuesInTeardown{}},
+ {dfn: &BadStatefulDoFnNoStateProvider{State1: state.Value[int](state.MakeValueState[int]("state1"))}},
+ {dfn: &BadStatefulDoFnNoStateFields{}},
+ {dfn: &BadStatefulDoFnNoKV{State1: state.Value[int](state.MakeValueState[int]("state1"))}, numInputs: 1},
}
for _, test := range tests {
t.Run(reflect.TypeOf(test.dfn).String(), func(t *testing.T) {
- if cfn, err := NewDoFn(test.dfn); err != nil {
- t.Logf("NewDoFn failed as expected:\n%v", err)
- } else {
- t.Errorf("NewDoFn(%v) = %v, want failure", cfn.Name(), cfn)
- }
- // If validation fails with unknown main inputs, then it should
- // always fail for any known number of main inputs, so test them
- // all. Error messages won't necessarily match.
- if cfn, err := NewDoFn(test.dfn, NumMainInputs(MainSingle)); err != nil {
- t.Logf("NewDoFn failed as expected:\n%v", err)
- } else {
- t.Errorf("NewDoFn(%v, NumMainInputs(MainSingle)) = %v, want failure", cfn.Name(), cfn)
- }
- if cfn, err := NewDoFn(test.dfn, NumMainInputs(MainKv)); err != nil {
- t.Logf("NewDoFn failed as expected:\n%v", err)
- } else {
- t.Errorf("NewDoFn(%v, NumMainInputs(MainKv)) = %v, want failure", cfn.Name(), cfn)
+ switch test.numInputs {
+ case 1:
+ if cfn, err := NewDoFn(test.dfn, NumMainInputs(MainSingle)); err != nil {
+ t.Logf("NewDoFn failed as expected:\n%v", err)
+ } else {
+ t.Errorf("NewDoFn(%v, NumMainInputs(MainSingle)) = %v, want failure", cfn.Name(), cfn)
+ }
+ case 2:
+ if cfn, err := NewDoFn(test.dfn, NumMainInputs(MainKv)); err != nil {
+ t.Logf("NewDoFn failed as expected:\n%v", err)
+ } else {
+ t.Errorf("NewDoFn(%v, NumMainInputs(MainSingle)) = %v, want failure", cfn.Name(), cfn)
+ }
+ default:
+ if cfn, err := NewDoFn(test.dfn); err != nil {
+ t.Logf("NewDoFn failed as expected:\n%v", err)
+ } else {
+ t.Errorf("NewDoFn(%v) = %v, want failure", cfn.Name(), cfn)
+ }
+ // If validation fails with unknown main inputs, then it should
+ // always fail for any known number of main inputs, so test them
+ // all. Error messages won't necessarily match.
+ if cfn, err := NewDoFn(test.dfn, NumMainInputs(MainSingle)); err != nil {
+ t.Logf("NewDoFn failed as expected:\n%v", err)
+ } else {
+ t.Errorf("NewDoFn(%v, NumMainInputs(MainSingle)) = %v, want failure", cfn.Name(), cfn)
+ }
+ if cfn, err := NewDoFn(test.dfn, NumMainInputs(MainKv)); err != nil {
+ t.Logf("NewDoFn failed as expected:\n%v", err)
+ } else {
+ t.Errorf("NewDoFn(%v, NumMainInputs(MainKv)) = %v, want failure", cfn.Name(), cfn)
+ }
}
})
}
@@ -1058,6 +1080,14 @@ func (fn *GoodStatefulWatermarkEstimatingKv) WatermarkEstimatorState(estimator *
return 0
}
+type GoodStatefulDoFn struct {
+ State1 state.Value[int]
+}
+
+func (fn *GoodStatefulDoFn) ProcessElement(state.Provider, int, int) int {
+ return 0
+}
+
// Examples of incorrect SDF signatures.
// Examples with missing methods.
@@ -1422,6 +1452,29 @@ func (fn *BadManualWatermarkEstimatorMismatched) CreateWatermarkEstimator() *Wat
return &WatermarkEstimatorT{}
}
+type BadStatefulDoFnNoStateProvider struct {
+ State1 state.Value[int]
+}
+
+func (fn *BadStatefulDoFnNoStateProvider) ProcessElement(int, int) int {
+ return 0
+}
+
+type BadStatefulDoFnNoStateFields struct {
+}
+
+func (fn *BadStatefulDoFnNoStateFields) ProcessElement(state.Provider, int) int {
+ return 0
+}
+
+type BadStatefulDoFnNoKV struct {
+ State1 state.Value[int]
+}
+
+func (fn *BadStatefulDoFnNoKV) ProcessElement(state.Provider, int, int) int {
+ return 0
+}
+
// Examples of correct CombineFn signatures
type MyAccum struct{}
diff --git a/sdks/go/pkg/beam/core/runtime/exec/fn.go b/sdks/go/pkg/beam/core/runtime/exec/fn.go
index 48387f78c34..51f3296ca44 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/fn.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/fn.go
@@ -26,6 +26,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/state"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
)
@@ -68,6 +69,21 @@ func (bf *bundleFinalizer) RegisterCallback(t time.Duration, cb func() error) {
}
}
+type stateProvider struct {
+}
+
+// ReadValueState reads a value state from the State API
+func (s *stateProvider) ReadValueState(userStateId string) (interface{}, []state.Transaction, error) {
+ // TODO(#22736) - read from the state api.
+ return nil, nil, errors.New("Stateful DoFns are not supported yet.")
+}
+
+// WriteValueState writes a value state to the State API
+func (s *stateProvider) WriteValueState(val state.Transaction) error {
+ // TODO(#22736) - read from the state api.
+ return errors.New("Stateful DoFns are not supported yet.")
+}
+
// Invoke invokes the fn with the given values. The extra values must match the non-main
// side input and emitters. It returns the direct output, if any.
func Invoke(ctx context.Context, pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime, fn *funcx.Fn, opt *MainInput, bf *bundleFinalizer, we sdf.WatermarkEstimator, extra ...interface{}) (*FullValue, error) {
@@ -92,10 +108,11 @@ func InvokeWithoutEventTime(ctx context.Context, fn *funcx.Fn, opt *MainInput, b
type invoker struct {
fn *funcx.Fn
args []interface{}
+ sp *stateProvider
// TODO(lostluck): 2018/07/06 consider replacing with a slice of functions to run over the args slice, as an improvement.
- ctxIdx, pnIdx, wndIdx, etIdx, bfIdx, weIdx int // specialized input indexes
- outEtIdx, outPcIdx, outErrIdx int // specialized output indexes
- in, out []int // general indexes
+ ctxIdx, pnIdx, wndIdx, etIdx, bfIdx, weIdx, spIdx int // specialized input indexes
+ outEtIdx, outPcIdx, outErrIdx int // specialized output indexes
+ in, out []int // general indexes
ret FullValue // ret is a cached allocation for passing to the next Unit. Units never modify the passed in FullValue.
elmConvert, elm2Convert func(interface{}) interface{} // Cached conversion functions, which assums this invoker is always used with the same parameter types.
@@ -125,6 +142,9 @@ func newInvoker(fn *funcx.Fn) *invoker {
if n.weIdx, ok = fn.WatermarkEstimator(); !ok {
n.weIdx = -1
}
+ if n.spIdx, ok = fn.StateProvider(); !ok {
+ n.spIdx = -1
+ }
if n.outEtIdx, ok = fn.OutEventTime(); !ok {
n.outEtIdx = -1
}
@@ -188,6 +208,12 @@ func (n *invoker) Invoke(ctx context.Context, pn typex.PaneInfo, ws []typex.Wind
args[n.weIdx] = we
}
+ if n.spIdx >= 0 {
+ // TODO(#22736) - provide this with the variable access it needs to talk to the state api.
+ n.sp = &stateProvider{}
+ args[n.spIdx] = n.sp
+ }
+
// (2) Main input from value, if any.
i := 0
if opt != nil {
diff --git a/sdks/go/pkg/beam/core/runtime/exec/translate.go b/sdks/go/pkg/beam/core/runtime/exec/translate.go
index e5b9976cb34..53ddf7843c2 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/translate.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/translate.go
@@ -404,6 +404,7 @@ func (b *builder) makeLink(from string, id linkID) (Node, error) {
urnTruncateSizedRestrictions:
var data string
var sides map[string]*pipepb.SideInput
+ var userState map[string]*pipepb.StateSpec
switch urn {
case graphx.URNParDo,
urnPairWithRestriction,
@@ -416,6 +417,7 @@ func (b *builder) makeLink(from string, id linkID) (Node, error) {
}
data = string(pardo.GetDoFn().GetPayload())
sides = pardo.GetSideInputs()
+ userState = pardo.GetStateSpecs()
case urnPerKeyCombinePre, urnPerKeyCombineMerge, urnPerKeyCombineExtract, urnPerKeyCombineConvert:
var cmb pipepb.CombinePayload
if err := proto.Unmarshal(payload, &cmb); err != nil {
@@ -462,6 +464,20 @@ func (b *builder) makeLink(from string, id linkID) (Node, error) {
n.PID = transform.GetUniqueName()
input := unmarshalKeyedValues(transform.GetInputs())
+
+ if len(userState) > 0 {
+ stateIdToCoder := make(map[string]*coder.Coder)
+ for key, spec := range userState {
+ // TODO(#22736) - this will eventually need to be aware of which type of state its modifying to support non-Value state types.
+ cID := spec.GetReadModifyWriteSpec().CoderId
+ c, err := b.coders.Coder(cID)
+ if err != nil {
+ return nil, err
+ }
+ stateIdToCoder[key] = c
+ }
+ }
+
for i := 1; i < len(input); i++ {
// TODO(https://github.com/apache/beam/issues/18602) Handle ViewFns for side inputs
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/serialize.go b/sdks/go/pkg/beam/core/runtime/graphx/serialize.go
index 75ee58484fd..4fe1a9e1616 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/serialize.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/serialize.go
@@ -27,6 +27,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime"
v1pb "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx/v1"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/state"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/jsonx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
@@ -517,6 +518,8 @@ func tryEncodeSpecial(t reflect.Type) (v1pb.Type_Special, bool) {
return v1pb.Type_WINDOW, true
case typex.BundleFinalizationType:
return v1pb.Type_BUNDLEFINALIZATION, true
+ case state.ProviderType:
+ return v1pb.Type_STATEPROVIDER, true
case typex.KVType:
return v1pb.Type_KV, true
case typex.CoGBKType:
@@ -681,6 +684,8 @@ func decodeSpecial(s v1pb.Type_Special) (reflect.Type, error) {
return typex.WindowType, nil
case v1pb.Type_BUNDLEFINALIZATION:
return typex.BundleFinalizationType, nil
+ case v1pb.Type_STATEPROVIDER:
+ return state.ProviderType, nil
case v1pb.Type_KV:
return typex.KVType, nil
case v1pb.Type_COGBK:
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate.go b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
index 77797d756f1..653dacce7a3 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/translate.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
@@ -70,6 +70,7 @@ const (
URNRequiresSplittableDoFn = "beam:requirement:pardo:splittable_dofn:v1"
URNRequiresBundleFinalization = "beam:requirement:pardo:finalization:v1"
+ URNRequiresStatefulProcessing = "beam:requirement:pardo:stateful:v1"
URNTruncate = "beam:transform:sdf_truncate_sized_restrictions:v1"
// Deprecated: Determine worker binary based on GoWorkerBinary Role instead.
@@ -457,6 +458,29 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) ([]string, error) {
if _, ok := edge.Edge.DoFn.ProcessElementFn().BundleFinalization(); ok {
m.requirements[URNRequiresBundleFinalization] = true
}
+ if _, ok := edge.Edge.DoFn.ProcessElementFn().StateProvider(); ok {
+ m.requirements[URNRequiresStatefulProcessing] = true
+ stateSpecs := make(map[string]*pipepb.StateSpec)
+ for _, ps := range edge.Edge.DoFn.PipelineState() {
+ coderId, err := m.coders.Add(edge.Edge.StateCoders[ps.StateKey()])
+ if err != nil {
+ return handleErr(err)
+ }
+ stateSpecs[ps.StateKey()] = &pipepb.StateSpec{
+ // TODO (#22736) - make spec type and protocol conditional on type of State. Right now, assumes ValueState.
+ // See https://github.com/apache/beam/blob/54b0784da7ccba738deff22bd83fbc374ad21d2e/sdks/go/pkg/beam/model/pipeline_v1/beam_runner_api.pb.go#L2635
+ Spec: &pipepb.StateSpec_ReadModifyWriteSpec{
+ ReadModifyWriteSpec: &pipepb.ReadModifyWriteStateSpec{
+ CoderId: coderId,
+ },
+ },
+ Protocol: &pipepb.FunctionSpec{
+ Urn: "beam:user_state:bag:v1",
+ },
+ }
+ }
+ payload.StateSpecs = stateSpecs
+ }
spec = &pipepb.FunctionSpec{Urn: URNParDo, Payload: protox.MustEncode(payload)}
annotations = edge.Edge.DoFn.Annotations()
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/v1/v1.pb.go b/sdks/go/pkg/beam/core/runtime/graphx/v1/v1.pb.go
index 413aa0f4d8c..e2e1e78c061 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/v1/v1.pb.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/v1/v1.pb.go
@@ -24,7 +24,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.27.1
-// protoc v3.19.4
+// protoc v3.14.0
// source: go/pkg/beam/core/runtime/graphx/v1/v1.proto
package v1
@@ -218,6 +218,7 @@ const (
Type_COGBK Type_Special = 13
Type_WINDOWEDVALUE Type_Special = 14
Type_BUNDLEFINALIZATION Type_Special = 23
+ Type_STATEPROVIDER Type_Special = 24
Type_T Type_Special = 15
Type_U Type_Special = 16
Type_V Type_Special = 17
@@ -240,6 +241,7 @@ var (
13: "COGBK",
14: "WINDOWEDVALUE",
23: "BUNDLEFINALIZATION",
+ 24: "STATEPROVIDER",
15: "T",
16: "U",
17: "V",
@@ -259,6 +261,7 @@ var (
"COGBK": 13,
"WINDOWEDVALUE": 14,
"BUNDLEFINALIZATION": 23,
+ "STATEPROVIDER": 24,
"T": 15,
"U": 16,
"V": 17,
@@ -1372,7 +1375,7 @@ var file_go_pkg_beam_core_runtime_graphx_v1_v1_proto_rawDesc = []byte{
0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73,
0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e,
0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61,
- 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x22, 0xcb, 0x0b, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65, 0x12,
+ 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x22, 0xde, 0x0b, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65, 0x12,
0x56, 0x0a, 0x04, 0x6b, 0x69, 0x6e, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x42, 0x2e,
0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e,
0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d,
@@ -1453,7 +1456,7 @@ var file_go_pkg_beam_core_runtime_graphx_v1_v1_proto_rawDesc = []byte{
0x52, 0x4e, 0x41, 0x4c, 0x10, 0x1a, 0x22, 0x27, 0x0a, 0x07, 0x43, 0x68, 0x61, 0x6e, 0x44, 0x69,
0x72, 0x12, 0x08, 0x0a, 0x04, 0x52, 0x45, 0x43, 0x56, 0x10, 0x00, 0x12, 0x08, 0x0a, 0x04, 0x53,
0x45, 0x4e, 0x44, 0x10, 0x01, 0x12, 0x08, 0x0a, 0x04, 0x42, 0x4f, 0x54, 0x48, 0x10, 0x02, 0x22,
- 0xc2, 0x01, 0x0a, 0x07, 0x53, 0x70, 0x65, 0x63, 0x69, 0x61, 0x6c, 0x12, 0x0b, 0x0a, 0x07, 0x49,
+ 0xd5, 0x01, 0x0a, 0x07, 0x53, 0x70, 0x65, 0x63, 0x69, 0x61, 0x6c, 0x12, 0x0b, 0x0a, 0x07, 0x49,
0x4c, 0x4c, 0x45, 0x47, 0x41, 0x4c, 0x10, 0x00, 0x12, 0x09, 0x0a, 0x05, 0x45, 0x52, 0x52, 0x4f,
0x52, 0x10, 0x01, 0x12, 0x0b, 0x0a, 0x07, 0x43, 0x4f, 0x4e, 0x54, 0x45, 0x58, 0x54, 0x10, 0x02,
0x12, 0x08, 0x0a, 0x04, 0x54, 0x59, 0x50, 0x45, 0x10, 0x03, 0x12, 0x0d, 0x0a, 0x09, 0x45, 0x56,
@@ -1462,175 +1465,176 @@ var file_go_pkg_beam_core_runtime_graphx_v1_v1_proto_rawDesc = []byte{
0x05, 0x43, 0x4f, 0x47, 0x42, 0x4b, 0x10, 0x0d, 0x12, 0x11, 0x0a, 0x0d, 0x57, 0x49, 0x4e, 0x44,
0x4f, 0x57, 0x45, 0x44, 0x56, 0x41, 0x4c, 0x55, 0x45, 0x10, 0x0e, 0x12, 0x16, 0x0a, 0x12, 0x42,
0x55, 0x4e, 0x44, 0x4c, 0x45, 0x46, 0x49, 0x4e, 0x41, 0x4c, 0x49, 0x5a, 0x41, 0x54, 0x49, 0x4f,
- 0x4e, 0x10, 0x17, 0x12, 0x05, 0x0a, 0x01, 0x54, 0x10, 0x0f, 0x12, 0x05, 0x0a, 0x01, 0x55, 0x10,
- 0x10, 0x12, 0x05, 0x0a, 0x01, 0x56, 0x10, 0x11, 0x12, 0x05, 0x0a, 0x01, 0x57, 0x10, 0x12, 0x12,
- 0x05, 0x0a, 0x01, 0x58, 0x10, 0x13, 0x12, 0x05, 0x0a, 0x01, 0x59, 0x10, 0x14, 0x12, 0x05, 0x0a,
- 0x01, 0x5a, 0x10, 0x15, 0x22, 0xc0, 0x01, 0x0a, 0x08, 0x46, 0x75, 0x6c, 0x6c, 0x54, 0x79, 0x70,
- 0x65, 0x12, 0x51, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32,
- 0x3d, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61,
- 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65,
- 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e,
- 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04,
- 0x74, 0x79, 0x70, 0x65, 0x12, 0x61, 0x0a, 0x0a, 0x63, 0x6f, 0x6d, 0x70, 0x6f, 0x6e, 0x65, 0x6e,
- 0x74, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x41, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61,
+ 0x4e, 0x10, 0x17, 0x12, 0x11, 0x0a, 0x0d, 0x53, 0x54, 0x41, 0x54, 0x45, 0x50, 0x52, 0x4f, 0x56,
+ 0x49, 0x44, 0x45, 0x52, 0x10, 0x18, 0x12, 0x05, 0x0a, 0x01, 0x54, 0x10, 0x0f, 0x12, 0x05, 0x0a,
+ 0x01, 0x55, 0x10, 0x10, 0x12, 0x05, 0x0a, 0x01, 0x56, 0x10, 0x11, 0x12, 0x05, 0x0a, 0x01, 0x57,
+ 0x10, 0x12, 0x12, 0x05, 0x0a, 0x01, 0x58, 0x10, 0x13, 0x12, 0x05, 0x0a, 0x01, 0x59, 0x10, 0x14,
+ 0x12, 0x05, 0x0a, 0x01, 0x5a, 0x10, 0x15, 0x22, 0xc0, 0x01, 0x0a, 0x08, 0x46, 0x75, 0x6c, 0x6c,
+ 0x54, 0x79, 0x70, 0x65, 0x12, 0x51, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01,
+ 0x28, 0x0b, 0x32, 0x3d, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e,
+ 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b, 0x67,
+ 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69,
+ 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, 0x54, 0x79, 0x70,
+ 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x61, 0x0a, 0x0a, 0x63, 0x6f, 0x6d, 0x70, 0x6f,
+ 0x6e, 0x65, 0x6e, 0x74, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x41, 0x2e, 0x6f, 0x72,
+ 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64,
+ 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63,
+ 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70,
+ 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, 0x46, 0x75, 0x6c, 0x6c, 0x54, 0x79, 0x70, 0x65, 0x52, 0x0a,
+ 0x63, 0x6f, 0x6d, 0x70, 0x6f, 0x6e, 0x65, 0x6e, 0x74, 0x73, 0x22, 0x6f, 0x0a, 0x06, 0x55, 0x73,
+ 0x65, 0x72, 0x46, 0x6e, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01,
+ 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x51, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65,
+ 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x3d, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61,
+ 0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f,
+ 0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72,
+ 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31,
+ 0x2e, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x22, 0x94, 0x01, 0x0a, 0x05,
+ 0x44, 0x79, 0x6e, 0x46, 0x6e, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20,
+ 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x51, 0x0a, 0x04, 0x74, 0x79, 0x70,
+ 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x3d, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70,
+ 0x61, 0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67,
+ 0x6f, 0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e,
+ 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76,
+ 0x31, 0x2e, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x12, 0x0a, 0x04,
+ 0x64, 0x61, 0x74, 0x61, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61,
+ 0x12, 0x10, 0x0a, 0x03, 0x67, 0x65, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x67,
+ 0x65, 0x6e, 0x22, 0x90, 0x02, 0x0a, 0x02, 0x46, 0x6e, 0x12, 0x4f, 0x0a, 0x02, 0x66, 0x6e, 0x18,
+ 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x3f, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63,
+ 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e,
+ 0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75,
+ 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e,
+ 0x55, 0x73, 0x65, 0x72, 0x46, 0x6e, 0x52, 0x02, 0x66, 0x6e, 0x12, 0x51, 0x0a, 0x04, 0x74, 0x79,
+ 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x3d, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61,
0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e,
0x67, 0x6f, 0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65,
0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e,
- 0x76, 0x31, 0x2e, 0x46, 0x75, 0x6c, 0x6c, 0x54, 0x79, 0x70, 0x65, 0x52, 0x0a, 0x63, 0x6f, 0x6d,
- 0x70, 0x6f, 0x6e, 0x65, 0x6e, 0x74, 0x73, 0x22, 0x6f, 0x0a, 0x06, 0x55, 0x73, 0x65, 0x72, 0x46,
- 0x6e, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52,
- 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x51, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20,
- 0x01, 0x28, 0x0b, 0x32, 0x3d, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65,
- 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b,
- 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75, 0x6e, 0x74,
- 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, 0x54, 0x79,
- 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x22, 0x94, 0x01, 0x0a, 0x05, 0x44, 0x79, 0x6e,
- 0x46, 0x6e, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09,
+ 0x76, 0x31, 0x2e, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x10, 0x0a,
+ 0x03, 0x6f, 0x70, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6f, 0x70, 0x74, 0x12,
+ 0x54, 0x0a, 0x05, 0x64, 0x79, 0x6e, 0x66, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x3e,
+ 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d,
+ 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61,
+ 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67,
+ 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, 0x44, 0x79, 0x6e, 0x46, 0x6e, 0x52, 0x05,
+ 0x64, 0x79, 0x6e, 0x66, 0x6e, 0x22, 0x6b, 0x0a, 0x08, 0x57, 0x69, 0x6e, 0x64, 0x6f, 0x77, 0x46,
+ 0x6e, 0x12, 0x12, 0x0a, 0x04, 0x6b, 0x69, 0x6e, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52,
+ 0x04, 0x6b, 0x69, 0x6e, 0x64, 0x12, 0x17, 0x0a, 0x07, 0x73, 0x69, 0x7a, 0x65, 0x5f, 0x6d, 0x73,
+ 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x06, 0x73, 0x69, 0x7a, 0x65, 0x4d, 0x73, 0x12, 0x1b,
+ 0x0a, 0x09, 0x70, 0x65, 0x72, 0x69, 0x6f, 0x64, 0x5f, 0x6d, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28,
+ 0x03, 0x52, 0x08, 0x70, 0x65, 0x72, 0x69, 0x6f, 0x64, 0x4d, 0x73, 0x12, 0x15, 0x0a, 0x06, 0x67,
+ 0x61, 0x70, 0x5f, 0x6d, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x03, 0x52, 0x05, 0x67, 0x61, 0x70,
+ 0x4d, 0x73, 0x22, 0x9a, 0x02, 0x0a, 0x0b, 0x43, 0x75, 0x73, 0x74, 0x6f, 0x6d, 0x43, 0x6f, 0x64,
+ 0x65, 0x72, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09,
0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x51, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02,
0x20, 0x01, 0x28, 0x0b, 0x32, 0x3d, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68,
0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70,
0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75, 0x6e,
0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, 0x54,
- 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74,
- 0x61, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x12, 0x10, 0x0a,
- 0x03, 0x67, 0x65, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x67, 0x65, 0x6e, 0x22,
- 0x90, 0x02, 0x0a, 0x02, 0x46, 0x6e, 0x12, 0x4f, 0x0a, 0x02, 0x66, 0x6e, 0x18, 0x01, 0x20, 0x01,
- 0x28, 0x0b, 0x32, 0x3f, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e,
- 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b, 0x67,
- 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69,
- 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, 0x55, 0x73, 0x65,
- 0x72, 0x46, 0x6e, 0x52, 0x02, 0x66, 0x6e, 0x12, 0x51, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18,
- 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x3d, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63,
+ 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x51, 0x0a, 0x03, 0x65, 0x6e, 0x63,
+ 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x3f, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61,
+ 0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f,
+ 0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72,
+ 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31,
+ 0x2e, 0x55, 0x73, 0x65, 0x72, 0x46, 0x6e, 0x52, 0x03, 0x65, 0x6e, 0x63, 0x12, 0x51, 0x0a, 0x03,
+ 0x64, 0x65, 0x63, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x3f, 0x2e, 0x6f, 0x72, 0x67, 0x2e,
+ 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73,
+ 0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72,
+ 0x65, 0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78,
+ 0x2e, 0x76, 0x31, 0x2e, 0x55, 0x73, 0x65, 0x72, 0x46, 0x6e, 0x52, 0x03, 0x64, 0x65, 0x63, 0x22,
+ 0xba, 0x06, 0x0a, 0x09, 0x4d, 0x75, 0x6c, 0x74, 0x69, 0x45, 0x64, 0x67, 0x65, 0x12, 0x4b, 0x0a,
+ 0x02, 0x66, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x3b, 0x2e, 0x6f, 0x72, 0x67, 0x2e,
+ 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73,
+ 0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72,
+ 0x65, 0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78,
+ 0x2e, 0x76, 0x31, 0x2e, 0x46, 0x6e, 0x52, 0x02, 0x66, 0x6e, 0x12, 0x16, 0x0a, 0x06, 0x6f, 0x70,
+ 0x63, 0x6f, 0x64, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x6f, 0x70, 0x63, 0x6f,
+ 0x64, 0x65, 0x12, 0x5e, 0x0a, 0x09, 0x77, 0x69, 0x6e, 0x64, 0x6f, 0x77, 0x5f, 0x66, 0x6e, 0x18,
+ 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x41, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63,
0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e,
0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75,
0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e,
- 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x6f, 0x70,
- 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6f, 0x70, 0x74, 0x12, 0x54, 0x0a, 0x05,
- 0x64, 0x79, 0x6e, 0x66, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x3e, 0x2e, 0x6f, 0x72,
+ 0x57, 0x69, 0x6e, 0x64, 0x6f, 0x77, 0x46, 0x6e, 0x52, 0x08, 0x77, 0x69, 0x6e, 0x64, 0x6f, 0x77,
+ 0x46, 0x6e, 0x12, 0x64, 0x0a, 0x07, 0x69, 0x6e, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x18, 0x02, 0x20,
+ 0x03, 0x28, 0x0b, 0x32, 0x4a, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65,
+ 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b,
+ 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75, 0x6e, 0x74,
+ 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, 0x4d, 0x75,
+ 0x6c, 0x74, 0x69, 0x45, 0x64, 0x67, 0x65, 0x2e, 0x49, 0x6e, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x52,
+ 0x07, 0x69, 0x6e, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x12, 0x67, 0x0a, 0x08, 0x6f, 0x75, 0x74, 0x62,
+ 0x6f, 0x75, 0x6e, 0x64, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x4b, 0x2e, 0x6f, 0x72, 0x67,
+ 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b,
+ 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f,
+ 0x72, 0x65, 0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68,
+ 0x78, 0x2e, 0x76, 0x31, 0x2e, 0x4d, 0x75, 0x6c, 0x74, 0x69, 0x45, 0x64, 0x67, 0x65, 0x2e, 0x4f,
+ 0x75, 0x74, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x52, 0x08, 0x6f, 0x75, 0x74, 0x62, 0x6f, 0x75, 0x6e,
+ 0x64, 0x1a, 0xb5, 0x02, 0x0a, 0x07, 0x49, 0x6e, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x12, 0x68, 0x0a,
+ 0x04, 0x6b, 0x69, 0x6e, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x54, 0x2e, 0x6f, 0x72,
0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64,
0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63,
0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70,
- 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, 0x44, 0x79, 0x6e, 0x46, 0x6e, 0x52, 0x05, 0x64, 0x79, 0x6e,
- 0x66, 0x6e, 0x22, 0x6b, 0x0a, 0x08, 0x57, 0x69, 0x6e, 0x64, 0x6f, 0x77, 0x46, 0x6e, 0x12, 0x12,
- 0x0a, 0x04, 0x6b, 0x69, 0x6e, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6b, 0x69,
- 0x6e, 0x64, 0x12, 0x17, 0x0a, 0x07, 0x73, 0x69, 0x7a, 0x65, 0x5f, 0x6d, 0x73, 0x18, 0x02, 0x20,
- 0x01, 0x28, 0x03, 0x52, 0x06, 0x73, 0x69, 0x7a, 0x65, 0x4d, 0x73, 0x12, 0x1b, 0x0a, 0x09, 0x70,
- 0x65, 0x72, 0x69, 0x6f, 0x64, 0x5f, 0x6d, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x08,
- 0x70, 0x65, 0x72, 0x69, 0x6f, 0x64, 0x4d, 0x73, 0x12, 0x15, 0x0a, 0x06, 0x67, 0x61, 0x70, 0x5f,
- 0x6d, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x03, 0x52, 0x05, 0x67, 0x61, 0x70, 0x4d, 0x73, 0x22,
- 0x9a, 0x02, 0x0a, 0x0b, 0x43, 0x75, 0x73, 0x74, 0x6f, 0x6d, 0x43, 0x6f, 0x64, 0x65, 0x72, 0x12,
- 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e,
- 0x61, 0x6d, 0x65, 0x12, 0x51, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28,
- 0x0b, 0x32, 0x3d, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x62,
- 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b, 0x67, 0x2e,
- 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d,
- 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, 0x54, 0x79, 0x70, 0x65,
- 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x51, 0x0a, 0x03, 0x65, 0x6e, 0x63, 0x18, 0x03, 0x20,
- 0x01, 0x28, 0x0b, 0x32, 0x3f, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65,
+ 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, 0x4d, 0x75, 0x6c, 0x74, 0x69, 0x45, 0x64, 0x67, 0x65, 0x2e,
+ 0x49, 0x6e, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x2e, 0x49, 0x6e, 0x70, 0x75, 0x74, 0x4b, 0x69, 0x6e,
+ 0x64, 0x52, 0x04, 0x6b, 0x69, 0x6e, 0x64, 0x12, 0x55, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18,
+ 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x41, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63,
+ 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e,
+ 0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75,
+ 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e,
+ 0x46, 0x75, 0x6c, 0x6c, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x22, 0x69,
+ 0x0a, 0x09, 0x49, 0x6e, 0x70, 0x75, 0x74, 0x4b, 0x69, 0x6e, 0x64, 0x12, 0x0b, 0x0a, 0x07, 0x49,
+ 0x4e, 0x56, 0x41, 0x4c, 0x49, 0x44, 0x10, 0x00, 0x12, 0x08, 0x0a, 0x04, 0x4d, 0x41, 0x49, 0x4e,
+ 0x10, 0x01, 0x12, 0x0d, 0x0a, 0x09, 0x53, 0x49, 0x4e, 0x47, 0x4c, 0x45, 0x54, 0x4f, 0x4e, 0x10,
+ 0x02, 0x12, 0x09, 0x0a, 0x05, 0x53, 0x4c, 0x49, 0x43, 0x45, 0x10, 0x03, 0x12, 0x07, 0x0a, 0x03,
+ 0x4d, 0x41, 0x50, 0x10, 0x04, 0x12, 0x0c, 0x0a, 0x08, 0x4d, 0x55, 0x4c, 0x54, 0x49, 0x4d, 0x41,
+ 0x50, 0x10, 0x05, 0x12, 0x08, 0x0a, 0x04, 0x49, 0x54, 0x45, 0x52, 0x10, 0x06, 0x12, 0x0a, 0x0a,
+ 0x06, 0x52, 0x45, 0x49, 0x54, 0x45, 0x52, 0x10, 0x07, 0x1a, 0x61, 0x0a, 0x08, 0x4f, 0x75, 0x74,
+ 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x12, 0x55, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20,
+ 0x01, 0x28, 0x0b, 0x32, 0x41, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65,
0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b,
0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75, 0x6e, 0x74,
- 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, 0x55, 0x73,
- 0x65, 0x72, 0x46, 0x6e, 0x52, 0x03, 0x65, 0x6e, 0x63, 0x12, 0x51, 0x0a, 0x03, 0x64, 0x65, 0x63,
- 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x3f, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61,
- 0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f,
- 0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72,
- 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31,
- 0x2e, 0x55, 0x73, 0x65, 0x72, 0x46, 0x6e, 0x52, 0x03, 0x64, 0x65, 0x63, 0x22, 0xba, 0x06, 0x0a,
- 0x09, 0x4d, 0x75, 0x6c, 0x74, 0x69, 0x45, 0x64, 0x67, 0x65, 0x12, 0x4b, 0x0a, 0x02, 0x66, 0x6e,
- 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x3b, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61,
- 0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f,
- 0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72,
- 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31,
- 0x2e, 0x46, 0x6e, 0x52, 0x02, 0x66, 0x6e, 0x12, 0x16, 0x0a, 0x06, 0x6f, 0x70, 0x63, 0x6f, 0x64,
- 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x6f, 0x70, 0x63, 0x6f, 0x64, 0x65, 0x12,
- 0x5e, 0x0a, 0x09, 0x77, 0x69, 0x6e, 0x64, 0x6f, 0x77, 0x5f, 0x66, 0x6e, 0x18, 0x05, 0x20, 0x01,
- 0x28, 0x0b, 0x32, 0x41, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e,
- 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b, 0x67,
- 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69,
- 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, 0x57, 0x69, 0x6e,
- 0x64, 0x6f, 0x77, 0x46, 0x6e, 0x52, 0x08, 0x77, 0x69, 0x6e, 0x64, 0x6f, 0x77, 0x46, 0x6e, 0x12,
- 0x64, 0x0a, 0x07, 0x69, 0x6e, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b,
- 0x32, 0x4a, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x62, 0x65,
- 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62,
- 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65,
- 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, 0x4d, 0x75, 0x6c, 0x74, 0x69,
- 0x45, 0x64, 0x67, 0x65, 0x2e, 0x49, 0x6e, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x52, 0x07, 0x69, 0x6e,
- 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x12, 0x67, 0x0a, 0x08, 0x6f, 0x75, 0x74, 0x62, 0x6f, 0x75, 0x6e,
- 0x64, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x4b, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70,
- 0x61, 0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67,
- 0x6f, 0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e,
- 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76,
- 0x31, 0x2e, 0x4d, 0x75, 0x6c, 0x74, 0x69, 0x45, 0x64, 0x67, 0x65, 0x2e, 0x4f, 0x75, 0x74, 0x62,
- 0x6f, 0x75, 0x6e, 0x64, 0x52, 0x08, 0x6f, 0x75, 0x74, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x1a, 0xb5,
- 0x02, 0x0a, 0x07, 0x49, 0x6e, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x12, 0x68, 0x0a, 0x04, 0x6b, 0x69,
- 0x6e, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x54, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61,
+ 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, 0x46, 0x75,
+ 0x6c, 0x6c, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x22, 0x1d, 0x0a, 0x0d,
+ 0x49, 0x6e, 0x6a, 0x65, 0x63, 0x74, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x0c, 0x0a,
+ 0x01, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x01, 0x6e, 0x22, 0xf5, 0x01, 0x0a, 0x10,
+ 0x52, 0x65, 0x73, 0x68, 0x75, 0x66, 0x66, 0x6c, 0x65, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64,
+ 0x12, 0x19, 0x0a, 0x08, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01,
+ 0x28, 0x09, 0x52, 0x07, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x49, 0x64, 0x12, 0x83, 0x01, 0x0a, 0x0e,
+ 0x63, 0x6f, 0x64, 0x65, 0x72, 0x5f, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x73, 0x18, 0x02,
+ 0x20, 0x03, 0x28, 0x0b, 0x32, 0x5c, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68,
+ 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70,
+ 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75, 0x6e,
+ 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, 0x52,
+ 0x65, 0x73, 0x68, 0x75, 0x66, 0x66, 0x6c, 0x65, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x2e,
+ 0x43, 0x6f, 0x64, 0x65, 0x72, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x73, 0x45, 0x6e, 0x74,
+ 0x72, 0x79, 0x52, 0x0d, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64,
+ 0x73, 0x1a, 0x40, 0x0a, 0x12, 0x43, 0x6f, 0x64, 0x65, 0x72, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61,
+ 0x64, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01,
+ 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c,
+ 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a,
+ 0x02, 0x38, 0x01, 0x22, 0xc5, 0x02, 0x0a, 0x10, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x6f, 0x72,
+ 0x6d, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x10, 0x0a, 0x03, 0x75, 0x72, 0x6e, 0x18,
+ 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x75, 0x72, 0x6e, 0x12, 0x56, 0x0a, 0x04, 0x65, 0x64,
+ 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x42, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61,
0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e,
0x67, 0x6f, 0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65,
0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e,
- 0x76, 0x31, 0x2e, 0x4d, 0x75, 0x6c, 0x74, 0x69, 0x45, 0x64, 0x67, 0x65, 0x2e, 0x49, 0x6e, 0x62,
- 0x6f, 0x75, 0x6e, 0x64, 0x2e, 0x49, 0x6e, 0x70, 0x75, 0x74, 0x4b, 0x69, 0x6e, 0x64, 0x52, 0x04,
- 0x6b, 0x69, 0x6e, 0x64, 0x12, 0x55, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01,
- 0x28, 0x0b, 0x32, 0x41, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e,
+ 0x76, 0x31, 0x2e, 0x4d, 0x75, 0x6c, 0x74, 0x69, 0x45, 0x64, 0x67, 0x65, 0x52, 0x04, 0x65, 0x64,
+ 0x67, 0x65, 0x12, 0x5e, 0x0a, 0x06, 0x69, 0x6e, 0x6a, 0x65, 0x63, 0x74, 0x18, 0x03, 0x20, 0x01,
+ 0x28, 0x0b, 0x32, 0x46, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e,
0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b, 0x67,
0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69,
- 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, 0x46, 0x75, 0x6c,
- 0x6c, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x22, 0x69, 0x0a, 0x09, 0x49,
- 0x6e, 0x70, 0x75, 0x74, 0x4b, 0x69, 0x6e, 0x64, 0x12, 0x0b, 0x0a, 0x07, 0x49, 0x4e, 0x56, 0x41,
- 0x4c, 0x49, 0x44, 0x10, 0x00, 0x12, 0x08, 0x0a, 0x04, 0x4d, 0x41, 0x49, 0x4e, 0x10, 0x01, 0x12,
- 0x0d, 0x0a, 0x09, 0x53, 0x49, 0x4e, 0x47, 0x4c, 0x45, 0x54, 0x4f, 0x4e, 0x10, 0x02, 0x12, 0x09,
- 0x0a, 0x05, 0x53, 0x4c, 0x49, 0x43, 0x45, 0x10, 0x03, 0x12, 0x07, 0x0a, 0x03, 0x4d, 0x41, 0x50,
- 0x10, 0x04, 0x12, 0x0c, 0x0a, 0x08, 0x4d, 0x55, 0x4c, 0x54, 0x49, 0x4d, 0x41, 0x50, 0x10, 0x05,
- 0x12, 0x08, 0x0a, 0x04, 0x49, 0x54, 0x45, 0x52, 0x10, 0x06, 0x12, 0x0a, 0x0a, 0x06, 0x52, 0x45,
- 0x49, 0x54, 0x45, 0x52, 0x10, 0x07, 0x1a, 0x61, 0x0a, 0x08, 0x4f, 0x75, 0x74, 0x62, 0x6f, 0x75,
- 0x6e, 0x64, 0x12, 0x55, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b,
- 0x32, 0x41, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x62, 0x65,
- 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62,
- 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65,
- 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, 0x46, 0x75, 0x6c, 0x6c, 0x54,
- 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x22, 0x1d, 0x0a, 0x0d, 0x49, 0x6e, 0x6a,
- 0x65, 0x63, 0x74, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x0c, 0x0a, 0x01, 0x6e, 0x18,
- 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x01, 0x6e, 0x22, 0xf5, 0x01, 0x0a, 0x10, 0x52, 0x65, 0x73,
- 0x68, 0x75, 0x66, 0x66, 0x6c, 0x65, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x19, 0x0a,
- 0x08, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52,
- 0x07, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x49, 0x64, 0x12, 0x83, 0x01, 0x0a, 0x0e, 0x63, 0x6f, 0x64,
- 0x65, 0x72, 0x5f, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28,
- 0x0b, 0x32, 0x5c, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x62,
- 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b, 0x67, 0x2e,
- 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d,
- 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x73, 0x68,
- 0x75, 0x66, 0x66, 0x6c, 0x65, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x2e, 0x43, 0x6f, 0x64,
- 0x65, 0x72, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52,
- 0x0d, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x73, 0x1a, 0x40,
- 0x0a, 0x12, 0x43, 0x6f, 0x64, 0x65, 0x72, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x73, 0x45,
- 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28,
- 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18,
- 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01,
- 0x22, 0xc5, 0x02, 0x0a, 0x10, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, 0x50, 0x61,
- 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x10, 0x0a, 0x03, 0x75, 0x72, 0x6e, 0x18, 0x01, 0x20, 0x01,
- 0x28, 0x09, 0x52, 0x03, 0x75, 0x72, 0x6e, 0x12, 0x56, 0x0a, 0x04, 0x65, 0x64, 0x67, 0x65, 0x18,
- 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x42, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63,
+ 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, 0x49, 0x6e, 0x6a,
+ 0x65, 0x63, 0x74, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x52, 0x06, 0x69, 0x6e, 0x6a, 0x65,
+ 0x63, 0x74, 0x12, 0x67, 0x0a, 0x09, 0x72, 0x65, 0x73, 0x68, 0x75, 0x66, 0x66, 0x6c, 0x65, 0x18,
+ 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x49, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63,
0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e,
0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75,
0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e,
- 0x4d, 0x75, 0x6c, 0x74, 0x69, 0x45, 0x64, 0x67, 0x65, 0x52, 0x04, 0x65, 0x64, 0x67, 0x65, 0x12,
- 0x5e, 0x0a, 0x06, 0x69, 0x6e, 0x6a, 0x65, 0x63, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32,
- 0x46, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61,
- 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65,
- 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e,
- 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, 0x49, 0x6e, 0x6a, 0x65, 0x63, 0x74,
- 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x52, 0x06, 0x69, 0x6e, 0x6a, 0x65, 0x63, 0x74, 0x12,
- 0x67, 0x0a, 0x09, 0x72, 0x65, 0x73, 0x68, 0x75, 0x66, 0x66, 0x6c, 0x65, 0x18, 0x04, 0x20, 0x01,
- 0x28, 0x0b, 0x32, 0x49, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e,
- 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b, 0x67,
- 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69,
- 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x73,
- 0x68, 0x75, 0x66, 0x66, 0x6c, 0x65, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x52, 0x09, 0x72,
- 0x65, 0x73, 0x68, 0x75, 0x66, 0x66, 0x6c, 0x65, 0x42, 0x46, 0x5a, 0x44, 0x67, 0x69, 0x74, 0x68,
- 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2f, 0x62, 0x65,
- 0x61, 0x6d, 0x2f, 0x73, 0x64, 0x6b, 0x73, 0x2f, 0x76, 0x32, 0x2f, 0x67, 0x6f, 0x2f, 0x70, 0x6b,
- 0x67, 0x2f, 0x62, 0x65, 0x61, 0x6d, 0x2f, 0x63, 0x6f, 0x72, 0x65, 0x2f, 0x72, 0x75, 0x6e, 0x74,
- 0x69, 0x6d, 0x65, 0x2f, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2f, 0x76, 0x31, 0x3b, 0x76, 0x31,
- 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
+ 0x52, 0x65, 0x73, 0x68, 0x75, 0x66, 0x66, 0x6c, 0x65, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64,
+ 0x52, 0x09, 0x72, 0x65, 0x73, 0x68, 0x75, 0x66, 0x66, 0x6c, 0x65, 0x42, 0x46, 0x5a, 0x44, 0x67,
+ 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65,
+ 0x2f, 0x62, 0x65, 0x61, 0x6d, 0x2f, 0x73, 0x64, 0x6b, 0x73, 0x2f, 0x76, 0x32, 0x2f, 0x67, 0x6f,
+ 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x62, 0x65, 0x61, 0x6d, 0x2f, 0x63, 0x6f, 0x72, 0x65, 0x2f, 0x72,
+ 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2f, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2f, 0x76, 0x31,
+ 0x3b, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/v1/v1.proto b/sdks/go/pkg/beam/core/runtime/graphx/v1/v1.proto
index ef84c20887a..5046994707b 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/v1/v1.proto
+++ b/sdks/go/pkg/beam/core/runtime/graphx/v1/v1.proto
@@ -114,6 +114,8 @@ message Type {
BUNDLEFINALIZATION = 23;
+ STATEPROVIDER = 24;
+
T = 15;
U = 16;
V = 17;
diff --git a/sdks/go/pkg/beam/core/state/state.go b/sdks/go/pkg/beam/core/state/state.go
new file mode 100644
index 00000000000..62fd4509936
--- /dev/null
+++ b/sdks/go/pkg/beam/core/state/state.go
@@ -0,0 +1,120 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package state contains structs for reading and manipulating pipeline state.
+package state
+
+import (
+ "reflect"
+)
+
+type TransactionType_Enum int32
+
+const (
+ TransactionType_Set TransactionType_Enum = 0
+ TransactionType_Clear TransactionType_Enum = 1
+)
+
+var (
+ ProviderType = reflect.TypeOf((*Provider)(nil)).Elem()
+)
+
+// TODO(#20510) - add other forms of state (MapState, BagState, CombiningState), prefetch, and clear.
+
+// Transaction is used to represent a pending state transaction. This should not be manipulated directly;
+// it is primarily used for implementations of the Provider interface to talk to the various State objects.
+type Transaction struct {
+ Key string
+ Type TransactionType_Enum
+ Val interface{}
+}
+
+// Provider represents the DoFn parameter used to get and manipulate pipeline state
+// stored as key value pairs (https://beam.apache.org/documentation/programming-guide/#state-and-timers).
+// This should not be manipulated directly. Instead it should be used as a parameter
+// to functions on State objects like state.Value.
+type Provider interface {
+ ReadValueState(id string) (interface{}, []Transaction, error)
+ WriteValueState(val Transaction) error
+}
+
+// PipelineState is an interface representing different kinds of PipelineState (currently just state.Value).
+// It is primarily meant for Beam packages to use and is probably not useful for most pipeline authors.
+type PipelineState interface {
+ StateKey() string
+ CoderType() reflect.Type
+}
+
+// Value is used to read and write global pipeline state representing a single value.
+// Key represents the key used to lookup this state.
+type Value[T any] struct {
+ Key string
+}
+
+// Write is used to write this instance of global pipeline state representing a single value.
+func (s *Value[T]) Write(p Provider, val T) error {
+ return p.WriteValueState(Transaction{
+ Key: s.Key,
+ Type: TransactionType_Set,
+ Val: val,
+ })
+}
+
+// Read is used to read this instance of global pipeline state representing a single value.
+// When a value is not found, returns the 0 value and false.
+func (s *Value[T]) Read(p Provider) (T, bool, error) {
+ // This replays any writes that have happened to this value since we last read
+ // For more detail, see "State Transactionality" below for buffered transactions
+ cur, bufferedTransactions, err := p.ReadValueState(s.Key)
+ if err != nil {
+ var val T
+ return val, false, err
+ }
+ for _, t := range bufferedTransactions {
+ switch t.Type {
+ case TransactionType_Set:
+ cur = t.Val
+ case TransactionType_Clear:
+ cur = nil
+ }
+ }
+ if cur == nil {
+ var val T
+ return val, false, nil
+ }
+ return cur.(T), true, nil
+}
+
+// StateKey returns the key for this pipeline state entry.
+func (s Value[T]) StateKey() string {
+ if s.Key == "" {
+ // TODO(#22736) - infer the state from the member variable name during pipeline construction.
+ panic("Value state exists on struct but has not been initialized with a key.")
+ }
+ return s.Key
+}
+
+// CoderType returns the type of the value state which should be used for a coder.
+func (s Value[T]) CoderType() reflect.Type {
+ var t T
+ return reflect.TypeOf(t)
+}
+
+// MakeValueState is a factory function to create an instance of ValueState with the given key.
+func MakeValueState[T any](k string) Value[T] {
+ return Value[T]{
+ Key: k,
+ }
+}
diff --git a/sdks/go/pkg/beam/core/state/state_test.go b/sdks/go/pkg/beam/core/state/state_test.go
new file mode 100644
index 00000000000..b1297891c1c
--- /dev/null
+++ b/sdks/go/pkg/beam/core/state/state_test.go
@@ -0,0 +1,140 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package state
+
+import (
+ "errors"
+ "testing"
+)
+
+var (
+ errFake = errors.New("fake error")
+)
+
+type fakeProvider struct {
+ initialState map[string]interface{}
+ transactions map[string][]Transaction
+ err map[string]error
+}
+
+func (s *fakeProvider) ReadValueState(userStateId string) (interface{}, []Transaction, error) {
+ if err, ok := s.err[userStateId]; ok {
+ return nil, nil, err
+ }
+ base := s.initialState[userStateId]
+ trans, ok := s.transactions[userStateId]
+ if !ok {
+ trans = []Transaction{}
+ }
+ return base, trans, nil
+}
+
+func (s *fakeProvider) WriteValueState(val Transaction) error {
+ if transactions, ok := s.transactions[val.Key]; ok {
+ s.transactions[val.Key] = append(transactions, val)
+ } else {
+ s.transactions[val.Key] = []Transaction{val}
+ }
+ return nil
+}
+
+func TestValueRead(t *testing.T) {
+ is := make(map[string]interface{})
+ ts := make(map[string][]Transaction)
+ es := make(map[string]error)
+ is["no_transactions"] = 1
+ ts["no_transactions"] = nil
+ is["basic_set"] = 1
+ ts["basic_set"] = []Transaction{{Key: "basic_set", Type: TransactionType_Set, Val: 3}}
+ is["basic_clear"] = 1
+ ts["basic_clear"] = []Transaction{{Key: "basic_clear", Type: TransactionType_Clear, Val: nil}}
+ is["set_then_clear"] = 1
+ ts["set_then_clear"] = []Transaction{{Key: "set_then_clear", Type: TransactionType_Set, Val: 3}, {Key: "set_then_clear", Type: TransactionType_Clear, Val: nil}}
+ is["set_then_clear_then_set"] = 1
+ ts["set_then_clear_then_set"] = []Transaction{{Key: "set_then_clear_then_set", Type: TransactionType_Set, Val: 3}, {Key: "set_then_clear_then_set", Type: TransactionType_Clear, Val: nil}, {Key: "set_then_clear_then_set", Type: TransactionType_Set, Val: 4}}
+ is["err"] = 1
+ ts["err"] = []Transaction{{Key: "err", Type: TransactionType_Set, Val: 3}}
+ es["err"] = errFake
+
+ f := fakeProvider{
+ initialState: is,
+ transactions: ts,
+ err: es,
+ }
+
+ var tests = []struct {
+ vs Value[int]
+ val int
+ ok bool
+ err error
+ }{
+ {MakeValueState[int]("no_transactions"), 1, true, nil},
+ {MakeValueState[int]("basic_set"), 3, true, nil},
+ {MakeValueState[int]("basic_clear"), 0, false, nil},
+ {MakeValueState[int]("set_then_clear"), 0, false, nil},
+ {MakeValueState[int]("set_then_clear_then_set"), 4, true, nil},
+ {MakeValueState[int]("err"), 0, false, errFake},
+ }
+
+ for _, tt := range tests {
+ val, ok, err := tt.vs.Read(&f)
+ if err != nil && tt.err == nil {
+ t.Errorf("Value.Read() returned error %v for state key %v when it shouldn't have", err, tt.vs.Key)
+ } else if err == nil && tt.err != nil {
+ t.Errorf("Value.Read() returned no error for state key %v when it should have returned %v", tt.vs.Key, err)
+ } else if ok && !tt.ok {
+ t.Errorf("Value.Read() returned a value %v for state key %v when it shouldn't have", val, tt.vs.Key)
+ } else if !ok && tt.ok {
+ t.Errorf("Value.Read() didn't return a value for state key %v when it should have returned %v", tt.vs.Key, tt.val)
+ } else if val != tt.val {
+ t.Errorf("Value.Read()=%v, want %v for state key %v", val, tt.val, tt.vs.Key)
+ }
+ }
+}
+
+func TestValueWrite(t *testing.T) {
+ var tests = []struct {
+ writes []int
+ val int
+ ok bool
+ }{
+ {[]int{}, 0, false},
+ {[]int{3}, 3, true},
+ {[]int{1, 5}, 5, true},
+ }
+
+ for _, tt := range tests {
+ f := fakeProvider{
+ initialState: make(map[string]interface{}),
+ transactions: make(map[string][]Transaction),
+ err: make(map[string]error),
+ }
+ vs := MakeValueState[int]("vs")
+ for _, val := range tt.writes {
+ vs.Write(&f, val)
+ }
+ val, ok, err := vs.Read(&f)
+ if err != nil {
+ t.Errorf("Value.Read() returned error %v when it shouldn't have after writing: %v", err, tt.writes)
+ } else if ok && !tt.ok {
+ t.Errorf("Value.Read() returned a value %v when it shouldn't have after writing: %v", val, tt.writes)
+ } else if !ok && tt.ok {
+ t.Errorf("Value.Read() didn't return a value when it should have returned %v after writing: %v", tt.val, tt.writes)
+ } else if val != tt.val {
+ t.Errorf("Value.Read()=%v, want %v after writing: %v", val, tt.val, tt.writes)
+ }
+ }
+}
diff --git a/sdks/go/pkg/beam/pardo.go b/sdks/go/pkg/beam/pardo.go
index 66fca00838d..dcdb3e74d1e 100644
--- a/sdks/go/pkg/beam/pardo.go
+++ b/sdks/go/pkg/beam/pardo.go
@@ -92,6 +92,19 @@ func TryParDo(s Scope, dofn interface{}, col PCollection, opts ...Option) ([]PCo
return nil, addParDoCtx(err, s)
}
+ pipelineState := fn.PipelineState()
+ if len(pipelineState) > 0 {
+ edge.StateCoders = make(map[string]*coder.Coder)
+ for _, ps := range pipelineState {
+ sT := typex.New(ps.CoderType())
+ c, err := inferCoder(sT)
+ if err != nil {
+ return nil, addParDoCtx(err, s)
+ }
+ edge.StateCoders[ps.StateKey()] = c
+ }
+ }
+
var ret []PCollection
for _, out := range edge.Output {
c := PCollection{out.To}