Posted to commits@beam.apache.org by "Luke Cwik (JIRA)" <ji...@apache.org> on 2018/03/06 21:03:00 UTC

[jira] [Updated] (BEAM-3787) Migrate Fn API to be bidirectional instruction/request stream

     [ https://issues.apache.org/jira/browse/BEAM-3787?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Luke Cwik updated BEAM-3787:
----------------------------
    Labels: portability  (was: )

> Migrate Fn API to be bidirectional instruction/request stream
> -------------------------------------------------------------
>
>                 Key: BEAM-3787
>                 URL: https://issues.apache.org/jira/browse/BEAM-3787
>             Project: Beam
>          Issue Type: Improvement
>          Components: beam-model
>            Reporter: Luke Cwik
>            Assignee: Robert Bradshaw
>            Priority: Major
>              Labels: portability
>
> Allow the SDK to request the Runner to do something on its behalf. This mechanism can be used for:
> * Reporting final counters
> * Work shedding (the SDK can choose to reduce the amount of work it wants to do, e.g. by checkpointing)
> * Requesting process bundle descriptors (instead of requiring the Runner to send them and have the SDK cache them).
> * Decoupling the message types on the control stream, so that new message types which are not one-to-one can be added.
> Example API change below (note that SdkMessage/RunnerMessage should use a different name):
> {code:java}
> // An API that describes control messages between the SDK and Runner to process
> // bundles, split bundles, report progress, ...
> service BeamFnControl {
>   // A bidirectional stream of control messages between the SDK and the Runner.
>   rpc Control(
>     // A stream of SDK requests/responses.
>     stream SdkMessage
>   ) returns (
>     // A stream of Runner requests/responses.
>     stream RunnerMessage
>   ) {}
> }
> // Messages a Runner can send over the control plane.
> message RunnerMessage {
>   // (Required) A unique identifier provided by the runner which represents
>   // this request's execution. The RunnerInstructionResponse MUST have the matching id.
>   string id = 1;
>   oneof message {
>     ErrorResponse error = 999;
>     RegisterRequest register = 1000;
>     ProcessBundleRequest process_bundle = 1001;
>     ProcessBundleProgressRequest process_bundle_progress = 1002;
>     ProcessBundleSplitRequest process_bundle_split = 1003;
>     ShedBundleResponse shed_bundle = 1004;
>   }
> }
> // Messages an SDK can send over the control plane.
> message SdkMessage {
>   oneof message {
>     RunnerInstructionResponse runner_instruction_response = 1000;
>     SdkInstructionRequest sdk_instruction_request = 1001;
>   }
> }
> // A request sent by a runner which the SDK is asked to fulfill.
> // For any unsupported request type, an error should be returned with a
> // matching instruction id.
> // Stable
> message RunnerInstructionRequest {
>   // (Required) A unique identifier provided by the runner which represents
>   // this request's execution. The RunnerInstructionResponse MUST have the matching id.
>   string instruction_id = 1;
>   // (Required) A request that the SDK Harness needs to interpret.
>   oneof request {
>     RegisterRequest register = 1000;
>     ProcessBundleRequest process_bundle = 1001;
>     ProcessBundleProgressRequest process_bundle_progress = 1002;
>     ProcessBundleSplitRequest process_bundle_split = 1003;
>   }
> }
> // The response for an associated request the SDK had been asked to fulfill.
> // Stable
> message RunnerInstructionResponse {
>   // (Required) A reference provided by the runner which represents a request's
>   // execution. The RunnerInstructionResponse MUST have the matching id when
>   // responding to the runner.
>   string instruction_id = 1;
>   // If this is specified, then this instruction has failed.
>   // A human readable string representing the reason as to why processing has
>   // failed.
>   string error = 2;
>   // If the instruction did not fail, it is required to return an equivalent
>   // response type depending on the request this matches.
>   oneof response {
>     RegisterResponse register = 1000;
>     ProcessBundleResponse process_bundle = 1001;
>     ProcessBundleProgressResponse process_bundle_progress = 1002;
>     ProcessBundleSplitResponse process_bundle_split = 1003;
>   }
> }
> // A request sent by an SDK which the Runner is asked to fulfill.
> message SdkInstructionRequest {
>   // (Required) A unique identifier provided by the SDK which represents
>   // this request's execution. The SdkInstructionResponse MUST have the matching id.
>   string instruction_id = 1;
>   // (Required) A request that the Runner needs to interpret.
>   oneof request {
>     ShedBundleRequest shed_bundle = 1000;
>   }
> }
> // The response for an associated request the Runner had been asked to fulfill.
> // Stable
> message SdkInstructionResponse {
>   // (Required) A reference provided by the SDK which represents a request's
>   // execution. The SdkInstructionResponse MUST have the matching id when
>   // responding to the SDK.
>   string instruction_id = 1;
>   // If this is specified, then this instruction has failed.
>   // A human readable string representing the reason as to why processing has
>   // failed.
>   string error = 2;
>   // If the instruction did not fail, it is required to return an equivalent
>   // response type depending on the request this matches.
>   oneof response {
>     ShedBundleResponse shed_bundle = 1000;
>   }
> }
> {code}
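
The id-matching rule in the sketch above (each response carries the instruction_id of its request, and an unsupported request type gets an error with the matching id) can be illustrated without gRPC. The following is only a rough in-memory sketch under stated assumptions: Endpoint, Request, Response and the handler names are hypothetical stand-ins for the proto messages, not Beam classes, and the stream itself is replaced by direct method calls.

```python
import itertools
from dataclasses import dataclass
from typing import Callable, Dict


@dataclass
class Request:
    instruction_id: str
    kind: str          # stands in for the proto `oneof request` case
    payload: str = ""


@dataclass
class Response:
    instruction_id: str
    kind: str = ""
    payload: str = ""
    error: str = ""    # non-empty means the instruction failed


class Endpoint:
    """One side of the control stream (hypothetical helper, not a Beam class)."""

    def __init__(self, name: str, handlers: Dict[str, Callable[[str], str]]):
        self.name = name
        self.handlers = handlers
        self.pending: Dict[str, Request] = {}
        self._ids = itertools.count(1)

    def new_request(self, kind: str, payload: str = "") -> Request:
        # Ids are made unique per sender by prefixing the sender's name.
        req = Request(f"{self.name}-{next(self._ids)}", kind, payload)
        self.pending[req.instruction_id] = req
        return req

    def handle_request(self, req: Request) -> Response:
        handler = self.handlers.get(req.kind)
        if handler is None:
            # Unsupported request type: return an error with the matching id.
            return Response(req.instruction_id, error=f"unsupported request: {req.kind}")
        return Response(req.instruction_id, kind=req.kind, payload=handler(req.payload))

    def handle_response(self, resp: Response) -> Request:
        # Match the response to its outstanding request by id; a KeyError
        # means the id is unknown or was already completed.
        return self.pending.pop(resp.instruction_id)


runner = Endpoint("runner", {"shed_bundle": lambda p: f"shed accepted for {p}"})
sdk = Endpoint("sdk", {"process_bundle": lambda p: f"processed {p}"})

# Runner -> SDK instruction (the existing direction).
r1 = runner.new_request("process_bundle", "bundle-A")
resp1 = sdk.handle_request(r1)
runner.handle_response(resp1)

# SDK -> Runner instruction (the direction this issue proposes).
s1 = sdk.new_request("shed_bundle", "bundle-A")
resp2 = runner.handle_request(s1)
sdk.handle_response(resp2)

# A request type the Runner does not support yields an error response.
s2 = sdk.new_request("report_counters")
resp3 = runner.handle_request(s2)
```

Because both sides keep their own table of pending ids, requests can flow in either direction over the same stream without the two id spaces colliding, as long as each side's ids are unique among its own outstanding requests.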



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)