Posted to commits@beam.apache.org by "Luke Cwik (JIRA)" <ji...@apache.org> on 2018/03/06 21:03:00 UTC
[jira] [Updated] (BEAM-3787) Migrate Fn API to be bidirectional instruction/request stream
[ https://issues.apache.org/jira/browse/BEAM-3787?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Luke Cwik updated BEAM-3787:
----------------------------
Labels: portability (was: )
> Migrate Fn API to be bidirectional instruction/request stream
> -------------------------------------------------------------
>
> Key: BEAM-3787
> URL: https://issues.apache.org/jira/browse/BEAM-3787
> Project: Beam
> Issue Type: Improvement
> Components: beam-model
> Reporter: Luke Cwik
> Assignee: Robert Bradshaw
> Priority: Major
> Labels: portability
>
> Allow the SDK to request the Runner to do something on its behalf. This mechanism can be used for:
> * Reporting final counters
> * Work shedding (the SDK can choose to reduce the amount of work it wants to do, i.e. checkpointing)
> * Requesting process bundle descriptors (instead of requiring the Runner to send them and having the SDK cache them)
> * Decoupling the message type in the control stream, which allows new types of messages that are not one-to-one to be added
> Example API change below (note that SdkMessage/RunnerMessage should use a different name):
> {code:java}
> // An API that describes control messages between the SDK and Runner to process
> // bundles, split bundles, report progress, ...
> service BeamFnControl {
>   //
>   rpc Control(
>     // A stream of SDK requests/responses.
>     stream SdkMessage
>   ) returns (
>     // A stream of Runner requests/responses.
>     stream RunnerMessage
>   ) {}
> }
> // Messages a Runner can send over the control plane.
> message RunnerMessage {
>   // (Required) A unique identifier provided by the runner which represents
>   // this request's execution. The RunnerInstructionResponse MUST have the matching id.
>   string id = 1;
>   oneof message {
>     ErrorResponse error = 999;
>     RegisterRequest register = 1000;
>     ProcessBundleRequest process_bundle = 1001;
>     ProcessBundleProgressRequest process_bundle_progress = 1002;
>     ProcessBundleSplitRequest process_bundle_split = 1003;
>     ShedBundleResponse shed_bundle = 1004;
>   }
> }
> // Messages an SDK can send over the control plane.
> message SdkMessage {
>   oneof message {
>     RunnerInstructionResponse runner_instruction_response = 1000;
>     SdkInstructionRequest sdk_instruction_request = 1001;
>   }
> }
> // A request sent by a runner which the SDK is asked to fulfill.
> // For any unsupported request type, an error should be returned with a
> // matching instruction id.
> // Stable
> message RunnerInstructionRequest {
>   // (Required) A unique identifier provided by the runner which represents
>   // this request's execution. The RunnerInstructionResponse MUST have the matching id.
>   string instruction_id = 1;
>   // (Required) A request that the SDK Harness needs to interpret.
>   oneof request {
>     RegisterRequest register = 1000;
>     ProcessBundleRequest process_bundle = 1001;
>     ProcessBundleProgressRequest process_bundle_progress = 1002;
>     ProcessBundleSplitRequest process_bundle_split = 1003;
>   }
> }
> // The response for an associated request the SDK had been asked to fulfill.
> // Stable
> message RunnerInstructionResponse {
>   // (Required) A reference provided by the runner which represents a request's
>   // execution. The RunnerInstructionResponse MUST have the matching id when
>   // responding to the runner.
>   string instruction_id = 1;
>   // If this is specified, then this instruction has failed.
>   // A human-readable string representing the reason why processing has
>   // failed.
>   string error = 2;
>   // If the instruction did not fail, it is required to return an equivalent
>   // response type depending on the request this matches.
>   oneof response {
>     RegisterResponse register = 1000;
>     ProcessBundleResponse process_bundle = 1001;
>     ProcessBundleProgressResponse process_bundle_progress = 1002;
>     ProcessBundleSplitResponse process_bundle_split = 1003;
>   }
> }
> message SdkInstructionRequest {
>   // (Required) A unique identifier provided by the SDK which represents
>   // this request's execution. The SdkInstructionResponse MUST have the matching id.
>   string instruction_id = 1;
>   // (Required) A request that the Runner needs to interpret.
>   oneof request {
>     ShedBundleRequest shed_bundle = 1000;
>   }
> }
> // The response for an associated request the Runner had been asked to fulfill.
> // Stable
> message SdkInstructionResponse {
>   // (Required) A reference provided by the SDK which represents a request's
>   // execution. The SdkInstructionResponse MUST have the matching id when
>   // responding to the SDK.
>   string instruction_id = 1;
>   // If this is specified, then this instruction has failed.
>   // A human-readable string representing the reason why processing has
>   // failed.
>   string error = 2;
>   // If the instruction did not fail, it is required to return an equivalent
>   // response type depending on the request this matches.
>   oneof response {
>     ShedBundleResponse shed_bundle = 1000;
>   }
> }
> {code}
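
The id-matching rule in the proposal (each response MUST carry the id of the request it answers, and unsupported requests get an error with the matching id) can be illustrated with a small in-memory simulation. This is only a sketch under stated assumptions: it uses no gRPC and no Beam code, and the `Message` and `Endpoint` classes, their fields, and the `"report_final_counters"` payload name are hypothetical stand-ins for the proto messages above, not part of the Fn API.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class Message:
    """Hypothetical, simplified stand-in for SdkMessage/RunnerMessage."""
    instruction_id: str
    kind: str          # "request" or "response"
    payload: str = ""  # e.g. "process_bundle" or "shed_bundle"
    error: str = ""    # non-empty means the instruction failed


class Endpoint:
    """One side of the control stream; it can both issue and serve requests."""

    def __init__(self, name: str, supported: List[str]) -> None:
        self.name = name
        self.supported = set(supported)
        self.pending: Dict[str, str] = {}        # ids still awaiting a response
        self.completed: Dict[str, Message] = {}  # responses matched by id
        self._next_id = 0

    def new_request(self, payload: str) -> Message:
        # The originator picks the id, so ids are prefixed with its own name.
        self._next_id += 1
        instruction_id = f"{self.name}-{self._next_id}"
        self.pending[instruction_id] = payload
        return Message(instruction_id, "request", payload)

    def receive(self, msg: Message) -> Optional[Message]:
        if msg.kind == "request":
            # Serve the peer's request, echoing its id back in the response.
            if msg.payload not in self.supported:
                return Message(msg.instruction_id, "response", msg.payload,
                               error=f"unsupported request: {msg.payload}")
            return Message(msg.instruction_id, "response", msg.payload)
        # Otherwise it is a response: match it to one of our pending requests.
        if msg.instruction_id not in self.pending:
            raise KeyError(f"unknown instruction id: {msg.instruction_id}")
        del self.pending[msg.instruction_id]
        self.completed[msg.instruction_id] = msg
        return None


runner = Endpoint("runner", supported=["shed_bundle"])
sdk = Endpoint("sdk", supported=["register", "process_bundle"])

# Runner -> SDK: the direction the control plane already supports.
req = runner.new_request("process_bundle")
resp = sdk.receive(req)
runner.receive(resp)

# SDK -> Runner: the new direction proposed in this issue.
shed = sdk.new_request("shed_bundle")
shed_resp = runner.receive(shed)
sdk.receive(shed_resp)

# A request type the peer does not support yields an error with a matching id.
bad = sdk.new_request("report_final_counters")
bad_resp = runner.receive(bad)
sdk.receive(bad_resp)
```

Because each side namespaces the ids it generates, both directions can share one stream without the two id spaces colliding; a real implementation would need to decide how that is guaranteed.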