You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/11/07 16:54:49 UTC

[33/50] [abbrv] beam git commit: Add all portability protos to Go

http://git-wip-us.apache.org/repos/asf/beam/blob/42100456/sdks/go/pkg/beam/model/fnexecution_v1/beam_fn_api.pb.go
----------------------------------------------------------------------
diff --git a/sdks/go/pkg/beam/model/fnexecution_v1/beam_fn_api.pb.go b/sdks/go/pkg/beam/model/fnexecution_v1/beam_fn_api.pb.go
new file mode 100644
index 0000000..9a31a57
--- /dev/null
+++ b/sdks/go/pkg/beam/model/fnexecution_v1/beam_fn_api.pb.go
@@ -0,0 +1,2729 @@
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// source: beam_fn_api.proto
+
+/*
+Package fnexecution_v1 is a generated protocol buffer package.
+
+It is generated from these files:
+	beam_fn_api.proto
+	beam_provision_api.proto
+
+It has these top-level messages:
+	Target
+	RemoteGrpcPort
+	InstructionRequest
+	InstructionResponse
+	RegisterRequest
+	RegisterResponse
+	ProcessBundleDescriptor
+	ProcessBundleRequest
+	ProcessBundleResponse
+	ProcessBundleProgressRequest
+	Metrics
+	ProcessBundleProgressResponse
+	ProcessBundleSplitRequest
+	ElementCountRestriction
+	ElementCountSkipRestriction
+	PrimitiveTransformSplit
+	ProcessBundleSplitResponse
+	Elements
+	StateRequest
+	StateResponse
+	StateKey
+	StateGetRequest
+	StateGetResponse
+	StateAppendRequest
+	StateAppendResponse
+	StateClearRequest
+	StateClearResponse
+	LogEntry
+	LogControl
+	DockerContainer
+	GetProvisionInfoRequest
+	GetProvisionInfoResponse
+	ProvisionInfo
+	Resources
+*/
+package fnexecution_v1
+
+import proto "github.com/golang/protobuf/proto"
+import fmt "fmt"
+import math "math"
+import org_apache_beam_model_pipeline_v1 "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+import org_apache_beam_model_pipeline_v11 "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+import google_protobuf1 "github.com/golang/protobuf/ptypes/timestamp"
+
+import (
+	context "golang.org/x/net/context"
+	grpc "google.golang.org/grpc"
+)
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ = proto.Marshal
+var _ = fmt.Errorf
+var _ = math.Inf
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the proto package it is being compiled against.
+// A compilation error at this line likely means your copy of the
+// proto package needs to be updated.
+const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
+
+type LogEntry_Severity_Enum int32
+
+const (
+	LogEntry_Severity_UNSPECIFIED LogEntry_Severity_Enum = 0
+	// Trace level information, also the default log level unless
+	// another severity is specified.
+	LogEntry_Severity_TRACE LogEntry_Severity_Enum = 1
+	// Debugging information.
+	LogEntry_Severity_DEBUG LogEntry_Severity_Enum = 2
+	// Normal events.
+	LogEntry_Severity_INFO LogEntry_Severity_Enum = 3
+	// Normal but significant events, such as start up, shut down, or
+	// configuration.
+	LogEntry_Severity_NOTICE LogEntry_Severity_Enum = 4
+	// Warning events might cause problems.
+	LogEntry_Severity_WARN LogEntry_Severity_Enum = 5
+	// Error events are likely to cause problems.
+	LogEntry_Severity_ERROR LogEntry_Severity_Enum = 6
+	// Critical events cause severe problems or brief outages and may
+	// indicate that a person must take action.
+	LogEntry_Severity_CRITICAL LogEntry_Severity_Enum = 7
+)
+
+var LogEntry_Severity_Enum_name = map[int32]string{
+	0: "UNSPECIFIED",
+	1: "TRACE",
+	2: "DEBUG",
+	3: "INFO",
+	4: "NOTICE",
+	5: "WARN",
+	6: "ERROR",
+	7: "CRITICAL",
+}
+var LogEntry_Severity_Enum_value = map[string]int32{
+	"UNSPECIFIED": 0,
+	"TRACE":       1,
+	"DEBUG":       2,
+	"INFO":        3,
+	"NOTICE":      4,
+	"WARN":        5,
+	"ERROR":       6,
+	"CRITICAL":    7,
+}
+
+func (x LogEntry_Severity_Enum) String() string {
+	return proto.EnumName(LogEntry_Severity_Enum_name, int32(x))
+}
+func (LogEntry_Severity_Enum) EnumDescriptor() ([]byte, []int) {
+	return fileDescriptor0, []int{27, 1, 0}
+}
+
+// A representation of an input or output definition on a primitive transform.
+// Stable
+type Target struct {
+	// (Required) The id of the PrimitiveTransform which is the target.
+	PrimitiveTransformReference string `protobuf:"bytes,1,opt,name=primitive_transform_reference,json=primitiveTransformReference" json:"primitive_transform_reference,omitempty"`
+	// (Required) The local name of an input or output defined on the primitive
+	// transform.
+	Name string `protobuf:"bytes,2,opt,name=name" json:"name,omitempty"`
+}
+
+func (m *Target) Reset()                    { *m = Target{} }
+func (m *Target) String() string            { return proto.CompactTextString(m) }
+func (*Target) ProtoMessage()               {}
+func (*Target) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
+
+func (m *Target) GetPrimitiveTransformReference() string {
+	if m != nil {
+		return m.PrimitiveTransformReference
+	}
+	return ""
+}
+
+func (m *Target) GetName() string {
+	if m != nil {
+		return m.Name
+	}
+	return ""
+}
+
+// A repeated list of target definitions.
+type Target_List struct {
+	Target []*Target `protobuf:"bytes,1,rep,name=target" json:"target,omitempty"`
+}
+
+func (m *Target_List) Reset()                    { *m = Target_List{} }
+func (m *Target_List) String() string            { return proto.CompactTextString(m) }
+func (*Target_List) ProtoMessage()               {}
+func (*Target_List) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0, 0} }
+
+func (m *Target_List) GetTarget() []*Target {
+	if m != nil {
+		return m.Target
+	}
+	return nil
+}
+
+// A descriptor for connecting to a remote port using the Beam Fn Data API.
+// Allows for communication between two environments (for example between the
+// runner and the SDK).
+// Stable
+type RemoteGrpcPort struct {
+	// (Required) An API descriptor which describes where to
+	// connect to including any authentication that is required.
+	ApiServiceDescriptor *org_apache_beam_model_pipeline_v11.ApiServiceDescriptor `protobuf:"bytes,1,opt,name=api_service_descriptor,json=apiServiceDescriptor" json:"api_service_descriptor,omitempty"`
+}
+
+func (m *RemoteGrpcPort) Reset()                    { *m = RemoteGrpcPort{} }
+func (m *RemoteGrpcPort) String() string            { return proto.CompactTextString(m) }
+func (*RemoteGrpcPort) ProtoMessage()               {}
+func (*RemoteGrpcPort) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} }
+
+func (m *RemoteGrpcPort) GetApiServiceDescriptor() *org_apache_beam_model_pipeline_v11.ApiServiceDescriptor {
+	if m != nil {
+		return m.ApiServiceDescriptor
+	}
+	return nil
+}
+
+// A request sent by a runner which the SDK is asked to fulfill.
+// For any unsupported request type, an error should be returned with a
+// matching instruction id.
+// Stable
+type InstructionRequest struct {
+	// (Required) An unique identifier provided by the runner which represents
+	// this requests execution. The InstructionResponse MUST have the matching id.
+	InstructionId string `protobuf:"bytes,1,opt,name=instruction_id,json=instructionId" json:"instruction_id,omitempty"`
+	// (Required) A request that the SDK Harness needs to interpret.
+	//
+	// Types that are valid to be assigned to Request:
+	//	*InstructionRequest_Register
+	//	*InstructionRequest_ProcessBundle
+	//	*InstructionRequest_ProcessBundleProgress
+	//	*InstructionRequest_ProcessBundleSplit
+	Request isInstructionRequest_Request `protobuf_oneof:"request"`
+}
+
+func (m *InstructionRequest) Reset()                    { *m = InstructionRequest{} }
+func (m *InstructionRequest) String() string            { return proto.CompactTextString(m) }
+func (*InstructionRequest) ProtoMessage()               {}
+func (*InstructionRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} }
+
+type isInstructionRequest_Request interface {
+	isInstructionRequest_Request()
+}
+
+type InstructionRequest_Register struct {
+	Register *RegisterRequest `protobuf:"bytes,1000,opt,name=register,oneof"`
+}
+type InstructionRequest_ProcessBundle struct {
+	ProcessBundle *ProcessBundleRequest `protobuf:"bytes,1001,opt,name=process_bundle,json=processBundle,oneof"`
+}
+type InstructionRequest_ProcessBundleProgress struct {
+	ProcessBundleProgress *ProcessBundleProgressRequest `protobuf:"bytes,1002,opt,name=process_bundle_progress,json=processBundleProgress,oneof"`
+}
+type InstructionRequest_ProcessBundleSplit struct {
+	ProcessBundleSplit *ProcessBundleSplitRequest `protobuf:"bytes,1003,opt,name=process_bundle_split,json=processBundleSplit,oneof"`
+}
+
+func (*InstructionRequest_Register) isInstructionRequest_Request()              {}
+func (*InstructionRequest_ProcessBundle) isInstructionRequest_Request()         {}
+func (*InstructionRequest_ProcessBundleProgress) isInstructionRequest_Request() {}
+func (*InstructionRequest_ProcessBundleSplit) isInstructionRequest_Request()    {}
+
+func (m *InstructionRequest) GetRequest() isInstructionRequest_Request {
+	if m != nil {
+		return m.Request
+	}
+	return nil
+}
+
+func (m *InstructionRequest) GetInstructionId() string {
+	if m != nil {
+		return m.InstructionId
+	}
+	return ""
+}
+
+func (m *InstructionRequest) GetRegister() *RegisterRequest {
+	if x, ok := m.GetRequest().(*InstructionRequest_Register); ok {
+		return x.Register
+	}
+	return nil
+}
+
+func (m *InstructionRequest) GetProcessBundle() *ProcessBundleRequest {
+	if x, ok := m.GetRequest().(*InstructionRequest_ProcessBundle); ok {
+		return x.ProcessBundle
+	}
+	return nil
+}
+
+func (m *InstructionRequest) GetProcessBundleProgress() *ProcessBundleProgressRequest {
+	if x, ok := m.GetRequest().(*InstructionRequest_ProcessBundleProgress); ok {
+		return x.ProcessBundleProgress
+	}
+	return nil
+}
+
+func (m *InstructionRequest) GetProcessBundleSplit() *ProcessBundleSplitRequest {
+	if x, ok := m.GetRequest().(*InstructionRequest_ProcessBundleSplit); ok {
+		return x.ProcessBundleSplit
+	}
+	return nil
+}
+
+// XXX_OneofFuncs is for the internal use of the proto package.
+func (*InstructionRequest) XXX_OneofFuncs() (func(msg proto.Message, b *proto.Buffer) error, func(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error), func(msg proto.Message) (n int), []interface{}) {
+	return _InstructionRequest_OneofMarshaler, _InstructionRequest_OneofUnmarshaler, _InstructionRequest_OneofSizer, []interface{}{
+		(*InstructionRequest_Register)(nil),
+		(*InstructionRequest_ProcessBundle)(nil),
+		(*InstructionRequest_ProcessBundleProgress)(nil),
+		(*InstructionRequest_ProcessBundleSplit)(nil),
+	}
+}
+
+func _InstructionRequest_OneofMarshaler(msg proto.Message, b *proto.Buffer) error {
+	m := msg.(*InstructionRequest)
+	// request
+	switch x := m.Request.(type) {
+	case *InstructionRequest_Register:
+		b.EncodeVarint(1000<<3 | proto.WireBytes)
+		if err := b.EncodeMessage(x.Register); err != nil {
+			return err
+		}
+	case *InstructionRequest_ProcessBundle:
+		b.EncodeVarint(1001<<3 | proto.WireBytes)
+		if err := b.EncodeMessage(x.ProcessBundle); err != nil {
+			return err
+		}
+	case *InstructionRequest_ProcessBundleProgress:
+		b.EncodeVarint(1002<<3 | proto.WireBytes)
+		if err := b.EncodeMessage(x.ProcessBundleProgress); err != nil {
+			return err
+		}
+	case *InstructionRequest_ProcessBundleSplit:
+		b.EncodeVarint(1003<<3 | proto.WireBytes)
+		if err := b.EncodeMessage(x.ProcessBundleSplit); err != nil {
+			return err
+		}
+	case nil:
+	default:
+		return fmt.Errorf("InstructionRequest.Request has unexpected type %T", x)
+	}
+	return nil
+}
+
+func _InstructionRequest_OneofUnmarshaler(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error) {
+	m := msg.(*InstructionRequest)
+	switch tag {
+	case 1000: // request.register
+		if wire != proto.WireBytes {
+			return true, proto.ErrInternalBadWireType
+		}
+		msg := new(RegisterRequest)
+		err := b.DecodeMessage(msg)
+		m.Request = &InstructionRequest_Register{msg}
+		return true, err
+	case 1001: // request.process_bundle
+		if wire != proto.WireBytes {
+			return true, proto.ErrInternalBadWireType
+		}
+		msg := new(ProcessBundleRequest)
+		err := b.DecodeMessage(msg)
+		m.Request = &InstructionRequest_ProcessBundle{msg}
+		return true, err
+	case 1002: // request.process_bundle_progress
+		if wire != proto.WireBytes {
+			return true, proto.ErrInternalBadWireType
+		}
+		msg := new(ProcessBundleProgressRequest)
+		err := b.DecodeMessage(msg)
+		m.Request = &InstructionRequest_ProcessBundleProgress{msg}
+		return true, err
+	case 1003: // request.process_bundle_split
+		if wire != proto.WireBytes {
+			return true, proto.ErrInternalBadWireType
+		}
+		msg := new(ProcessBundleSplitRequest)
+		err := b.DecodeMessage(msg)
+		m.Request = &InstructionRequest_ProcessBundleSplit{msg}
+		return true, err
+	default:
+		return false, nil
+	}
+}
+
+func _InstructionRequest_OneofSizer(msg proto.Message) (n int) {
+	m := msg.(*InstructionRequest)
+	// request
+	switch x := m.Request.(type) {
+	case *InstructionRequest_Register:
+		s := proto.Size(x.Register)
+		n += proto.SizeVarint(1000<<3 | proto.WireBytes)
+		n += proto.SizeVarint(uint64(s))
+		n += s
+	case *InstructionRequest_ProcessBundle:
+		s := proto.Size(x.ProcessBundle)
+		n += proto.SizeVarint(1001<<3 | proto.WireBytes)
+		n += proto.SizeVarint(uint64(s))
+		n += s
+	case *InstructionRequest_ProcessBundleProgress:
+		s := proto.Size(x.ProcessBundleProgress)
+		n += proto.SizeVarint(1002<<3 | proto.WireBytes)
+		n += proto.SizeVarint(uint64(s))
+		n += s
+	case *InstructionRequest_ProcessBundleSplit:
+		s := proto.Size(x.ProcessBundleSplit)
+		n += proto.SizeVarint(1003<<3 | proto.WireBytes)
+		n += proto.SizeVarint(uint64(s))
+		n += s
+	case nil:
+	default:
+		panic(fmt.Sprintf("proto: unexpected type %T in oneof", x))
+	}
+	return n
+}
+
+// The response for an associated request the SDK had been asked to fulfill.
+// Stable
+type InstructionResponse struct {
+	// (Required) A reference provided by the runner which represents a requests
+	// execution. The InstructionResponse MUST have the matching id when
+	// responding to the runner.
+	InstructionId string `protobuf:"bytes,1,opt,name=instruction_id,json=instructionId" json:"instruction_id,omitempty"`
+	// If this is specified, then this instruction has failed.
+	// A human readable string representing the reason as to why processing has
+	// failed.
+	Error string `protobuf:"bytes,2,opt,name=error" json:"error,omitempty"`
+	// If the instruction did not fail, it is required to return an equivalent
+	// response type depending on the request this matches.
+	//
+	// Types that are valid to be assigned to Response:
+	//	*InstructionResponse_Register
+	//	*InstructionResponse_ProcessBundle
+	//	*InstructionResponse_ProcessBundleProgress
+	//	*InstructionResponse_ProcessBundleSplit
+	Response isInstructionResponse_Response `protobuf_oneof:"response"`
+}
+
+func (m *InstructionResponse) Reset()                    { *m = InstructionResponse{} }
+func (m *InstructionResponse) String() string            { return proto.CompactTextString(m) }
+func (*InstructionResponse) ProtoMessage()               {}
+func (*InstructionResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} }
+
+type isInstructionResponse_Response interface {
+	isInstructionResponse_Response()
+}
+
+type InstructionResponse_Register struct {
+	Register *RegisterResponse `protobuf:"bytes,1000,opt,name=register,oneof"`
+}
+type InstructionResponse_ProcessBundle struct {
+	ProcessBundle *ProcessBundleResponse `protobuf:"bytes,1001,opt,name=process_bundle,json=processBundle,oneof"`
+}
+type InstructionResponse_ProcessBundleProgress struct {
+	ProcessBundleProgress *ProcessBundleProgressResponse `protobuf:"bytes,1002,opt,name=process_bundle_progress,json=processBundleProgress,oneof"`
+}
+type InstructionResponse_ProcessBundleSplit struct {
+	ProcessBundleSplit *ProcessBundleSplitResponse `protobuf:"bytes,1003,opt,name=process_bundle_split,json=processBundleSplit,oneof"`
+}
+
+func (*InstructionResponse_Register) isInstructionResponse_Response()              {}
+func (*InstructionResponse_ProcessBundle) isInstructionResponse_Response()         {}
+func (*InstructionResponse_ProcessBundleProgress) isInstructionResponse_Response() {}
+func (*InstructionResponse_ProcessBundleSplit) isInstructionResponse_Response()    {}
+
+func (m *InstructionResponse) GetResponse() isInstructionResponse_Response {
+	if m != nil {
+		return m.Response
+	}
+	return nil
+}
+
+func (m *InstructionResponse) GetInstructionId() string {
+	if m != nil {
+		return m.InstructionId
+	}
+	return ""
+}
+
+func (m *InstructionResponse) GetError() string {
+	if m != nil {
+		return m.Error
+	}
+	return ""
+}
+
+func (m *InstructionResponse) GetRegister() *RegisterResponse {
+	if x, ok := m.GetResponse().(*InstructionResponse_Register); ok {
+		return x.Register
+	}
+	return nil
+}
+
+func (m *InstructionResponse) GetProcessBundle() *ProcessBundleResponse {
+	if x, ok := m.GetResponse().(*InstructionResponse_ProcessBundle); ok {
+		return x.ProcessBundle
+	}
+	return nil
+}
+
+func (m *InstructionResponse) GetProcessBundleProgress() *ProcessBundleProgressResponse {
+	if x, ok := m.GetResponse().(*InstructionResponse_ProcessBundleProgress); ok {
+		return x.ProcessBundleProgress
+	}
+	return nil
+}
+
+func (m *InstructionResponse) GetProcessBundleSplit() *ProcessBundleSplitResponse {
+	if x, ok := m.GetResponse().(*InstructionResponse_ProcessBundleSplit); ok {
+		return x.ProcessBundleSplit
+	}
+	return nil
+}
+
+// XXX_OneofFuncs is for the internal use of the proto package.
+func (*InstructionResponse) XXX_OneofFuncs() (func(msg proto.Message, b *proto.Buffer) error, func(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error), func(msg proto.Message) (n int), []interface{}) {
+	return _InstructionResponse_OneofMarshaler, _InstructionResponse_OneofUnmarshaler, _InstructionResponse_OneofSizer, []interface{}{
+		(*InstructionResponse_Register)(nil),
+		(*InstructionResponse_ProcessBundle)(nil),
+		(*InstructionResponse_ProcessBundleProgress)(nil),
+		(*InstructionResponse_ProcessBundleSplit)(nil),
+	}
+}
+
+func _InstructionResponse_OneofMarshaler(msg proto.Message, b *proto.Buffer) error {
+	m := msg.(*InstructionResponse)
+	// response
+	switch x := m.Response.(type) {
+	case *InstructionResponse_Register:
+		b.EncodeVarint(1000<<3 | proto.WireBytes)
+		if err := b.EncodeMessage(x.Register); err != nil {
+			return err
+		}
+	case *InstructionResponse_ProcessBundle:
+		b.EncodeVarint(1001<<3 | proto.WireBytes)
+		if err := b.EncodeMessage(x.ProcessBundle); err != nil {
+			return err
+		}
+	case *InstructionResponse_ProcessBundleProgress:
+		b.EncodeVarint(1002<<3 | proto.WireBytes)
+		if err := b.EncodeMessage(x.ProcessBundleProgress); err != nil {
+			return err
+		}
+	case *InstructionResponse_ProcessBundleSplit:
+		b.EncodeVarint(1003<<3 | proto.WireBytes)
+		if err := b.EncodeMessage(x.ProcessBundleSplit); err != nil {
+			return err
+		}
+	case nil:
+	default:
+		return fmt.Errorf("InstructionResponse.Response has unexpected type %T", x)
+	}
+	return nil
+}
+
+func _InstructionResponse_OneofUnmarshaler(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error) {
+	m := msg.(*InstructionResponse)
+	switch tag {
+	case 1000: // response.register
+		if wire != proto.WireBytes {
+			return true, proto.ErrInternalBadWireType
+		}
+		msg := new(RegisterResponse)
+		err := b.DecodeMessage(msg)
+		m.Response = &InstructionResponse_Register{msg}
+		return true, err
+	case 1001: // response.process_bundle
+		if wire != proto.WireBytes {
+			return true, proto.ErrInternalBadWireType
+		}
+		msg := new(ProcessBundleResponse)
+		err := b.DecodeMessage(msg)
+		m.Response = &InstructionResponse_ProcessBundle{msg}
+		return true, err
+	case 1002: // response.process_bundle_progress
+		if wire != proto.WireBytes {
+			return true, proto.ErrInternalBadWireType
+		}
+		msg := new(ProcessBundleProgressResponse)
+		err := b.DecodeMessage(msg)
+		m.Response = &InstructionResponse_ProcessBundleProgress{msg}
+		return true, err
+	case 1003: // response.process_bundle_split
+		if wire != proto.WireBytes {
+			return true, proto.ErrInternalBadWireType
+		}
+		msg := new(ProcessBundleSplitResponse)
+		err := b.DecodeMessage(msg)
+		m.Response = &InstructionResponse_ProcessBundleSplit{msg}
+		return true, err
+	default:
+		return false, nil
+	}
+}
+
+func _InstructionResponse_OneofSizer(msg proto.Message) (n int) {
+	m := msg.(*InstructionResponse)
+	// response
+	switch x := m.Response.(type) {
+	case *InstructionResponse_Register:
+		s := proto.Size(x.Register)
+		n += proto.SizeVarint(1000<<3 | proto.WireBytes)
+		n += proto.SizeVarint(uint64(s))
+		n += s
+	case *InstructionResponse_ProcessBundle:
+		s := proto.Size(x.ProcessBundle)
+		n += proto.SizeVarint(1001<<3 | proto.WireBytes)
+		n += proto.SizeVarint(uint64(s))
+		n += s
+	case *InstructionResponse_ProcessBundleProgress:
+		s := proto.Size(x.ProcessBundleProgress)
+		n += proto.SizeVarint(1002<<3 | proto.WireBytes)
+		n += proto.SizeVarint(uint64(s))
+		n += s
+	case *InstructionResponse_ProcessBundleSplit:
+		s := proto.Size(x.ProcessBundleSplit)
+		n += proto.SizeVarint(1003<<3 | proto.WireBytes)
+		n += proto.SizeVarint(uint64(s))
+		n += s
+	case nil:
+	default:
+		panic(fmt.Sprintf("proto: unexpected type %T in oneof", x))
+	}
+	return n
+}
+
+// A list of objects which can be referred to by the runner in
+// future requests.
+// Stable
+type RegisterRequest struct {
+	// (Optional) The set of descriptors used to process bundles.
+	ProcessBundleDescriptor []*ProcessBundleDescriptor `protobuf:"bytes,1,rep,name=process_bundle_descriptor,json=processBundleDescriptor" json:"process_bundle_descriptor,omitempty"`
+}
+
+func (m *RegisterRequest) Reset()                    { *m = RegisterRequest{} }
+func (m *RegisterRequest) String() string            { return proto.CompactTextString(m) }
+func (*RegisterRequest) ProtoMessage()               {}
+func (*RegisterRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{4} }
+
+func (m *RegisterRequest) GetProcessBundleDescriptor() []*ProcessBundleDescriptor {
+	if m != nil {
+		return m.ProcessBundleDescriptor
+	}
+	return nil
+}
+
+// Stable
+type RegisterResponse struct {
+}
+
+func (m *RegisterResponse) Reset()                    { *m = RegisterResponse{} }
+func (m *RegisterResponse) String() string            { return proto.CompactTextString(m) }
+func (*RegisterResponse) ProtoMessage()               {}
+func (*RegisterResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{5} }
+
+// Definitions that should be used to construct the bundle processing graph.
+type ProcessBundleDescriptor struct {
+	// (Required) A pipeline level unique id which can be used as a reference to
+	// refer to this.
+	Id string `protobuf:"bytes,1,opt,name=id" json:"id,omitempty"`
+	// (Required) A map from pipeline-scoped id to PTransform.
+	Transforms map[string]*org_apache_beam_model_pipeline_v1.PTransform `protobuf:"bytes,2,rep,name=transforms" json:"transforms,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"`
+	// (Required) A map from pipeline-scoped id to PCollection.
+	Pcollections map[string]*org_apache_beam_model_pipeline_v1.PCollection `protobuf:"bytes,3,rep,name=pcollections" json:"pcollections,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"`
+	// (Required) A map from pipeline-scoped id to WindowingStrategy.
+	WindowingStrategies map[string]*org_apache_beam_model_pipeline_v1.WindowingStrategy `protobuf:"bytes,4,rep,name=windowing_strategies,json=windowingStrategies" json:"windowing_strategies,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"`
+	// (Required) A map from pipeline-scoped id to Coder.
+	Coders map[string]*org_apache_beam_model_pipeline_v1.Coder `protobuf:"bytes,5,rep,name=coders" json:"coders,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"`
+	// (Required) A map from pipeline-scoped id to Environment.
+	Environments map[string]*org_apache_beam_model_pipeline_v1.Environment `protobuf:"bytes,6,rep,name=environments" json:"environments,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"`
+	// A descriptor describing the end point to use for State API
+	// calls. Required if the Runner intends to send remote references over the
+	// data plane or if any of the transforms rely on user state or side inputs.
+	StateApiServiceDescriptor *org_apache_beam_model_pipeline_v11.ApiServiceDescriptor `protobuf:"bytes,7,opt,name=state_api_service_descriptor,json=stateApiServiceDescriptor" json:"state_api_service_descriptor,omitempty"`
+}
+
+func (m *ProcessBundleDescriptor) Reset()                    { *m = ProcessBundleDescriptor{} }
+func (m *ProcessBundleDescriptor) String() string            { return proto.CompactTextString(m) }
+func (*ProcessBundleDescriptor) ProtoMessage()               {}
+func (*ProcessBundleDescriptor) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{6} }
+
+func (m *ProcessBundleDescriptor) GetId() string {
+	if m != nil {
+		return m.Id
+	}
+	return ""
+}
+
+func (m *ProcessBundleDescriptor) GetTransforms() map[string]*org_apache_beam_model_pipeline_v1.PTransform {
+	if m != nil {
+		return m.Transforms
+	}
+	return nil
+}
+
+func (m *ProcessBundleDescriptor) GetPcollections() map[string]*org_apache_beam_model_pipeline_v1.PCollection {
+	if m != nil {
+		return m.Pcollections
+	}
+	return nil
+}
+
+func (m *ProcessBundleDescriptor) GetWindowingStrategies() map[string]*org_apache_beam_model_pipeline_v1.WindowingStrategy {
+	if m != nil {
+		return m.WindowingStrategies
+	}
+	return nil
+}
+
+func (m *ProcessBundleDescriptor) GetCoders() map[string]*org_apache_beam_model_pipeline_v1.Coder {
+	if m != nil {
+		return m.Coders
+	}
+	return nil
+}
+
+func (m *ProcessBundleDescriptor) GetEnvironments() map[string]*org_apache_beam_model_pipeline_v1.Environment {
+	if m != nil {
+		return m.Environments
+	}
+	return nil
+}
+
+func (m *ProcessBundleDescriptor) GetStateApiServiceDescriptor() *org_apache_beam_model_pipeline_v11.ApiServiceDescriptor {
+	if m != nil {
+		return m.StateApiServiceDescriptor
+	}
+	return nil
+}
+
+// A request to process a given bundle.
+// Stable
+type ProcessBundleRequest struct {
+	// (Required) A reference to the process bundle descriptor that must be
+	// instantiated and executed by the SDK harness.
+	ProcessBundleDescriptorReference string `protobuf:"bytes,1,opt,name=process_bundle_descriptor_reference,json=processBundleDescriptorReference" json:"process_bundle_descriptor_reference,omitempty"`
+	// (Optional) A list of cache tokens that can be used by an SDK to reuse
+	// cached data returned by the State API across multiple bundles.
+	CacheTokens [][]byte `protobuf:"bytes,2,rep,name=cache_tokens,json=cacheTokens,proto3" json:"cache_tokens,omitempty"`
+}
+
+func (m *ProcessBundleRequest) Reset()                    { *m = ProcessBundleRequest{} }
+func (m *ProcessBundleRequest) String() string            { return proto.CompactTextString(m) }
+func (*ProcessBundleRequest) ProtoMessage()               {}
+func (*ProcessBundleRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{7} }
+
+func (m *ProcessBundleRequest) GetProcessBundleDescriptorReference() string {
+	if m != nil {
+		return m.ProcessBundleDescriptorReference
+	}
+	return ""
+}
+
+func (m *ProcessBundleRequest) GetCacheTokens() [][]byte {
+	if m != nil {
+		return m.CacheTokens
+	}
+	return nil
+}
+
+// Stable
+type ProcessBundleResponse struct {
+	// (Optional) If metrics reporting is supported by the SDK, this represents
+	// the final metrics to record for this bundle.
+	Metrics *Metrics `protobuf:"bytes,1,opt,name=metrics" json:"metrics,omitempty"`
+}
+
+func (m *ProcessBundleResponse) Reset()                    { *m = ProcessBundleResponse{} }
+func (m *ProcessBundleResponse) String() string            { return proto.CompactTextString(m) }
+func (*ProcessBundleResponse) ProtoMessage()               {}
+func (*ProcessBundleResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{8} }
+
+func (m *ProcessBundleResponse) GetMetrics() *Metrics {
+	if m != nil {
+		return m.Metrics
+	}
+	return nil
+}
+
+// A request to report progress information for a given bundle.
+// This is an optional request to be handled and is used to support advanced
+// SDK features such as SplittableDoFn, user level metrics etc.
+type ProcessBundleProgressRequest struct {
+	// (Required) A reference to an active process bundle request with the given
+	// instruction id.
+	InstructionReference string `protobuf:"bytes,1,opt,name=instruction_reference,json=instructionReference" json:"instruction_reference,omitempty"`
+}
+
+func (m *ProcessBundleProgressRequest) Reset()                    { *m = ProcessBundleProgressRequest{} }
+func (m *ProcessBundleProgressRequest) String() string            { return proto.CompactTextString(m) }
+func (*ProcessBundleProgressRequest) ProtoMessage()               {}
+func (*ProcessBundleProgressRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{9} }
+
+func (m *ProcessBundleProgressRequest) GetInstructionReference() string {
+	if m != nil {
+		return m.InstructionReference
+	}
+	return ""
+}
+
+type Metrics struct {
+	Ptransforms map[string]*Metrics_PTransform `protobuf:"bytes,1,rep,name=ptransforms" json:"ptransforms,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"`
+	User        map[string]*Metrics_User       `protobuf:"bytes,2,rep,name=user" json:"user,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"`
+}
+
+func (m *Metrics) Reset()                    { *m = Metrics{} }
+func (m *Metrics) String() string            { return proto.CompactTextString(m) }
+func (*Metrics) ProtoMessage()               {}
+func (*Metrics) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{10} }
+
+func (m *Metrics) GetPtransforms() map[string]*Metrics_PTransform {
+	if m != nil {
+		return m.Ptransforms
+	}
+	return nil
+}
+
+func (m *Metrics) GetUser() map[string]*Metrics_User {
+	if m != nil {
+		return m.User
+	}
+	return nil
+}
+
+// PTransform level metrics.
+// These metrics are split into processed and active element groups for
+// progress reporting purposes. This allows a Runner to see what is measured,
+// what is estimated and what can be extrapolated to be able to accurately
+// estimate the backlog of remaining work.
+type Metrics_PTransform struct {
+	// (Required): Metrics for processed elements.
+	ProcessedElements *Metrics_PTransform_ProcessedElements `protobuf:"bytes,1,opt,name=processed_elements,json=processedElements" json:"processed_elements,omitempty"`
+	// (Required): Metrics for active elements.
+	ActiveElements *Metrics_PTransform_ActiveElements `protobuf:"bytes,2,opt,name=active_elements,json=activeElements" json:"active_elements,omitempty"`
+	// (Optional): Map from local output name to its watermark.
+	// The watermarks reported are tentative, to get a better sense of progress
+	// while processing a bundle but before it is committed. At bundle commit
+	// time, a Runner needs to also take into account the timers set to compute
+	// the actual watermarks.
+	Watermarks map[string]int64 `protobuf:"bytes,3,rep,name=watermarks" json:"watermarks,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"varint,2,opt,name=value"`
+}
+
+func (m *Metrics_PTransform) Reset()                    { *m = Metrics_PTransform{} }
+func (m *Metrics_PTransform) String() string            { return proto.CompactTextString(m) }
+func (*Metrics_PTransform) ProtoMessage()               {}
+func (*Metrics_PTransform) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{10, 0} }
+
+func (m *Metrics_PTransform) GetProcessedElements() *Metrics_PTransform_ProcessedElements {
+	if m != nil {
+		return m.ProcessedElements
+	}
+	return nil
+}
+
+func (m *Metrics_PTransform) GetActiveElements() *Metrics_PTransform_ActiveElements {
+	if m != nil {
+		return m.ActiveElements
+	}
+	return nil
+}
+
+func (m *Metrics_PTransform) GetWatermarks() map[string]int64 {
+	if m != nil {
+		return m.Watermarks
+	}
+	return nil
+}
+
+// Metrics that are measured for processed and active element groups.
+type Metrics_PTransform_Measured struct {
+	// (Optional) Map from local input name to number of elements processed
+	// from this input.
+	// If unset, assumed to be the sum of the outputs of all producers to
+	// this transform (for ProcessedElements) and 0 (for ActiveElements).
+	InputElementCounts map[string]int64 `protobuf:"bytes,1,rep,name=input_element_counts,json=inputElementCounts" json:"input_element_counts,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"varint,2,opt,name=value"`
+	// (Required) Map from local output name to number of elements produced
+	// for this output.
+	OutputElementCounts map[string]int64 `protobuf:"bytes,2,rep,name=output_element_counts,json=outputElementCounts" json:"output_element_counts,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"varint,2,opt,name=value"`
+	// (Optional) The total time spent so far in processing the elements in
+	// this group, in seconds.
+	TotalTimeSpent float64 `protobuf:"fixed64,3,opt,name=total_time_spent,json=totalTimeSpent" json:"total_time_spent,omitempty"`
+}
+
+func (m *Metrics_PTransform_Measured) Reset()         { *m = Metrics_PTransform_Measured{} }
+func (m *Metrics_PTransform_Measured) String() string { return proto.CompactTextString(m) }
+func (*Metrics_PTransform_Measured) ProtoMessage()    {}
+func (*Metrics_PTransform_Measured) Descriptor() ([]byte, []int) {
+	return fileDescriptor0, []int{10, 0, 0}
+}
+
+func (m *Metrics_PTransform_Measured) GetInputElementCounts() map[string]int64 {
+	if m != nil {
+		return m.InputElementCounts
+	}
+	return nil
+}
+
+func (m *Metrics_PTransform_Measured) GetOutputElementCounts() map[string]int64 {
+	if m != nil {
+		return m.OutputElementCounts
+	}
+	return nil
+}
+
+func (m *Metrics_PTransform_Measured) GetTotalTimeSpent() float64 {
+	if m != nil {
+		return m.TotalTimeSpent
+	}
+	return 0
+}
+
+// Metrics for fully processed elements.
+type Metrics_PTransform_ProcessedElements struct {
+	// (Required)
+	Measured *Metrics_PTransform_Measured `protobuf:"bytes,1,opt,name=measured" json:"measured,omitempty"`
+}
+
+func (m *Metrics_PTransform_ProcessedElements) Reset()         { *m = Metrics_PTransform_ProcessedElements{} }
+func (m *Metrics_PTransform_ProcessedElements) String() string { return proto.CompactTextString(m) }
+func (*Metrics_PTransform_ProcessedElements) ProtoMessage()    {}
+func (*Metrics_PTransform_ProcessedElements) Descriptor() ([]byte, []int) {
+	return fileDescriptor0, []int{10, 0, 1}
+}
+
+func (m *Metrics_PTransform_ProcessedElements) GetMeasured() *Metrics_PTransform_Measured {
+	if m != nil {
+		return m.Measured
+	}
+	return nil
+}
+
+// Metrics for active elements.
+// An element is considered active if the SDK has started but not finished
+// processing it yet.
+type Metrics_PTransform_ActiveElements struct {
+	// (Required)
+	Measured *Metrics_PTransform_Measured `protobuf:"bytes,1,opt,name=measured" json:"measured,omitempty"`
+	// (Optional) Sum of estimated fraction of known work remaining for all
+	// active elements, as reported by this transform.
+	// If not reported, a Runner could extrapolate this from the processed
+	// elements.
+	// TODO: Handle the case when known work is infinite.
+	FractionRemaining float64 `protobuf:"fixed64,2,opt,name=fraction_remaining,json=fractionRemaining" json:"fraction_remaining,omitempty"`
+	// (Optional) Map from local output name to sum of estimated number
+	// of elements remaining for this output from all active elements,
+	// as reported by this transform.
+	// If not reported, a Runner could extrapolate this from the processed
+	// elements.
+	OutputElementsRemaining map[string]int64 `protobuf:"bytes,3,rep,name=output_elements_remaining,json=outputElementsRemaining" json:"output_elements_remaining,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"varint,2,opt,name=value"`
+}
+
+func (m *Metrics_PTransform_ActiveElements) Reset()         { *m = Metrics_PTransform_ActiveElements{} }
+func (m *Metrics_PTransform_ActiveElements) String() string { return proto.CompactTextString(m) }
+func (*Metrics_PTransform_ActiveElements) ProtoMessage()    {}
+func (*Metrics_PTransform_ActiveElements) Descriptor() ([]byte, []int) {
+	return fileDescriptor0, []int{10, 0, 2}
+}
+
+func (m *Metrics_PTransform_ActiveElements) GetMeasured() *Metrics_PTransform_Measured {
+	if m != nil {
+		return m.Measured
+	}
+	return nil
+}
+
+func (m *Metrics_PTransform_ActiveElements) GetFractionRemaining() float64 {
+	if m != nil {
+		return m.FractionRemaining
+	}
+	return 0
+}
+
+func (m *Metrics_PTransform_ActiveElements) GetOutputElementsRemaining() map[string]int64 {
+	if m != nil {
+		return m.OutputElementsRemaining
+	}
+	return nil
+}
+
+// User defined metrics
+type Metrics_User struct {
+}
+
+func (m *Metrics_User) Reset()                    { *m = Metrics_User{} }
+func (m *Metrics_User) String() string            { return proto.CompactTextString(m) }
+func (*Metrics_User) ProtoMessage()               {}
+func (*Metrics_User) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{10, 1} }
+
+type ProcessBundleProgressResponse struct {
+	// (Required)
+	Metrics *Metrics `protobuf:"bytes,1,opt,name=metrics" json:"metrics,omitempty"`
+}
+
+func (m *ProcessBundleProgressResponse) Reset()                    { *m = ProcessBundleProgressResponse{} }
+func (m *ProcessBundleProgressResponse) String() string            { return proto.CompactTextString(m) }
+func (*ProcessBundleProgressResponse) ProtoMessage()               {}
+func (*ProcessBundleProgressResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{11} }
+
+func (m *ProcessBundleProgressResponse) GetMetrics() *Metrics {
+	if m != nil {
+		return m.Metrics
+	}
+	return nil
+}
+
+type ProcessBundleSplitRequest struct {
+	// (Required) A reference to an active process bundle request with the given
+	// instruction id.
+	InstructionReference string `protobuf:"bytes,1,opt,name=instruction_reference,json=instructionReference" json:"instruction_reference,omitempty"`
+	// (Required) The fraction of work (when compared to the known amount of work)
+	// the process bundle request should try to split at.
+	Fraction float64 `protobuf:"fixed64,2,opt,name=fraction" json:"fraction,omitempty"`
+}
+
+func (m *ProcessBundleSplitRequest) Reset()                    { *m = ProcessBundleSplitRequest{} }
+func (m *ProcessBundleSplitRequest) String() string            { return proto.CompactTextString(m) }
+func (*ProcessBundleSplitRequest) ProtoMessage()               {}
+func (*ProcessBundleSplitRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{12} }
+
+func (m *ProcessBundleSplitRequest) GetInstructionReference() string {
+	if m != nil {
+		return m.InstructionReference
+	}
+	return ""
+}
+
+func (m *ProcessBundleSplitRequest) GetFraction() float64 {
+	if m != nil {
+		return m.Fraction
+	}
+	return 0
+}
+
+// urn:org.apache.beam:restriction:element-count:1.0
+type ElementCountRestriction struct {
+	// A restriction representing the number of elements that should be processed.
+	// Effectively the range [0, count]
+	Count int64 `protobuf:"varint,1,opt,name=count" json:"count,omitempty"`
+}
+
+func (m *ElementCountRestriction) Reset()                    { *m = ElementCountRestriction{} }
+func (m *ElementCountRestriction) String() string            { return proto.CompactTextString(m) }
+func (*ElementCountRestriction) ProtoMessage()               {}
+func (*ElementCountRestriction) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{13} }
+
+func (m *ElementCountRestriction) GetCount() int64 {
+	if m != nil {
+		return m.Count
+	}
+	return 0
+}
+
+// urn:org.apache.beam:restriction:element-count-skip:1.0
+type ElementCountSkipRestriction struct {
+	// A restriction representing the number of elements that should be skipped.
+	// Effectively the range (count, infinity]
+	Count int64 `protobuf:"varint,1,opt,name=count" json:"count,omitempty"`
+}
+
+func (m *ElementCountSkipRestriction) Reset()                    { *m = ElementCountSkipRestriction{} }
+func (m *ElementCountSkipRestriction) String() string            { return proto.CompactTextString(m) }
+func (*ElementCountSkipRestriction) ProtoMessage()               {}
+func (*ElementCountSkipRestriction) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{14} }
+
+func (m *ElementCountSkipRestriction) GetCount() int64 {
+	if m != nil {
+		return m.Count
+	}
+	return 0
+}
+
+// Each primitive transform that is splittable is defined by a restriction
+// it is currently processing. During splitting, that currently active
+// restriction (R_initial) is split into 2 components:
+//   * a restriction (R_done) representing all elements that will be fully
+//     processed
+//   * a restriction (R_todo) representing all elements that will not be fully
+//     processed
+//
+// where:
+//   R_initial = R_done ⋃ R_todo
+type PrimitiveTransformSplit struct {
+	// (Required) A reference to a primitive transform with the given id that
+	// is part of the active process bundle request with the given instruction
+	// id.
+	PrimitiveTransformReference string `protobuf:"bytes,1,opt,name=primitive_transform_reference,json=primitiveTransformReference" json:"primitive_transform_reference,omitempty"`
+	// (Required) A function specification describing the restriction
+	// that has been completed by the primitive transform.
+	//
+	// For example, a remote GRPC source will have a specific urn and data
+	// block containing an ElementCountRestriction.
+	CompletedRestriction *org_apache_beam_model_pipeline_v1.FunctionSpec `protobuf:"bytes,2,opt,name=completed_restriction,json=completedRestriction" json:"completed_restriction,omitempty"`
+	// (Required) A function specification describing the restriction
+	// representing the remainder of work for the primitive transform.
+	//
+	// FOr example, a remote GRPC source will have a specific urn and data
+	// block contain an ElemntCountSkipRestriction.
+	RemainingRestriction *org_apache_beam_model_pipeline_v1.FunctionSpec `protobuf:"bytes,3,opt,name=remaining_restriction,json=remainingRestriction" json:"remaining_restriction,omitempty"`
+}
+
+func (m *PrimitiveTransformSplit) Reset()                    { *m = PrimitiveTransformSplit{} }
+func (m *PrimitiveTransformSplit) String() string            { return proto.CompactTextString(m) }
+func (*PrimitiveTransformSplit) ProtoMessage()               {}
+func (*PrimitiveTransformSplit) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{15} }
+
+func (m *PrimitiveTransformSplit) GetPrimitiveTransformReference() string {
+	if m != nil {
+		return m.PrimitiveTransformReference
+	}
+	return ""
+}
+
+func (m *PrimitiveTransformSplit) GetCompletedRestriction() *org_apache_beam_model_pipeline_v1.FunctionSpec {
+	if m != nil {
+		return m.CompletedRestriction
+	}
+	return nil
+}
+
+func (m *PrimitiveTransformSplit) GetRemainingRestriction() *org_apache_beam_model_pipeline_v1.FunctionSpec {
+	if m != nil {
+		return m.RemainingRestriction
+	}
+	return nil
+}
+
+type ProcessBundleSplitResponse struct {
+	// If primitive transform B and C are siblings and descendants of A and A, B,
+	// and C report a split. Then B and C's restrictions are relative to A's.
+	//   R = A_done
+	//     ⋃ (A_boundary ⋂ B_done)
+	//     ⋃ (A_boundary ⋂ B_todo)
+	//     ⋃ (A_boundary ⋂ B_todo)
+	//     ⋃ (A_boundary ⋂ C_todo)
+	//     ⋃ A_todo
+	// If there is no descendant of B or C also reporting a split, than
+	//   B_boundary = ∅ and C_boundary = ∅
+	//
+	// This restriction is processed and completed by the currently active process
+	// bundle request:
+	//   A_done ⋃ (A_boundary ⋂ B_done)
+	//          ⋃ (A_boundary ⋂ C_done)
+	// and these restrictions will be processed by future process bundle requests:
+	//   A_boundary ⋂ B_todo (passed to SDF B directly)
+	//   A_boundary ⋂ C_todo (passed to SDF C directly)
+	//   A_todo (passed to SDF A directly)
+	//
+	// Note that descendants splits should only be reported if it is inexpensive
+	// to compute the boundary restriction intersected with descendants splits.
+	// Also note, that the boundary restriction may represent a set of elements
+	// produced by a parent primitive transform which can not be split at each
+	// element or that there are intermediate unsplittable primitive transforms
+	// between an ancestor splittable function and a descendant splittable
+	// function which may have more than one output per element. Finally note
+	// that the descendant splits should only be reported if the split
+	// information is relatively compact.
+	Splits []*PrimitiveTransformSplit `protobuf:"bytes,1,rep,name=splits" json:"splits,omitempty"`
+}
+
+func (m *ProcessBundleSplitResponse) Reset()                    { *m = ProcessBundleSplitResponse{} }
+func (m *ProcessBundleSplitResponse) String() string            { return proto.CompactTextString(m) }
+func (*ProcessBundleSplitResponse) ProtoMessage()               {}
+func (*ProcessBundleSplitResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{16} }
+
+func (m *ProcessBundleSplitResponse) GetSplits() []*PrimitiveTransformSplit {
+	if m != nil {
+		return m.Splits
+	}
+	return nil
+}
+
+// Messages used to represent logical byte streams.
+// Stable
+type Elements struct {
+	// (Required) A list containing parts of logical byte streams.
+	Data []*Elements_Data `protobuf:"bytes,1,rep,name=data" json:"data,omitempty"`
+}
+
+func (m *Elements) Reset()                    { *m = Elements{} }
+func (m *Elements) String() string            { return proto.CompactTextString(m) }
+func (*Elements) ProtoMessage()               {}
+func (*Elements) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{17} }
+
+func (m *Elements) GetData() []*Elements_Data {
+	if m != nil {
+		return m.Data
+	}
+	return nil
+}
+
+// Represents multiple encoded elements in nested context for a given named
+// instruction and target.
+type Elements_Data struct {
+	// (Required) A reference to an active instruction request with the given
+	// instruction id.
+	InstructionReference string `protobuf:"bytes,1,opt,name=instruction_reference,json=instructionReference" json:"instruction_reference,omitempty"`
+	// (Required) A definition representing a consumer or producer of this data.
+	// If received by a harness, this represents the consumer within that
+	// harness that should consume these bytes. If sent by a harness, this
+	// represents the producer of these bytes.
+	//
+	// Note that a single element may span multiple Data messages.
+	//
+	// Note that a sending/receiving pair should share the same target
+	// identifier.
+	Target *Target `protobuf:"bytes,2,opt,name=target" json:"target,omitempty"`
+	// (Optional) Represents a part of a logical byte stream. Elements within
+	// the logical byte stream are encoded in the nested context and
+	// concatenated together.
+	//
+	// An empty data block represents the end of stream for the given
+	// instruction and target.
+	Data []byte `protobuf:"bytes,3,opt,name=data,proto3" json:"data,omitempty"`
+}
+
+func (m *Elements_Data) Reset()                    { *m = Elements_Data{} }
+func (m *Elements_Data) String() string            { return proto.CompactTextString(m) }
+func (*Elements_Data) ProtoMessage()               {}
+func (*Elements_Data) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{17, 0} }
+
+func (m *Elements_Data) GetInstructionReference() string {
+	if m != nil {
+		return m.InstructionReference
+	}
+	return ""
+}
+
+func (m *Elements_Data) GetTarget() *Target {
+	if m != nil {
+		return m.Target
+	}
+	return nil
+}
+
+func (m *Elements_Data) GetData() []byte {
+	if m != nil {
+		return m.Data
+	}
+	return nil
+}
+
+type StateRequest struct {
+	// (Required) An unique identifier provided by the SDK which represents this
+	// requests execution. The StateResponse corresponding with this request
+	// will have the matching id.
+	Id string `protobuf:"bytes,1,opt,name=id" json:"id,omitempty"`
+	// (Required) The associated instruction id of the work that is currently
+	// being processed. This allows for the runner to associate any modifications
+	// to state to be committed with the appropriate work execution.
+	InstructionReference string `protobuf:"bytes,2,opt,name=instruction_reference,json=instructionReference" json:"instruction_reference,omitempty"`
+	// (Required) The state key this request is for.
+	StateKey *StateKey `protobuf:"bytes,3,opt,name=state_key,json=stateKey" json:"state_key,omitempty"`
+	// (Required) The action to take on this request.
+	//
+	// Types that are valid to be assigned to Request:
+	//	*StateRequest_Get
+	//	*StateRequest_Append
+	//	*StateRequest_Clear
+	Request isStateRequest_Request `protobuf_oneof:"request"`
+}
+
+func (m *StateRequest) Reset()                    { *m = StateRequest{} }
+func (m *StateRequest) String() string            { return proto.CompactTextString(m) }
+func (*StateRequest) ProtoMessage()               {}
+func (*StateRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{18} }
+
+type isStateRequest_Request interface {
+	isStateRequest_Request()
+}
+
+type StateRequest_Get struct {
+	Get *StateGetRequest `protobuf:"bytes,1000,opt,name=get,oneof"`
+}
+type StateRequest_Append struct {
+	Append *StateAppendRequest `protobuf:"bytes,1001,opt,name=append,oneof"`
+}
+type StateRequest_Clear struct {
+	Clear *StateClearRequest `protobuf:"bytes,1002,opt,name=clear,oneof"`
+}
+
+func (*StateRequest_Get) isStateRequest_Request()    {}
+func (*StateRequest_Append) isStateRequest_Request() {}
+func (*StateRequest_Clear) isStateRequest_Request()  {}
+
+func (m *StateRequest) GetRequest() isStateRequest_Request {
+	if m != nil {
+		return m.Request
+	}
+	return nil
+}
+
+func (m *StateRequest) GetId() string {
+	if m != nil {
+		return m.Id
+	}
+	return ""
+}
+
+func (m *StateRequest) GetInstructionReference() string {
+	if m != nil {
+		return m.InstructionReference
+	}
+	return ""
+}
+
+func (m *StateRequest) GetStateKey() *StateKey {
+	if m != nil {
+		return m.StateKey
+	}
+	return nil
+}
+
+func (m *StateRequest) GetGet() *StateGetRequest {
+	if x, ok := m.GetRequest().(*StateRequest_Get); ok {
+		return x.Get
+	}
+	return nil
+}
+
+func (m *StateRequest) GetAppend() *StateAppendRequest {
+	if x, ok := m.GetRequest().(*StateRequest_Append); ok {
+		return x.Append
+	}
+	return nil
+}
+
+func (m *StateRequest) GetClear() *StateClearRequest {
+	if x, ok := m.GetRequest().(*StateRequest_Clear); ok {
+		return x.Clear
+	}
+	return nil
+}
+
+// XXX_OneofFuncs is for the internal use of the proto package.
+func (*StateRequest) XXX_OneofFuncs() (func(msg proto.Message, b *proto.Buffer) error, func(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error), func(msg proto.Message) (n int), []interface{}) {
+	return _StateRequest_OneofMarshaler, _StateRequest_OneofUnmarshaler, _StateRequest_OneofSizer, []interface{}{
+		(*StateRequest_Get)(nil),
+		(*StateRequest_Append)(nil),
+		(*StateRequest_Clear)(nil),
+	}
+}
+
+func _StateRequest_OneofMarshaler(msg proto.Message, b *proto.Buffer) error {
+	m := msg.(*StateRequest)
+	// request
+	switch x := m.Request.(type) {
+	case *StateRequest_Get:
+		b.EncodeVarint(1000<<3 | proto.WireBytes)
+		if err := b.EncodeMessage(x.Get); err != nil {
+			return err
+		}
+	case *StateRequest_Append:
+		b.EncodeVarint(1001<<3 | proto.WireBytes)
+		if err := b.EncodeMessage(x.Append); err != nil {
+			return err
+		}
+	case *StateRequest_Clear:
+		b.EncodeVarint(1002<<3 | proto.WireBytes)
+		if err := b.EncodeMessage(x.Clear); err != nil {
+			return err
+		}
+	case nil:
+	default:
+		return fmt.Errorf("StateRequest.Request has unexpected type %T", x)
+	}
+	return nil
+}
+
+func _StateRequest_OneofUnmarshaler(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error) {
+	m := msg.(*StateRequest)
+	switch tag {
+	case 1000: // request.get
+		if wire != proto.WireBytes {
+			return true, proto.ErrInternalBadWireType
+		}
+		msg := new(StateGetRequest)
+		err := b.DecodeMessage(msg)
+		m.Request = &StateRequest_Get{msg}
+		return true, err
+	case 1001: // request.append
+		if wire != proto.WireBytes {
+			return true, proto.ErrInternalBadWireType
+		}
+		msg := new(StateAppendRequest)
+		err := b.DecodeMessage(msg)
+		m.Request = &StateRequest_Append{msg}
+		return true, err
+	case 1002: // request.clear
+		if wire != proto.WireBytes {
+			return true, proto.ErrInternalBadWireType
+		}
+		msg := new(StateClearRequest)
+		err := b.DecodeMessage(msg)
+		m.Request = &StateRequest_Clear{msg}
+		return true, err
+	default:
+		return false, nil
+	}
+}
+
+func _StateRequest_OneofSizer(msg proto.Message) (n int) {
+	m := msg.(*StateRequest)
+	// request
+	switch x := m.Request.(type) {
+	case *StateRequest_Get:
+		s := proto.Size(x.Get)
+		n += proto.SizeVarint(1000<<3 | proto.WireBytes)
+		n += proto.SizeVarint(uint64(s))
+		n += s
+	case *StateRequest_Append:
+		s := proto.Size(x.Append)
+		n += proto.SizeVarint(1001<<3 | proto.WireBytes)
+		n += proto.SizeVarint(uint64(s))
+		n += s
+	case *StateRequest_Clear:
+		s := proto.Size(x.Clear)
+		n += proto.SizeVarint(1002<<3 | proto.WireBytes)
+		n += proto.SizeVarint(uint64(s))
+		n += s
+	case nil:
+	default:
+		panic(fmt.Sprintf("proto: unexpected type %T in oneof", x))
+	}
+	return n
+}
+
+type StateResponse struct {
+	// (Required) A reference provided by the SDK which represents a requests
+	// execution. The StateResponse must have the matching id when responding
+	// to the SDK.
+	Id string `protobuf:"bytes,1,opt,name=id" json:"id,omitempty"`
+	// (Optional) If this is specified, then the state request has failed.
+	// A human readable string representing the reason as to why the request
+	// failed.
+	Error string `protobuf:"bytes,2,opt,name=error" json:"error,omitempty"`
+	// (Optional) If this is specified, then the result of this state request
+	// can be cached using the supplied token.
+	CacheToken []byte `protobuf:"bytes,3,opt,name=cache_token,json=cacheToken,proto3" json:"cache_token,omitempty"`
+	// A corresponding response matching the request will be populated.
+	//
+	// Types that are valid to be assigned to Response:
+	//	*StateResponse_Get
+	//	*StateResponse_Append
+	//	*StateResponse_Clear
+	Response isStateResponse_Response `protobuf_oneof:"response"`
+}
+
+func (m *StateResponse) Reset()                    { *m = StateResponse{} }
+func (m *StateResponse) String() string            { return proto.CompactTextString(m) }
+func (*StateResponse) ProtoMessage()               {}
+func (*StateResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{19} }
+
+type isStateResponse_Response interface {
+	isStateResponse_Response()
+}
+
+type StateResponse_Get struct {
+	Get *StateGetResponse `protobuf:"bytes,1000,opt,name=get,oneof"`
+}
+type StateResponse_Append struct {
+	Append *StateAppendResponse `protobuf:"bytes,1001,opt,name=append,oneof"`
+}
+type StateResponse_Clear struct {
+	Clear *StateClearResponse `protobuf:"bytes,1002,opt,name=clear,oneof"`
+}
+
+func (*StateResponse_Get) isStateResponse_Response()    {}
+func (*StateResponse_Append) isStateResponse_Response() {}
+func (*StateResponse_Clear) isStateResponse_Response()  {}
+
+func (m *StateResponse) GetResponse() isStateResponse_Response {
+	if m != nil {
+		return m.Response
+	}
+	return nil
+}
+
+func (m *StateResponse) GetId() string {
+	if m != nil {
+		return m.Id
+	}
+	return ""
+}
+
+func (m *StateResponse) GetError() string {
+	if m != nil {
+		return m.Error
+	}
+	return ""
+}
+
+func (m *StateResponse) GetCacheToken() []byte {
+	if m != nil {
+		return m.CacheToken
+	}
+	return nil
+}
+
+func (m *StateResponse) GetGet() *StateGetResponse {
+	if x, ok := m.GetResponse().(*StateResponse_Get); ok {
+		return x.Get
+	}
+	return nil
+}
+
+func (m *StateResponse) GetAppend() *StateAppendResponse {
+	if x, ok := m.GetResponse().(*StateResponse_Append); ok {
+		return x.Append
+	}
+	return nil
+}
+
+func (m *StateResponse) GetClear() *StateClearResponse {
+	if x, ok := m.GetResponse().(*StateResponse_Clear); ok {
+		return x.Clear
+	}
+	return nil
+}
+
+// XXX_OneofFuncs is for the internal use of the proto package.
+func (*StateResponse) XXX_OneofFuncs() (func(msg proto.Message, b *proto.Buffer) error, func(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error), func(msg proto.Message) (n int), []interface{}) {
+	return _StateResponse_OneofMarshaler, _StateResponse_OneofUnmarshaler, _StateResponse_OneofSizer, []interface{}{
+		(*StateResponse_Get)(nil),
+		(*StateResponse_Append)(nil),
+		(*StateResponse_Clear)(nil),
+	}
+}
+
+func _StateResponse_OneofMarshaler(msg proto.Message, b *proto.Buffer) error {
+	m := msg.(*StateResponse)
+	// response
+	switch x := m.Response.(type) {
+	case *StateResponse_Get:
+		b.EncodeVarint(1000<<3 | proto.WireBytes)
+		if err := b.EncodeMessage(x.Get); err != nil {
+			return err
+		}
+	case *StateResponse_Append:
+		b.EncodeVarint(1001<<3 | proto.WireBytes)
+		if err := b.EncodeMessage(x.Append); err != nil {
+			return err
+		}
+	case *StateResponse_Clear:
+		b.EncodeVarint(1002<<3 | proto.WireBytes)
+		if err := b.EncodeMessage(x.Clear); err != nil {
+			return err
+		}
+	case nil:
+	default:
+		return fmt.Errorf("StateResponse.Response has unexpected type %T", x)
+	}
+	return nil
+}
+
+func _StateResponse_OneofUnmarshaler(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error) {
+	m := msg.(*StateResponse)
+	switch tag {
+	case 1000: // response.get
+		if wire != proto.WireBytes {
+			return true, proto.ErrInternalBadWireType
+		}
+		msg := new(StateGetResponse)
+		err := b.DecodeMessage(msg)
+		m.Response = &StateResponse_Get{msg}
+		return true, err
+	case 1001: // response.append
+		if wire != proto.WireBytes {
+			return true, proto.ErrInternalBadWireType
+		}
+		msg := new(StateAppendResponse)
+		err := b.DecodeMessage(msg)
+		m.Response = &StateResponse_Append{msg}
+		return true, err
+	case 1002: // response.clear
+		if wire != proto.WireBytes {
+			return true, proto.ErrInternalBadWireType
+		}
+		msg := new(StateClearResponse)
+		err := b.DecodeMessage(msg)
+		m.Response = &StateResponse_Clear{msg}
+		return true, err
+	default:
+		return false, nil
+	}
+}
+
+func _StateResponse_OneofSizer(msg proto.Message) (n int) {
+	m := msg.(*StateResponse)
+	// response
+	switch x := m.Response.(type) {
+	case *StateResponse_Get:
+		s := proto.Size(x.Get)
+		n += proto.SizeVarint(1000<<3 | proto.WireBytes)
+		n += proto.SizeVarint(uint64(s))
+		n += s
+	case *StateResponse_Append:
+		s := proto.Size(x.Append)
+		n += proto.SizeVarint(1001<<3 | proto.WireBytes)
+		n += proto.SizeVarint(uint64(s))
+		n += s
+	case *StateResponse_Clear:
+		s := proto.Size(x.Clear)
+		n += proto.SizeVarint(1002<<3 | proto.WireBytes)
+		n += proto.SizeVarint(uint64(s))
+		n += s
+	case nil:
+	default:
+		panic(fmt.Sprintf("proto: unexpected type %T in oneof", x))
+	}
+	return n
+}
+
+type StateKey struct {
+	// (Required) One of the following state keys must be set.
+	//
+	// Types that are valid to be assigned to Type:
+	//	*StateKey_Runner_
+	//	*StateKey_MultimapSideInput_
+	//	*StateKey_BagUserState_
+	Type isStateKey_Type `protobuf_oneof:"type"`
+}
+
+func (m *StateKey) Reset()                    { *m = StateKey{} }
+func (m *StateKey) String() string            { return proto.CompactTextString(m) }
+func (*StateKey) ProtoMessage()               {}
+func (*StateKey) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{20} }
+
+type isStateKey_Type interface {
+	isStateKey_Type()
+}
+
+type StateKey_Runner_ struct {
+	Runner *StateKey_Runner `protobuf:"bytes,1,opt,name=runner,oneof"`
+}
+type StateKey_MultimapSideInput_ struct {
+	MultimapSideInput *StateKey_MultimapSideInput `protobuf:"bytes,2,opt,name=multimap_side_input,json=multimapSideInput,oneof"`
+}
+type StateKey_BagUserState_ struct {
+	BagUserState *StateKey_BagUserState `protobuf:"bytes,3,opt,name=bag_user_state,json=bagUserState,oneof"`
+}
+
+func (*StateKey_Runner_) isStateKey_Type()            {}
+func (*StateKey_MultimapSideInput_) isStateKey_Type() {}
+func (*StateKey_BagUserState_) isStateKey_Type()      {}
+
+func (m *StateKey) GetType() isStateKey_Type {
+	if m != nil {
+		return m.Type
+	}
+	return nil
+}
+
+func (m *StateKey) GetRunner() *StateKey_Runner {
+	if x, ok := m.GetType().(*StateKey_Runner_); ok {
+		return x.Runner
+	}
+	return nil
+}
+
+func (m *StateKey) GetMultimapSideInput() *StateKey_MultimapSideInput {
+	if x, ok := m.GetType().(*StateKey_MultimapSideInput_); ok {
+		return x.MultimapSideInput
+	}
+	return nil
+}
+
+func (m *StateKey) GetBagUserState() *StateKey_BagUserState {
+	if x, ok := m.GetType().(*StateKey_BagUserState_); ok {
+		return x.BagUserState
+	}
+	return nil
+}
+
+// XXX_OneofFuncs is for the internal use of the proto package.
+func (*StateKey) XXX_OneofFuncs() (func(msg proto.Message, b *proto.Buffer) error, func(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error), func(msg proto.Message) (n int), []interface{}) {
+	return _StateKey_OneofMarshaler, _StateKey_OneofUnmarshaler, _StateKey_OneofSizer, []interface{}{
+		(*StateKey_Runner_)(nil),
+		(*StateKey_MultimapSideInput_)(nil),
+		(*StateKey_BagUserState_)(nil),
+	}
+}
+
+func _StateKey_OneofMarshaler(msg proto.Message, b *proto.Buffer) error {
+	m := msg.(*StateKey)
+	// type
+	switch x := m.Type.(type) {
+	case *StateKey_Runner_:
+		b.EncodeVarint(1<<3 | proto.WireBytes)
+		if err := b.EncodeMessage(x.Runner); err != nil {
+			return err
+		}
+	case *StateKey_MultimapSideInput_:
+		b.EncodeVarint(2<<3 | proto.WireBytes)
+		if err := b.EncodeMessage(x.MultimapSideInput); err != nil {
+			return err
+		}
+	case *StateKey_BagUserState_:
+		b.EncodeVarint(3<<3 | proto.WireBytes)
+		if err := b.EncodeMessage(x.BagUserState); err != nil {
+			return err
+		}
+	case nil:
+	default:
+		return fmt.Errorf("StateKey.Type has unexpected type %T", x)
+	}
+	return nil
+}
+
+func _StateKey_OneofUnmarshaler(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error) {
+	m := msg.(*StateKey)
+	switch tag {
+	case 1: // type.runner
+		if wire != proto.WireBytes {
+			return true, proto.ErrInternalBadWireType
+		}
+		msg := new(StateKey_Runner)
+		err := b.DecodeMessage(msg)
+		m.Type = &StateKey_Runner_{msg}
+		return true, err
+	case 2: // type.multimap_side_input
+		if wire != proto.WireBytes {
+			return true, proto.ErrInternalBadWireType
+		}
+		msg := new(StateKey_MultimapSideInput)
+		err := b.DecodeMessage(msg)
+		m.Type = &StateKey_MultimapSideInput_{msg}
+		return true, err
+	case 3: // type.bag_user_state
+		if wire != proto.WireBytes {
+			return true, proto.ErrInternalBadWireType
+		}
+		msg := new(StateKey_BagUserState)
+		err := b.DecodeMessage(msg)
+		m.Type = &StateKey_BagUserState_{msg}
+		return true, err
+	default:
+		return false, nil
+	}
+}
+
+func _StateKey_OneofSizer(msg proto.Message) (n int) {
+	m := msg.(*StateKey)
+	// type
+	switch x := m.Type.(type) {
+	case *StateKey_Runner_:
+		s := proto.Size(x.Runner)
+		n += proto.SizeVarint(1<<3 | proto.WireBytes)
+		n += proto.SizeVarint(uint64(s))
+		n += s
+	case *StateKey_MultimapSideInput_:
+		s := proto.Size(x.MultimapSideInput)
+		n += proto.SizeVarint(2<<3 | proto.WireBytes)
+		n += proto.SizeVarint(uint64(s))
+		n += s
+	case *StateKey_BagUserState_:
+		s := proto.Size(x.BagUserState)
+		n += proto.SizeVarint(3<<3 | proto.WireBytes)
+		n += proto.SizeVarint(uint64(s))
+		n += s
+	case nil:
+	default:
+		panic(fmt.Sprintf("proto: unexpected type %T in oneof", x))
+	}
+	return n
+}
+
+type StateKey_Runner struct {
+	// (Required) Opaque information supplied by the runner. Used to support
+	// remote references.
+	Key []byte `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"`
+}
+
+func (m *StateKey_Runner) Reset()                    { *m = StateKey_Runner{} }
+func (m *StateKey_Runner) String() string            { return proto.CompactTextString(m) }
+func (*StateKey_Runner) ProtoMessage()               {}
+func (*StateKey_Runner) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{20, 0} }
+
+func (m *StateKey_Runner) GetKey() []byte {
+	if m != nil {
+		return m.Key
+	}
+	return nil
+}
+
+type StateKey_MultimapSideInput struct {
+	// (Required) The id of the PTransform containing a side input.
+	PtransformId string `protobuf:"bytes,1,opt,name=ptransform_id,json=ptransformId" json:"ptransform_id,omitempty"`
+	// (Required) The id of the side input.
+	SideInputId string `protobuf:"bytes,2,opt,name=side_input_id,json=sideInputId" json:"side_input_id,omitempty"`
+	// (Required) The window (after mapping the currently executing elements
+	// window into the side input windows domain) encoded in a nested context.
+	Window []byte `protobuf:"bytes,3,opt,name=window,proto3" json:"window,omitempty"`
+	// (Required) The key encoded in a nested context.
+	Key []byte `protobuf:"bytes,4,opt,name=key,proto3" json:"key,omitempty"`
+}
+
+func (m *StateKey_MultimapSideInput) Reset()                    { *m = StateKey_MultimapSideInput{} }
+func (m *StateKey_MultimapSideInput) String() string            { return proto.CompactTextString(m) }
+func (*StateKey_MultimapSideInput) ProtoMessage()               {}
+func (*StateKey_MultimapSideInput) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{20, 1} }
+
+func (m *StateKey_MultimapSideInput) GetPtransformId() string {
+	if m != nil {
+		return m.PtransformId
+	}
+	return ""
+}
+
+func (m *StateKey_MultimapSideInput) GetSideInputId() string {
+	if m != nil {
+		return m.SideInputId
+	}
+	return ""
+}
+
+func (m *StateKey_MultimapSideInput) GetWindow() []byte {
+	if m != nil {
+		return m.Window
+	}
+	return nil
+}
+
+func (m *StateKey_MultimapSideInput) GetKey() []byte {
+	if m != nil {
+		return m.Key
+	}
+	return nil
+}
+
+type StateKey_BagUserState struct {
+	// (Required) The id of the PTransform containing user state.
+	PtransformId string `protobuf:"bytes,1,opt,name=ptransform_id,json=ptransformId" json:"ptransform_id,omitempty"`
+	// (Required) The id of the user state.
+	UserStateId string `protobuf:"bytes,2,opt,name=user_state_id,json=userStateId" json:"user_state_id,omitempty"`
+	// (Required) The window encoded in a nested context.
+	Window []byte `protobuf:"bytes,3,opt,name=window,proto3" json:"window,omitempty"`
+	// (Required) The key of the currently executing element encoded in a
+	// nested context.
+	Key []byte `protobuf:"bytes,4,opt,name=key,proto3" json:"key,omitempty"`
+}
+
+func (m *StateKey_BagUserState) Reset()                    { *m = StateKey_BagUserState{} }
+func (m *StateKey_BagUserState) String() string            { return proto.CompactTextString(m) }
+func (*StateKey_BagUserState) ProtoMessage()               {}
+func (*StateKey_BagUserState) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{20, 2} }
+
+func (m *StateKey_BagUserState) GetPtransformId() string {
+	if m != nil {
+		return m.PtransformId
+	}
+	return ""
+}
+
+func (m *StateKey_BagUserState) GetUserStateId() string {
+	if m != nil {
+		return m.UserStateId
+	}
+	return ""
+}
+
+func (m *StateKey_BagUserState) GetWindow() []byte {
+	if m != nil {
+		return m.Window
+	}
+	return nil
+}
+
+func (m *StateKey_BagUserState) GetKey() []byte {
+	if m != nil {
+		return m.Key
+	}
+	return nil
+}
+
+// A request to get state.
+type StateGetRequest struct {
+	// (Optional) If specified, signals to the runner that the response
+	// should resume from the following continuation token.
+	//
+	// If unspecified, signals to the runner that the response should start
+	// from the beginning of the logical continuable stream.
+	ContinuationToken []byte `protobuf:"bytes,1,opt,name=continuation_token,json=continuationToken,proto3" json:"continuation_token,omitempty"`
+}
+
+func (m *StateGetRequest) Reset()                    { *m = StateGetRequest{} }
+func (m *StateGetRequest) String() string            { return proto.CompactTextString(m) }
+func (*StateGetRequest) ProtoMessage()               {}
+func (*StateGetRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{21} }
+
+func (m *StateGetRequest) GetContinuationToken() []byte {
+	if m != nil {
+		return m.ContinuationToken
+	}
+	return nil
+}
+
+// A response to get state representing a logical byte stream which can be
+// continued using the state API.
+type StateGetResponse struct {
+	// (Optional) If specified, represents a token which can be used with the
+	// state API to get the next chunk of this logical byte stream. The end of
+	// the logical byte stream is signalled by this field being unset.
+	ContinuationToken []byte `protobuf:"bytes,1,opt,name=continuation_token,json=continuationToken,proto3" json:"continuation_token,omitempty"`
+	// Represents a part of a logical byte stream. Elements within
+	// the logical byte stream are encoded in the nested context and
+	// concatenated together.
+	Data []byte `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"`
+}
+
+func (m *StateGetResponse) Reset()                    { *m = StateGetResponse{} }
+func (m *StateGetResponse) String() string            { return proto.CompactTextString(m) }
+func (*StateGetResponse) ProtoMessage()               {}
+func (*StateGetResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{22} }
+
+func (m *StateGetResponse) GetContinuationToken() []byte {
+	if m != nil {
+		return m.ContinuationToken
+	}
+	return nil
+}
+
+func (m *StateGetResponse) GetData() []byte {
+	if m != nil {
+		return m.Data
+	}
+	return nil
+}
+
+// A request to append state.
+type StateAppendRequest struct {
+	// Represents a part of a logical byte stream. Elements within
+	// the logical byte stream are encoded in the nested context and
+	// multiple append requests are concatenated together.
+	Data []byte `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"`
+}
+
+func (m *StateAppendRequest) Reset()                    { *m = StateAppendRequest{} }
+func (m *StateAppendRequest) String() string            { return proto.CompactTextString(m) }
+func (*StateAppendRequest) ProtoMessage()               {}
+func (*StateAppendRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{23} }
+
+func (m *StateAppendRequest) GetData() []byte {
+	if m != nil {
+		return m.Data
+	}
+	return nil
+}
+
+// A response to append state.
+type StateAppendResponse struct {
+}
+
+func (m *StateAppendResponse) Reset()                    { *m = StateAppendResponse{} }
+func (m *StateAppendResponse) String() string            { return proto.CompactTextString(m) }
+func (*StateAppendResponse) ProtoMessage()               {}
+func (*StateAppendResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{24} }
+
+// A request to clear state.
+type StateClearRequest struct {
+}
+
+func (m *StateClearRequest) Reset()                    { *m = StateClearRequest{} }
+func (m *StateClearRequest) String() string            { return proto.CompactTextString(m) }
+func (*StateClearRequest) ProtoMessage()               {}
+func (*StateClearRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{25} }
+
+// A response to clear state.
+type StateClearResponse struct {
+}
+
+func (m *StateClearResponse) Reset()                    { *m = StateClearResponse{} }
+func (m *StateClearResponse) String() string            { return proto.CompactTextString(m) }
+func (*StateClearResponse) ProtoMessage()               {}
+func (*StateClearResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{26} }
+
+// A log entry
+type LogEntry struct {
+	// (Required) The severity of the log statement.
+	Severity LogEntry_Severity_Enum `protobuf:"varint,1,opt,name=severity,enum=org.apache.beam.model.fn_execution.v1.LogEntry_Severity_Enum" json:"severity,omitempty"`
+	// (Required) The time at which this log statement occurred.
+	Timestamp *google_protobuf1.Timestamp `protobuf:"bytes,2,opt,name=timestamp" json:"timestamp,omitempty"`
+	// (Required) A human readable message.
+	Message string `protobuf:"bytes,3,opt,name=message" json:"message,omitempty"`
+	// (Optional) An optional trace of the functions involved. For example, in
+	// Java this can include multiple causes and multiple suppressed exceptions.
+	Trace string `protobuf:"bytes,4,opt,name=trace" json:"trace,omitempty"`
+	// (Optional) A reference to the instruction this log statement is associated
+	// with.
+	InstructionReference string `protobuf:"bytes,5,opt,name=instruction_reference,json=instructionReference" json:"instruction_reference,omitempty"`
+	// (Optional) A reference to the primitive transform this log statement is
+	// associated with.
+	PrimitiveTransformReference string `protobuf:"bytes,6,opt,name=primitive_transform_reference,json=primitiveTransformReference" json:"primitive_transform_reference,omitempty"`
+	// (Optional) Human-readable name of the function or method being invoked,
+	// with optional context such as the class or package name. The format can
+	// vary by language. For example:
+	//   qual.if.ied.Class.method (Java)
+	//   dir/package.func (Go)
+	//   module.function (Python)
+	//   file.cc:382 (C++)
+	LogLocation string `protobuf:"bytes,7,opt,name=log_location,json=logLocation" json:"log_location,omitempty"`
+	// (Optional) The name of the thread this log statement is associated with.
+	Thread string `protobuf:"bytes,8,opt,name=thread" json:"thread,omitempty"`
+}
+
+func (m *LogEntry) Reset()                    { *m = LogEntry{} }
+func (m *LogEntry) String() string            { return proto.CompactTextString(m) }
+func (*LogEntry) ProtoMessage()               {}
+func (*LogEntry) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{27} }
+
+func (m *LogEntry) GetSeverity() LogEntry_Severity_Enum {
+	if m != nil {
+		return m.Severity
+	}
+	return LogEntry_Severity_UNSPECIFIED
+}
+
+func (m *LogEntry) GetTimestamp() *google_protobuf1.Timestamp {
+	if m != nil {
+		return m.Timestamp
+	}
+	return nil
+}
+
+func (m *LogEntry) GetMessage() string {
+	if m != nil {
+		return m.Message
+	}
+	return ""
+}
+
+func (m *LogEntry) GetTrace() string {
+	if m != nil {
+		return m.Trace
+	}
+	return ""
+}
+
+func (m *LogEntry) GetInstructionReference() string {
+	if m != nil {
+		return m.InstructionReference
+	}
+	return ""
+}
+
+func (m *LogEntry) GetPrimitiveTransformReference() string {
+	if m != nil {
+		return m.PrimitiveTransformReference
+	}
+	return ""
+}
+
+func (m *LogEntry) GetLogLocation() string {
+	if m != nil {
+		return m.LogLocation
+	}
+	return ""
+}
+
+func (m *LogEntry) GetThread() string {
+	if m != nil {
+		return m.Thread
+	}
+	return ""
+}
+
+// A list of log entries, enables buffering and batching of multiple
+// log messages using the logging API.
+type LogEntry_List struct {
+	// (Required) One or or more log messages.
+	LogEntries []*LogEntry `protobuf:"bytes,1,rep,name=log_entries,json=logEntries" json:"log_entries,omitempty"`
+}
+
+func (m *LogEntry_List) Reset()                    { *m = LogEntry_List{} }
+func (m *LogEntry_List) String() string            { return proto.CompactTextString(m) }
+func (*LogEntry_List) ProtoMessage()               {}
+func (*LogEntry_List) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{27, 0} }
+
+func (m *LogEntry_List) GetLogEntries() []*LogEntry {
+	if m != nil {
+		return m.LogEntries
+	}
+	return nil
+}
+
+// The severity of the event described in a log entry, expressed as one of the
+// severity levels listed below. For your reference, the levels are
+// assigned the listed numeric values. The effect of using numeric values
+// other than those listed is undefined.
+//
+// If you are writing log entries, you should map other severity encodings to
+// one of these standard levels. For example, you might map all of
+// Java's FINE, FINER, and FINEST levels to `Severity.DEBUG`.
+//
+// This list is intentionally not comprehensive; the intent is to provide a
+// common set of "good enough" severity levels so that logging front ends
+// can provide filtering and searching across log types. Users of the API are
+// free not to use all severity levels in their log messages.
+type LogEntry_Severity struct {
+}
+
+func (m *LogEntry_Severity) Reset()                    { *m = LogEntry_Severity{} }
+func (m *LogEntry_Severity) String() string            { return proto.CompactTextString(m) }
+func (*LogEntry_Severity) ProtoMessage()               {}
+func (*LogEntry_Severity) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{27, 1} }
+
+type LogControl struct {
+}
+
+func (m *LogControl) Reset()                    { *m = LogControl{} }
+func (m *LogControl) String() string            { return proto.CompactTextString(m) }
+func (*LogControl) ProtoMessage()               {}
+func (*LogControl) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{28} }
+
+// A Docker container configuration for launching the SDK harness to execute
+// user specified functions.
+type DockerContainer struct {
+	// (Required) A pipeline level unique id which can be used as a reference to
+	// refer to this.
+	Id string `protobuf:"bytes,1,opt,name=id" json:"id,omitempty"`
+	// (Required) The Docker container URI
+	// For example "dataflow.gcr.io/v1beta3/java-batch:1.5.1"
+	Uri string `protobuf:"bytes,2,opt,name=uri" json:"uri,omitempty"`
+	// (Optional) Docker registry specification.
+	// If unspecified, the uri is expected to be able to be fetched without
+	// requiring additional configuration by a runner.
+	RegistryReference string `protobuf:"bytes,3,opt,name=registry_reference,json=registryReference" json:"registry_reference,omitempty"`
+}
+
+func (m *DockerContainer) Reset()                    { *m = DockerContainer{} }
+func (m *DockerContainer) String() string            { return proto.CompactTextString(m) }
+func (*DockerContainer) ProtoMessage()               {}
+func (*DockerContainer) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{29} }
+
+func (m *DockerContainer) GetId() string {
+	if m != nil {
+		return m.Id
+	}
+	return ""
+}
+
+func (m *DockerContainer) GetUri() string {
+	if m != nil {
+		return m.Uri
+	}
+	return ""
+}
+
+func (m *DockerContainer) GetRegistryReference() string {
+	if m != nil {
+		return m.RegistryReference
+	}
+	return ""
+}
+
+func init() {
+	proto.RegisterType((*Target)(nil), "org.apache.beam.model.fn_execution.v1.Target")
+	proto.RegisterType((*Target_List)(nil), "org.apache.beam.model.fn_execution.v1.Target.List")
+	proto.RegisterType((*RemoteGrpcPort)(nil), "org.apache.beam.model.fn_execution.v1.RemoteGrpcPort")
+	proto.RegisterType((*InstructionRequest)(nil), "org.apache.beam.model.fn_execution.v1.InstructionRequest")
+	proto.RegisterType((*InstructionResponse)(nil), "org.apache.beam.model.fn_execution.v1.InstructionResponse")
+	proto.RegisterType((*RegisterRequest)(nil), "org.apache.beam.model.fn_execution.v1.RegisterRequest")
+	proto.RegisterType((*RegisterResponse)(nil), "org.apache.beam.model.fn_execution.v1.RegisterResponse")
+	proto.RegisterType((*ProcessBundleDescriptor)(nil), "org.apache.beam.model.fn_execution.v1.ProcessBundleDescriptor")
+	proto.RegisterType((*ProcessBundleRequest)(nil), "org.apache.beam.model.fn_execution.v1.ProcessBundleRequest")
+	proto.RegisterType((*ProcessBundleResponse)(nil), "org.apache.beam.model.fn_execution.v1.ProcessBundleResponse")
+	proto.RegisterType((*ProcessBundleProgressRequest)(nil), "org.apache.beam.model.fn_execution.v1.ProcessBundleProgressRequest")
+	proto.RegisterType((*Metrics)(nil), "org.apache.beam.model.fn_execution.v1.Metrics")
+	proto.RegisterType((*Metrics_PTransform)(nil), "org.apache.beam.model.fn_execution.v1.Metrics.PTransform")
+	proto.RegisterType((*Metrics_PTransform_Measured)(nil), "org.apache.beam.model.fn_execution.v1.Metrics.PTransform.Measured")
+	proto.RegisterType((*Metrics_PTransform_ProcessedElements)(nil), "org.apache.beam.model.fn_execution.v1.Metrics.PTransform.ProcessedElements")
+	proto.RegisterType((*Metrics_PTransform_ActiveElements)(nil), "org.apache.beam.model.fn_execution.v1.Metrics.PTransform.ActiveElements")
+	proto.RegisterType((*Metrics_User)(nil), "org.apache.beam.model.fn_execution.v1.Metrics.User")
+	proto.RegisterType((*ProcessBundleProgressResponse)(nil), "org.apache.beam.model.fn_execution.v1.ProcessBundleProgressResponse")
+	proto.RegisterType((*ProcessBundleSplitRequest)(nil), "org.apache.beam.model.fn_execution.v1.ProcessBundleSplitRequest")
+	proto.RegisterType((*ElementCountRestriction)(nil), "org.apache.beam.model.fn_execution.v1.ElementCountRestriction")
+	proto.RegisterType((*ElementCountSkipRestriction)(nil), "org.apache.beam.model.fn_execution.v1.ElementCountSkipRestriction")
+	proto.RegisterType((*PrimitiveTransformSplit)(nil), "org.apache.beam.model.fn_execution.v1.PrimitiveTransformSplit")
+	proto.RegisterType((*ProcessBundleSplitResponse)(nil), "org.apache.beam.model.fn_execution.v1.ProcessBundleSplitResponse")
+	proto.RegisterType((*Elements)(nil), "org.apache.beam.model.fn_execution.v1.Elements")
+	proto.RegisterType((*Elements_Data)(nil), "org.apache.beam.model.fn_execution.v1.Elements.Data")
+	proto.RegisterType((*StateRequest)(nil), "org.apache.beam.model.fn_execution.v1.StateRequest")
+	proto.RegisterType((*StateResponse)(nil), "org.apache.beam.model.fn_execution.v1.StateResponse")
+	proto.RegisterType((*StateKey)(nil), "org.apache.beam.model.fn_execution.v1.StateKey")
+	proto.RegisterType((*StateKey_Runner)(nil), "org.apache.beam.model.fn_execution.v1.StateKey.Runner")
+	proto.RegisterType((*StateKey_MultimapSideInput)(nil), "org.apache.beam.model.fn_execution.v1.StateKey.MultimapSideInput")
+	proto.RegisterType((*StateKey_BagUserState)(nil), "org.apache.beam.model.fn_execution.v1.StateKey.BagUserState")
+	proto.RegisterType((*StateGetRequest)(nil), "org.apache.beam.model.fn_execution.v1.StateGetRequest")
+	proto.RegisterType((*StateGetResponse)(nil), "org.apache.beam.model.fn_execution.v1.StateGetResponse")
+	proto.RegisterType((*StateAppendRequest)(nil), "org.apache.beam.model.fn_execution.v1.StateAppendRequest")
+	proto.RegisterType((*StateAppendResponse)(nil), "org.apache.beam.model.fn_execution.v1.StateAppendResponse")
+	proto.RegisterType((*StateClearRequest)(nil), "org.apache.beam.model.fn_execution.v1.StateClearRequest")
+	proto.RegisterType((*StateClearResponse)(nil), "org.apache.beam.model.fn_execution.v1.StateClearResponse")
+	proto.RegisterType((*LogEntry)(nil), "org.apache.beam.model.fn_execution.v1.LogEntry")
+	proto.RegisterType((*LogEntry_List)(nil), "org.apache.beam.model.fn_execution.v1.LogEntry.List")
+	proto.RegisterType((*LogEntry_Severity)(nil), "org.apache.beam.model.fn_execution.v1.LogEntry.Severity")
+	proto.RegisterType((*LogControl)(nil), "org.apache.beam.model.fn_execution.v1.LogControl")
+	proto.RegisterType((*DockerContainer)(nil), "org.apache.beam.model.fn_execution.v1.DockerContainer")
+	proto.RegisterEnum("org.apache.beam.model.fn_execution.v1.LogEntry_Severity_Enum", LogEntry_Severity_Enum_name, LogEntry_Severity_Enum_value)
+}
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ context.Context
+var _ grpc.ClientConn
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the grpc package it is being compiled against.
+const _ = grpc.SupportPackageIsVersion4
+
+// Client API for BeamFnControl service
+
+type BeamFnControlClient interface {
+	// Instructions sent by the runner to the SDK requesting different types
+	// of work.
+	Control(ctx context.Context, opts ...grpc.CallOption) (BeamFnControl_ControlClient, error)
+}
+
+type beamFnControlClient struct {
+	cc *grpc.ClientConn
+}
+
+func NewBeamFnControlClient(cc *grpc.ClientConn) BeamFnControlClient {
+	return &beamFnControlClient{cc}
+}
+
+func (c *beamFnControlClient) Control(ctx context.Context, opts ...grpc.CallOption) (BeamFnControl_ControlClient, error) {
+	stream, err := grpc.NewClientStream(ctx, &_BeamFnControl_serviceDesc.Streams[0], c.cc, "/org.apache.beam.model.fn_execution.v1.BeamFnControl/Control", opts...)
+	if err != nil {
+		return nil, err
+	}
+	x := &beamFnControlControlClient{stream}
+	return x, nil
+}
+
+type BeamFnControl_ControlClient interface {
+	Send(*InstructionResponse) error
+	Recv() (*InstructionRequest, error)
+	grpc.ClientStream
+}
+
+type beamFnControlControlClient struct {
+	grpc.ClientStream
+}
+
+func (x *beamFnControlControlClient) Send(m *InstructionResponse) error {
+	return x.ClientStream.SendMsg(m)
+}
+
+func (x *beamFnControlControlClient) Recv() (*InstructionRequest, error) {
+	m := new(InstructionRequest)
+	if err := x.ClientStream.RecvMsg(m); err != nil {
+		return nil, err
+	}
+	return m, nil
+}
+
+// Server API for BeamFnControl service
+
+type BeamFnControlServer interface {
+	// Instructions sent by the runner to the SDK requesting different types
+	// of work.
+	Control(BeamFnControl_ControlServer) error
+}
+
+func RegisterBeamFnControlServer(s *grpc.Server, srv BeamFnControlServer) {
+	s.RegisterService(&_BeamFnControl_serviceDesc, srv)
+}
+
+func _BeamFnControl_Control_Handler(srv interface{}, stream grpc.ServerStream) error {
+	return srv.(BeamFnControlServer).Control(&beamFnControlControlServer{stream})
+}
+
+type BeamFnControl_ControlServer interface {
+	Send(*InstructionRequest) error
+	Recv() (*InstructionResponse, error)
+	grpc.ServerStream
+}
+
+type beamFnControlControlServer struct {
+	grpc.ServerStream
+}
+
+func (x *beamFnControlControlServer) Send(m *InstructionRequest) error {
+	return x.ServerStream.SendMsg(m)
+}
+
+func (x *beamFnControlControlServer) Recv() (*InstructionResponse, error) {
+	m := new(InstructionResponse)
+	if err := x.ServerStream.RecvMsg(m); err != nil {
+		return nil, err
+	}
+	return m, nil
+}
+
+var _BeamFnControl_serviceDesc = grpc.ServiceDesc{
+	ServiceName: "org.apache.beam.model.fn_execution.v1.BeamFnControl",
+	HandlerType: (*BeamFnControlServer)(nil),
+	Methods:     []grpc.MethodDesc{},
+	Streams: []grpc.StreamDesc{
+		{
+			StreamName:    "Control",
+			Handler:       _BeamFnControl_Control_Handler,
+			ServerStreams: true,
+			ClientStreams: true,
+		},
+	},
+	Metadata: "beam_fn_api.proto",
+}
+
+// Client API for BeamFnData service
+
+type BeamFnDataClient interface {
+	// Used to send data between harnesses.
+	Data(ctx context.Context, opts ...grpc.CallOption) (BeamFnData_DataClient, error)
+}
+
+type beamFnDataClient struct {
+	cc *grpc.ClientConn
+}
+
+func NewBeamFnDataClient(cc *grpc.ClientConn) BeamFnDataClient {
+	return &beamFnDataClient{cc}
+}
+
+func (c *beamFnDataClient) Data(ctx context.Context, opts ...grpc.CallOption) (BeamFnData_DataClient, error) {
+	stream, err := grpc.NewClientStream(ctx, &_BeamFnData_serviceDesc.Streams[0], c.cc, "/org.apache.beam.model.fn_execution.v1.BeamFnData/Data", opts...)
+	if err != nil {
+		return nil, err
+	}
+	x := &beamFnDataDataClient{stream}
+	return x, nil
+}
+
+type BeamFnData_DataClient interface {
+	Send(*Elements) error
+	Recv() (*Elements, error)
+	grpc.ClientStream
+}
+
+type beamFnDataDataClient struct {
+	grpc.ClientStream
+}
+
+func (x *beamFnDataDataClient) Send(m *Elements) error {
+	return x.ClientStream.SendMsg(m)
+}
+
+func (x *beamFnDataDataClient) Recv() (*Elements, error) {
+	m := new(Elements)
+	if err := x.ClientStream.RecvMsg(m); err != nil {
+		return nil, err
+	}
+	return m, nil
+}
+
+// Server API for BeamFnData service
+
+type BeamFnDataServer interface {
+	// Used to send data between harnesses.
+	Data(BeamFnData_DataServer) error
+}
+
+func RegisterBeamFnDataServer(s *grpc.Server, srv BeamFnDataServer) {
+	s.RegisterService(&_BeamFnData_serviceDesc, srv)
+}
+
+func _BeamFnData_Data_Handler(srv interface{}, stream grpc.ServerStream) error {
+	return srv.(BeamFnDataServer).Data(&beamFnDataDataServer{stream})
+}
+
+type BeamFnData_DataServer interface {
+	Send(*Elements) error
+	Recv() (*Elements, error)
+	grpc.ServerStream
+}
+
+type beamFnDataDataServer struct {
+	grpc.ServerStream
+}
+
+func (x *beamFnDataDataServer) Send(m *Elements) error {
+	return x.ServerStream.SendMsg(m)
+}
+
+func (x *beamFnDataDataServer) Recv() (*Elements, error) {
+	m := new(Elements)
+	if err := x.ServerStream.RecvMsg(m); err != nil {
+		return nil, err
+	}
+	return m, nil
+}
+
+var _BeamFnData_serviceDesc = grpc.ServiceDesc{
+	ServiceName: "org.apache.beam.model.fn_execution.v1.BeamFnData",
+	HandlerType: (*BeamFnDataServer)(nil),
+	Methods:     []grpc.MethodDesc{},
+	Streams: []grpc.StreamDesc{
+		{
+			StreamName:    "Data",
+			Handler:       _BeamFnData_Data_Handler,
+			ServerStreams: true,
+			ClientStreams: true,
+		},
+	},
+	Metadata: "beam_fn_api.proto",
+}
+
+// Client API for BeamFnState service
+
+type BeamFnStateClient interface {
+	// Used to get/append/clear state stored by the runner on behalf of the SDK.
+	State(ctx context.Context, opts ...grpc.CallOption) (BeamFnState_StateClient, error)
+}
+
+type beamFnStateClient struct {
+	cc *grpc.ClientConn
+}
+
+func NewBeamFnStateClient(cc *grpc.ClientConn) BeamFnStateClient {
+	return &beamFnStateC

<TRUNCATED>