You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/06/19 17:11:51 UTC
[1/2] beam git commit: [BEAM-1348] Model the Fn State Api as per
https://s.apache.org/beam-fn-state-api-and-bundle-processing
Repository: beam
Updated Branches:
refs/heads/master 0cabdf6e7 -> 2ab482d2b
[BEAM-1348] Model the Fn State Api as per https://s.apache.org/beam-fn-state-api-and-bundle-processing
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d7715b78
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d7715b78
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d7715b78
Branch: refs/heads/master
Commit: d7715b78a1f57e6088aabdbc6979cb5809269a97
Parents: 0cabdf6
Author: Luke Cwik <lc...@google.com>
Authored: Wed May 31 08:54:16 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Mon Jun 19 10:11:19 2017 -0700
----------------------------------------------------------------------
.../fn-api/src/main/proto/beam_fn_api.proto | 97 +++++++++++---------
1 file changed, 52 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/d7715b78/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
----------------------------------------------------------------------
diff --git a/sdks/common/fn-api/src/main/proto/beam_fn_api.proto b/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
index 95fe042..8162bc5 100644
--- a/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
+++ b/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
@@ -314,9 +314,9 @@ message ProcessBundleRequest {
// instantiated and executed by the SDK harness.
string process_bundle_descriptor_reference = 1;
- // (Optional) A list of cache tokens that can be used by an SDK to cache
- // data looked up using the State API across multiple bundles.
- repeated CacheToken cache_tokens = 2;
+ // (Optional) A list of cache tokens that can be used by an SDK to reuse
+ // cached data returned by the State API across multiple bundles.
+ repeated bytes cache_tokens = 2;
}
// Stable
@@ -539,6 +539,10 @@ message StateResponse {
// failed.
string error = 2;
+ // (Optional) If this is specified, then the result of this state request
+ // can be cached using the supplied token.
+ bytes cache_token = 3;
+
// A corresponding response matching the request will be populated.
oneof response {
// A response to getting state.
@@ -564,49 +568,44 @@ service BeamFnState {
) {}
}
-message CacheToken {
- // (Required) Represents the function spec and tag associated with this state
- // key.
- //
- // By combining the function_spec_reference with the tag representing:
- // * the input, we refer to the iterable portion of a large GBK
- // * the side input, we refer to the side input
- // * the user state, we refer to user state
- Target target = 1;
-
- // (Required) An opaque identifier.
- bytes token = 2;
-}
-
message StateKey {
- // (Required) Represents the function spec and tag associated with this state
- // key.
- //
- // By combining the function_spec_reference with the tag representing:
- // * the input, we refer to fetching the iterable portion of a large GBK
- // * the side input, we refer to fetching the side input
- // * the user state, we refer to fetching user state
- Target target = 1;
-
- // (Required) The bytes of the window which this state request is for encoded
- // in the nested context.
- bytes window = 2;
+ message Runner {
+ // (Required) Opaque information supplied by the runner. Used to support
+ // remote references.
+ bytes key = 1;
+ }
- // (Required) The user key encoded in the nested context.
- bytes key = 3;
-}
+ message MultimapSideInput {
+ // (Required) The id of the PTransform containing a side input.
+ string ptransform_id = 1;
+ // (Required) The id of the side input.
+ string side_input_id = 2;
+ // (Required) The window (after mapping the currently executing element's
+ // window into the side input's window domain) encoded in a nested context.
+ bytes window = 3;
+ // (Required) The key encoded in a nested context.
+ bytes key = 4;
+ }
-// A logical byte stream which can be continued using the state API.
-message ContinuableStream {
- // (Optional) If specified, represents a token which can be used with the
- // state API to get the next chunk of this logical byte stream. The end of
- // the logical byte stream is signalled by this field being unset.
- bytes continuation_token = 1;
+ message BagUserState {
+ // (Required) The id of the PTransform containing user state.
+ string ptransform_id = 1;
+ // (Required) The id of the user state.
+ string user_state_id = 2;
+ // (Required) The window encoded in a nested context.
+ bytes window = 3;
+ // (Required) The key of the currently executing element encoded in a
+ // nested context.
+ bytes key = 4;
+ }
- // Represents a part of a logical byte stream. Elements within
- // the logical byte stream are encoded in the nested context and
- // concatenated together.
- bytes data = 2;
+ // (Required) One of the following state keys must be set.
+ oneof type {
+ Runner runner = 1;
+ MultimapSideInput multimap_side_input = 2;
+ BagUserState bag_user_state = 3;
+ // TODO: represent a state key for user map state
+ }
}
// A request to get state.
@@ -619,10 +618,18 @@ message StateGetRequest {
bytes continuation_token = 1;
}
-// A response to get state.
+// A response to get state representing a logical byte stream which can be
+// continued using the state API.
message StateGetResponse {
- // (Required) The response containing a continuable logical byte stream.
- ContinuableStream stream = 1;
+ // (Optional) If specified, represents a token which can be used with the
+ // state API to get the next chunk of this logical byte stream. The end of
+ // the logical byte stream is signalled by this field being unset.
+ bytes continuation_token = 1;
+
+ // Represents a part of a logical byte stream. Elements within
+ // the logical byte stream are encoded in the nested context and
+ // concatenated together.
+ bytes data = 2;
}
// A request to append state.
[2/2] beam git commit: [BEAM-1348] Model the Fn State Api as per
https://s.apache.org/beam-fn-state-api-and-bundle-processing
Posted by lc...@apache.org.
[BEAM-1348] Model the Fn State Api as per https://s.apache.org/beam-fn-state-api-and-bundle-processing
This closes #3268
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2ab482d2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2ab482d2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2ab482d2
Branch: refs/heads/master
Commit: 2ab482d2bab51abac8c5c75a5da0b6cad98e5a77
Parents: 0cabdf6 d7715b7
Author: Luke Cwik <lc...@google.com>
Authored: Mon Jun 19 10:11:42 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Mon Jun 19 10:11:42 2017 -0700
----------------------------------------------------------------------
.../fn-api/src/main/proto/beam_fn_api.proto | 97 +++++++++++---------
1 file changed, 52 insertions(+), 45 deletions(-)
----------------------------------------------------------------------