You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/06/19 17:11:51 UTC

[1/2] beam git commit: [BEAM-1348] Model the Fn State Api as per https://s.apache.org/beam-fn-state-api-and-bundle-processing

Repository: beam
Updated Branches:
  refs/heads/master 0cabdf6e7 -> 2ab482d2b


[BEAM-1348] Model the Fn State Api as per https://s.apache.org/beam-fn-state-api-and-bundle-processing


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d7715b78
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d7715b78
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d7715b78

Branch: refs/heads/master
Commit: d7715b78a1f57e6088aabdbc6979cb5809269a97
Parents: 0cabdf6
Author: Luke Cwik <lc...@google.com>
Authored: Wed May 31 08:54:16 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Mon Jun 19 10:11:19 2017 -0700

----------------------------------------------------------------------
 .../fn-api/src/main/proto/beam_fn_api.proto     | 97 +++++++++++---------
 1 file changed, 52 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/d7715b78/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
----------------------------------------------------------------------
diff --git a/sdks/common/fn-api/src/main/proto/beam_fn_api.proto b/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
index 95fe042..8162bc5 100644
--- a/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
+++ b/sdks/common/fn-api/src/main/proto/beam_fn_api.proto
@@ -314,9 +314,9 @@ message ProcessBundleRequest {
   // instantiated and executed by the SDK harness.
   string process_bundle_descriptor_reference = 1;
 
-  // (Optional) A list of cache tokens that can be used by an SDK to cache
-  // data looked up using the State API across multiple bundles.
-  repeated CacheToken cache_tokens = 2;
+  // (Optional) A list of cache tokens that can be used by an SDK to reuse
+  // cached data returned by the State API across multiple bundles.
+  repeated bytes cache_tokens = 2;
 }
 
 // Stable
@@ -539,6 +539,10 @@ message StateResponse {
   // failed.
   string error = 2;
 
+  // (Optional) If this is specified, then the result of this state request
+  // can be cached using the supplied token.
+  bytes cache_token = 3;
+
   // A corresponding response matching the request will be populated.
   oneof response {
     // A response to getting state.
@@ -564,49 +568,44 @@ service BeamFnState {
   ) {}
 }
 
-message CacheToken {
-  // (Required) Represents the function spec and tag associated with this state
-  // key.
-  //
-  // By combining the function_spec_reference with the tag representing:
-  //   * the input, we refer to the iterable portion of a large GBK
-  //   * the side input, we refer to the side input
-  //   * the user state, we refer to user state
-  Target target = 1;
-
-  // (Required) An opaque identifier.
-  bytes token = 2;
-}
-
 message StateKey {
-  // (Required) Represents the function spec and tag associated with this state
-  // key.
-  //
-  // By combining the function_spec_reference with the tag representing:
-  //   * the input, we refer to fetching the iterable portion of a large GBK
-  //   * the side input, we refer to fetching the side input
-  //   * the user state, we refer to fetching user state
-  Target target = 1;
-
-  // (Required) The bytes of the window which this state request is for encoded
-  // in the nested context.
-  bytes window = 2;
+  message Runner {
+    // (Required) Opaque information supplied by the runner. Used to support
+    // remote references.
+    bytes key = 1;
+  }
 
-  // (Required) The user key encoded in the nested context.
-  bytes key = 3;
-}
+  message MultimapSideInput {
+    // (Required) The id of the PTransform containing a side input.
+    string ptransform_id = 1;
+    // (Required) The id of the side input.
+    string side_input_id = 2;
+    // (Required) The window (after mapping the currently executing element's
+    // window into the side input's window domain) encoded in a nested context.
+    bytes window = 3;
+    // (Required) The key encoded in a nested context.
+    bytes key = 4;
+  }
 
-// A logical byte stream which can be continued using the state API.
-message ContinuableStream {
-  // (Optional) If specified, represents a token which can be used with the
-  // state API to get the next chunk of this logical byte stream. The end of
-  // the logical byte stream is signalled by this field being unset.
-  bytes continuation_token = 1;
+  message BagUserState {
+    // (Required) The id of the PTransform containing user state.
+    string ptransform_id = 1;
+    // (Required) The id of the user state.
+    string user_state_id = 2;
+    // (Required) The window encoded in a nested context. 
+    bytes window = 3;
+    // (Required) The key of the currently executing element encoded in a
+    // nested context.
+    bytes key = 4;
+  }
 
-  // Represents a part of a logical byte stream. Elements within
-  // the logical byte stream are encoded in the nested context and
-  // concatenated together.
-  bytes data = 2;
+  // (Required) One of the following state keys must be set.
+  oneof type {
+    Runner runner = 1;
+    MultimapSideInput multimap_side_input = 2;
+    BagUserState bag_user_state = 3;
+    // TODO: represent a state key for user map state
+  }
 }
 
 // A request to get state.
@@ -619,10 +618,18 @@ message StateGetRequest {
   bytes continuation_token = 1;
 }
 
-// A response to get state.
+// A response to get state representing a logical byte stream which can be
+// continued using the state API.
 message StateGetResponse {
-  // (Required) The response containing a continuable logical byte stream.
-  ContinuableStream stream = 1;
+  // (Optional) If specified, represents a token which can be used with the
+  // state API to get the next chunk of this logical byte stream. The end of
+  // the logical byte stream is signalled by this field being unset.
+  bytes continuation_token = 1;
+
+  // Represents a part of a logical byte stream. Elements within
+  // the logical byte stream are encoded in the nested context and
+  // concatenated together.
+  bytes data = 2;
 }
 
 // A request to append state.


[2/2] beam git commit: [BEAM-1348] Model the Fn State Api as per https://s.apache.org/beam-fn-state-api-and-bundle-processing

Posted by lc...@apache.org.
[BEAM-1348] Model the Fn State Api as per https://s.apache.org/beam-fn-state-api-and-bundle-processing

This closes #3268


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2ab482d2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2ab482d2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2ab482d2

Branch: refs/heads/master
Commit: 2ab482d2bab51abac8c5c75a5da0b6cad98e5a77
Parents: 0cabdf6 d7715b7
Author: Luke Cwik <lc...@google.com>
Authored: Mon Jun 19 10:11:42 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Mon Jun 19 10:11:42 2017 -0700

----------------------------------------------------------------------
 .../fn-api/src/main/proto/beam_fn_api.proto     | 97 +++++++++++---------
 1 file changed, 52 insertions(+), 45 deletions(-)
----------------------------------------------------------------------