Posted to commits@beam.apache.org by lc...@apache.org on 2022/07/06 20:26:54 UTC

[beam] branch master updated: [BEAM-3221] Improve documentation around split request and response (#17726)

This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new f893f6ef86c [BEAM-3221] Improve documentation around split request and response (#17726)
f893f6ef86c is described below

commit f893f6ef86c365aea671ab8fb88909573d95f474
Author: Luke Cwik <lc...@google.com>
AuthorDate: Wed Jul 6 13:26:47 2022 -0700

    [BEAM-3221] Improve documentation around split request and response (#17726)
    
    * [BEAM-3221] Improve documentation around split request and response
    
    Give an example and provide properties that must be held within a split and across multiple splits within the same bundle.
---
 .../beam/model/fn_execution/v1/beam_fn_api.proto   | 108 ++++++++++++++++-----
 1 file changed, 84 insertions(+), 24 deletions(-)

diff --git a/model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto b/model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto
index d499941e75d..3753fb768f8 100644
--- a/model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto
+++ b/model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto
@@ -324,6 +324,9 @@ message ProcessBundleResponse {
   // following applications need to be scheduled and executed in the future.
   // A runner that does not yet support residual roots MUST still check that
   // this is empty for correctness.
+  //
+  // Note that these residual roots must not have been returned as part of a
+  // prior split for this bundle.
   repeated DelayedBundleApplication residual_roots = 2;
 
   // DEPRECATED (Required) The list of metrics or other MonitoredState
@@ -438,42 +441,83 @@ message ProcessBundleSplitRequest {
     // possible and returning the remainder).
     double fraction_of_remainder = 1;
 
-    // A set of allowed element indices where the SDK may split. When this is
-    // empty, there are no constraints on where to split.
+    // (Optional) A set of allowed element indices where the SDK may split. When
+    // this is empty, there are no constraints on where to split.
     repeated int64 allowed_split_points = 3;
 
-    // (Required for GrpcRead operations) Number of total elements expected
-    // to be sent to this GrpcRead operation, required to correctly account
-    // for unreceived data when determining where to split.
+    // (Required for gRPC Read operation transforms) Number of total elements
+    // expected to be sent to this GrpcRead operation, required to correctly
+    // account for unreceived data when determining where to split.
     int64 estimated_input_elements = 2;
-
-    // TODO(SDF): Allow providing weights rather than sizes.
   }
 
   // (Required) Specifies the desired split for each transform.
   //
-  // Currently only splits at GRPC read operations are supported.
+  // Currently only splits at gRPC read operations are supported.
   // This may, of course, limit the amount of work downstream operations
   // receive.
   map<string, DesiredSplit> desired_splits = 3;
 }
 
-// Represents a partition of the bundle: a "primary" and
-// a "residual", with the following properties:
+
+// Represents a partition of the bundle: a "primary" and a "residual", with the
+// following properties:
 // - The work in primary and residual doesn't overlap, and combined, adds up
 //   to the work in the current bundle if the split hadn't happened.
-// - The current bundle, if it keeps executing, will have done none of the
-//   work under residual_roots.
+// - The current bundle, if it keeps executing, will have done exactly none of
+//   the work under residual_roots and none of the elements at and beyond the
+//   first_residual_element.
 // - The current bundle, if no further splits happen, will have done exactly
-//   the work under primary_roots.
+//   the work under primary_roots and all elements up to and including the
+//   channel split's last_primary_element.
+//
+// This allows the SDK to relinquish ownership of, and commit to not processing,
+// some of the elements it may have been sent (the residual), while retaining
+// ownership of, and a commitment to finish, the other portion (the primary).
+//
+// For example, let's say the SDK is processing elements A B C D E and a split
+// request comes in. The SDK could return a response with a channel split
+// representing a last_primary_element of 3 (D) and first_residual_element of 4
+// (E). The SDK is now responsible for processing A B C D and the runner must
+// process E in the future. A future split request could have the SDK split
+// element B into B1 and B2 and element C into C1 and C2, representing their
+// primary and residual roots. The SDK would return a response with a channel
+// split representing a last_primary_element of 0 (A) and first_residual_element
+// of 3 (D) with primary_roots (B1, C1) and residual_roots (B2, C2). The SDK is
+// now responsible for processing A B1 C1 and the runner must process B2 C2 D
+// (and E from the prior split) in the future. Yet another future split request
+// could have the SDK split B1 further into B1a and B1b (primary and residual)
+// and return C1 as a residual (assuming C1 was left unprocessed). The SDK would
+// return a response with a channel split representing a last_primary_element of
+// 0 (A) and first_residual_element of 3 (D) with primary_roots (B1a) and
+// residual_roots (B1b, C1). The SDK is now responsible for processing A B1a and
+// the runner must process B1b C1 (in addition to B2, C2, D, E from prior
+// splits) in the future.
+//
 // For more rigorous definitions see https://s.apache.org/beam-breaking-fusion
 message ProcessBundleSplitResponse {
-  // Root applications that should replace the current bundle.
+  // (Optional) Root applications that should replace the current bundle.
+  //
+  // Note that primary roots can only be specified if a channel split's
+  // last_primary_element + 1 < first_residual_element.
+  //
+  // Note that there must be a corresponding residual root contained within
+  // residual_roots representing the remainder of processing for the original
+  // element that this primary root represents a fraction of.
   repeated BundleApplication primary_roots = 1;
 
-  // Root applications that have been removed from the current bundle and
+  // (Optional) Root applications that have been removed from the current bundle and
   // have to be executed in a separate bundle (e.g. in parallel on a different
   // worker, or after the current bundle completes, etc.)
+  //
+  // Note that residual roots can only be specified if a channel split's
+  // last_primary_element + 1 < first_residual_element.
+  //
+  // Note that there must be a corresponding primary root contained within
+  // primary_roots representing the remainder of processing for the original
+  // element that this residual root represents a fraction of.
+  //
+  // Note that subsequent splits must not return prior residual roots.
   repeated DelayedBundleApplication residual_roots = 2;
 
   // Represents contiguous portions of the data channel that are either
@@ -484,24 +528,40 @@ message ProcessBundleSplitResponse {
   // (if the bundle is large) and often a more efficient representation
   // on the runner side (e.g. if the set of elements can be represented
   // as some range in an underlying dataset).
+  //
+  // Note that for a split the following properties must hold:
+  // - last_primary_element < first_residual_element
+  // - primary roots and residual roots can only be specified if the
+  //   last_primary_element + 1 < first_residual_element
+  //   (typically there is one primary and residual root per element in the
+  //   range (last_primary_element, first_residual_element))
+  // - primary roots and residual roots must represent a disjoint but full
+  //   coverage of work represented by the elements between last_primary_element
+  //   and first_residual_element
+  //
+  // Note that subsequent splits of the same bundle must ensure that:
+  // - the first_residual_element does not increase
+  // - the first_residual_element does not decrease if there were residual
+  //   or primary roots returned in a prior split.
   message ChannelSplit {
     // (Required) The grpc read transform reading this channel.
     string transform_id = 1;
 
-    // The last element of the input channel that should be entirely considered
-    // part of the primary, identified by its absolute index in the (ordered)
-    // channel.
+    // (Required) The last element of the input channel that should be entirely
+    // considered part of the primary, identified by its absolute zero-based
+    // index in the (ordered) channel.
     int64 last_primary_element = 2;
 
-    // The first element of the input channel that should be entirely considered
-    // part of the residual, identified by its absolute index in the (ordered)
-    // channel.
+    // (Required) The first element of the input channel that should be entirely
+    // considered part of the residual, identified by its absolute zero-based
+    // index in the (ordered) channel.
     int64 first_residual_element = 3;
   }
 
-  // Partitions of input data channels into primary and residual elements,
-  // if any. Should not include any elements represented in the bundle
-  // applications roots above.
+  // (Required) Partitions of input data channels into primary and residual
+  // elements, if any. Must not include any elements represented by the bundle
+  // application roots above, whether returned by the current split or by any
+  // prior split of the same bundle.
   repeated ChannelSplit channel_splits = 3;
 }
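The per-split and cross-split properties listed in the ChannelSplit comment can be checked mechanically. The Python sketch below is illustrative only. `Split`, `Root` and `violations` are hypothetical names, not part of any Beam SDK, and roots are reduced to (element index, label) pairs rather than real BundleApplication / DelayedBundleApplication messages. It does not attempt to verify the "full coverage" property, which would require modeling the work inside each element. It replays the worked example from the ProcessBundleSplitResponse comment, modeling the third split with a first_residual_element of 3 (D), the value that the "does not increase" rule permits.

```python
from dataclasses import dataclass, field
from typing import List, Tuple

# Hypothetical simplification: a root is (element index, label) instead of a
# real BundleApplication / DelayedBundleApplication message.
Root = Tuple[int, str]


@dataclass
class Split:
    """Hypothetical model of a split response carrying a single ChannelSplit."""
    last_primary_element: int
    first_residual_element: int
    primary_roots: List[Root] = field(default_factory=list)
    residual_roots: List[Root] = field(default_factory=list)


def violations(prior: List[Split], split: Split) -> List[str]:
    """Return the documented properties that `split` breaks, given the earlier
    splits of the same bundle (oldest first). Full coverage is not checked."""
    errors = []
    lp, fr = split.last_primary_element, split.first_residual_element
    roots = split.primary_roots + split.residual_roots
    primary_labels = {label for _, label in split.primary_roots}
    residual_labels = {label for _, label in split.residual_roots}

    # Properties that must hold within a single split.
    if not lp < fr:
        errors.append("last_primary_element must be < first_residual_element")
    if roots and not lp + 1 < fr:
        errors.append("roots require last_primary_element + 1 < first_residual_element")
    if any(not lp < index < fr for index, _ in roots):
        errors.append("roots must come from elements strictly between the two indices")
    if primary_labels & residual_labels:
        errors.append("primary and residual roots must be disjoint")

    # Properties that must hold across splits of the same bundle.
    if prior:
        previous_fr = prior[-1].first_residual_element
        if fr > previous_fr:
            errors.append("first_residual_element must not increase")
        if any(p.primary_roots or p.residual_roots for p in prior) and fr < previous_fr:
            errors.append("first_residual_element must not decrease once roots were returned")
        earlier_residuals = {label for p in prior for _, label in p.residual_roots}
        if earlier_residuals & residual_labels:
            errors.append("prior residual roots must not be returned again")
    return errors


# The three splits from the worked example (A=0, B=1, C=2, D=3, E=4).
splits = [
    Split(3, 4),
    Split(0, 3, primary_roots=[(1, "B1"), (2, "C1")],
          residual_roots=[(1, "B2"), (2, "C2")]),
    Split(0, 3, primary_roots=[(1, "B1a")],
          residual_roots=[(1, "B1b"), (2, "C1")]),
]
for i, split in enumerate(splits):
    print(i, violations(splits[:i], split))  # prints an empty list for each split
```

Changing the third split's first_residual_element to 4 makes `violations` report that first_residual_element must not increase, because the second split already lowered it to 3.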
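The ownership hand-off in the worked example can also be replayed with a small bookkeeping sketch. The `replay` function and the tuple encoding below are hypothetical, assume each response carries a single ChannelSplit, and treat roots as opaque labels. The sketch applies the semantics documented above. After each split the SDK keeps the elements up to and including last_primary_element plus that split's primary_roots. Elements from first_residual_element onward, along with all residual_roots, accumulate on the runner side.

```python
from typing import List, Set, Tuple

# Hypothetical encoding of one split response with a single ChannelSplit:
# (last_primary_element, first_residual_element, primary_roots, residual_roots)
SplitTuple = Tuple[int, int, List[str], List[str]]


def replay(elements: List[str], splits: List[SplitTuple]) -> Tuple[Set[str], Set[str]]:
    """Return (work the SDK still owns, work handed over to the runner)."""
    sdk_owned = set(elements)
    runner_owned: Set[str] = set()
    for last_primary, first_residual, primary_roots, residual_roots in splits:
        # Elements at and beyond first_residual_element move to the runner,
        # together with this split's residual roots.
        runner_owned |= set(elements[first_residual:])
        runner_owned |= set(residual_roots)
        # The SDK keeps whole elements up to and including last_primary_element
        # plus this split's primary roots.
        sdk_owned = set(elements[: last_primary + 1]) | set(primary_roots)
        # Sanity check: primary and residual work never overlap.
        assert not sdk_owned & runner_owned
    return sdk_owned, runner_owned


elements = ["A", "B", "C", "D", "E"]
history = [
    (3, 4, [], []),                      # A B C D stay, E goes to the runner
    (0, 3, ["B1", "C1"], ["B2", "C2"]),  # B and C are split into roots
    (0, 3, ["B1a"], ["B1b", "C1"]),      # B1 is split again, C1 returned as residual
]
sdk, runner = replay(elements, history)
print(sorted(sdk))     # ['A', 'B1a']
print(sorted(runner))  # ['B1b', 'B2', 'C1', 'C2', 'D', 'E']
```

Recomputing the SDK's share from the latest split alone reflects the property that, if no further splits happen, the bundle does exactly the work under the latest primary_roots and the elements up to last_primary_element. The runner's share only grows.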