Posted to commits@beam.apache.org by lc...@apache.org on 2022/07/06 20:26:54 UTC
[beam] branch master updated: [BEAM-3221] Improve documentation around split request and response (#17726)
This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new f893f6ef86c [BEAM-3221] Improve documentation around split request and response (#17726)
f893f6ef86c is described below
commit f893f6ef86c365aea671ab8fb88909573d95f474
Author: Luke Cwik <lc...@google.com>
AuthorDate: Wed Jul 6 13:26:47 2022 -0700
[BEAM-3221] Improve documentation around split request and response (#17726)
* [BEAM-3221] Improve documentation around split request and response
Give an example and provide properties that must be held within a split and across multiple splits within the same bundle.
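The per-split properties can be sketched in a few lines of Python. This is a hypothetical model, not the Beam SDK's generated protobuf classes. `Root`, `SplitResponse`, and `check_single_split` are illustrative names. The model treats one ChannelSplit together with its primary and residual roots, and it tags each root with the index of the channel element it is a fraction of. It checks only a subset of the documented rules: the ordering of the two indices, that roots appear only when there is a gap, and that roots lie inside that gap.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass(frozen=True)
class Root:
    """Hypothetical stand-in for a (Delayed)BundleApplication: the index of the
    channel element it is a fraction of, plus a label for readability."""
    element_index: int
    label: str


@dataclass
class SplitResponse:
    """Hypothetical, simplified ProcessBundleSplitResponse with one ChannelSplit
    (not the real beam_fn_api_pb2 message)."""
    last_primary_element: int
    first_residual_element: int
    primary_roots: List[Root] = field(default_factory=list)
    residual_roots: List[Root] = field(default_factory=list)


def check_single_split(split: SplitResponse) -> List[str]:
    """Return violations of the per-split properties (empty list means OK)."""
    errors = []
    lo, hi = split.last_primary_element, split.first_residual_element
    if not lo < hi:
        errors.append("last_primary_element must be < first_residual_element")
    roots = split.primary_roots + split.residual_roots
    # Roots are only allowed when at least one element sits between the indices.
    if roots and not lo + 1 < hi:
        errors.append("roots require last_primary_element + 1 < first_residual_element")
    for root in roots:
        # Each root must describe an element strictly between the two indices.
        if not lo < root.element_index < hi:
            errors.append(f"root {root.label} lies outside ({lo}, {hi})")
    return errors
```

Applied to the first two splits of the A B C D E example, the checker reports nothing. The first split is `SplitResponse(3, 4)`. The second is the split with primary roots B1 and C1 and residual roots B2 and C2 between indices 0 and 3. A split that attaches roots to D while keeping indices 3 and 4 would be flagged.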
---
.../beam/model/fn_execution/v1/beam_fn_api.proto | 108 ++++++++++++++++-----
1 file changed, 84 insertions(+), 24 deletions(-)
diff --git a/model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto b/model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto
index d499941e75d..3753fb768f8 100644
--- a/model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto
+++ b/model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto
@@ -324,6 +324,9 @@ message ProcessBundleResponse {
// following applications need to be scheduled and executed in the future.
// A runner that does not yet support residual roots MUST still check that
// this is empty for correctness.
+ //
+ // Note that these residual roots must not have been returned as part of a
+ // prior split for this bundle.
repeated DelayedBundleApplication residual_roots = 2;
// DEPRECATED (Required) The list of metrics or other MonitoredState
@@ -438,42 +441,83 @@ message ProcessBundleSplitRequest {
// possible and returning the remainder).
double fraction_of_remainder = 1;
- // A set of allowed element indices where the SDK may split. When this is
- // empty, there are no constraints on where to split.
+ // (Optional) A set of allowed element indices where the SDK may split. When
+ // this is empty, there are no constraints on where to split.
repeated int64 allowed_split_points = 3;
- // (Required for GrpcRead operations) Number of total elements expected
- // to be sent to this GrpcRead operation, required to correctly account
- // for unreceived data when determining where to split.
+ // (Required for gRPC Read operation transforms) Number of total elements
+ // expected to be sent to this GrpcRead operation, required to correctly
+ // account for unreceived data when determining where to split.
int64 estimated_input_elements = 2;
-
- // TODO(SDF): Allow providing weights rather than sizes.
}
// (Required) Specifies the desired split for each transform.
//
- // Currently only splits at GRPC read operations are supported.
+ // Currently only splits at gRPC read operations are supported.
// This may, of course, limit the amount of work downstream operations
// receive.
map<string, DesiredSplit> desired_splits = 3;
}
-// Represents a partition of the bundle: a "primary" and
-// a "residual", with the following properties:
+
+// Represents a partition of the bundle: a "primary" and a "residual", with the
+// following properties:
// - The work in primary and residual doesn't overlap, and combined, adds up
// to the work in the current bundle if the split hadn't happened.
-// - The current bundle, if it keeps executing, will have done none of the
-// work under residual_roots.
+// - The current bundle, if it keeps executing, will have done exactly none of
+// the work under residual_roots and none of the elements at and beyond the
+// first_residual_element.
// - The current bundle, if no further splits happen, will have done exactly
-// the work under primary_roots.
+// the work under primary_roots and all elements up to and including the
+// channel split's last_primary_element.
+//
+// This allows the SDK to relinquish ownership of and commit to not process some
+// of the elements that it may have been sent (the residual) while retaining
+// ownership and commitment to finish the other portion (the primary).
+//
+// For example, let's say the SDK is processing elements A B C D E and a split
+// request comes in. The SDK could return a response with a channel split
+// representing a last_primary_element of 3 (D) and first_residual_element of 4
+// (E). The SDK is now responsible for processing A B C D and the runner must
+// process E in the future. A future split request could have the SDK split
+// element B into B1 and B2 and element C into C1 and C2, representing their
+// primary and residual roots. The SDK would return a response with a channel
+// split representing a last_primary_element of 0 (A) and
+// first_residual_element of 3 (D) with primary_roots (B1, C1) and
+// residual_roots (B2, C2). The SDK is now responsible for processing A B1 C1
+// and the runner must process B2 C2 D (and E from the prior split) in the
+// future. Yet another future split request could have the SDK split B1
+// further into B1a and B1b primary and residual roots and return C1 as a
+// residual (assuming C1 was left unprocessed). The SDK would return a
+// response with a channel split representing a last_primary_element of 0 (A)
+// and first_residual_element of 3 (D) with primary_roots (B1a) and
+// residual_roots (B1b, C1). The SDK is now responsible for processing A B1a
+// and the runner must process B1b C1 (in addition to B2, C2, D, E from prior
+// splits) in the future.
+//
// For more rigorous definitions see https://s.apache.org/beam-breaking-fusion
message ProcessBundleSplitResponse {
- // Root applications that should replace the current bundle.
+ // (Optional) Root applications that should replace the current bundle.
+ //
+ // Note that primary roots can only be specified if a channel split's
+ // last_primary_element + 1 < first_residual_element.
+ //
+ // Note that there must be a corresponding residual root contained within
+ // residual_roots representing the remainder of processing for the original
+ // element that this primary root represents a fraction of.
repeated BundleApplication primary_roots = 1;
- // Root applications that have been removed from the current bundle and
+ // (Optional) Root applications that have been removed from the current bundle and
// have to be executed in a separate bundle (e.g. in parallel on a different
// worker, or after the current bundle completes, etc.)
+ //
+ // Note that residual roots can only be specified if a channel split's
+ // last_primary_element + 1 < first_residual_element.
+ //
+ // Note that there must be a corresponding primary root contained within
+ // primary_roots representing the remainder of processing for the original
+ // element that this residual root represents a fraction of.
+ //
+ // Note that subsequent splits must not return prior residual roots.
repeated DelayedBundleApplication residual_roots = 2;
// Represents contiguous portions of the data channel that are either
@@ -484,24 +528,40 @@ message ProcessBundleSplitResponse {
// (if the bundle is large) and often a more efficient representation
// on the runner side (e.g. if the set of elements can be represented
// as some range in an underlying dataset).
+ //
+ // Note that for a split the following properties must hold:
+ // - last_primary_element < first_residual_element
+ // - primary roots and residual roots can only be specified if the
+ // last_primary_element + 1 < first_residual_element
+ // (typically there is one primary and residual root per element in the
+ // range (last_primary_element, first_residual_element))
+ // - primary roots and residual roots must represent a disjoint but full
+ // coverage of work represented by the elements between last_primary_element
+ // and first_residual_element
+ //
+ // Note that subsequent splits of the same bundle must ensure that:
+ // - the first_residual_element does not increase
+ // - the first_residual_element does not decrease if there were residual
+ // or primary roots returned in a prior split.
message ChannelSplit {
// (Required) The grpc read transform reading this channel.
string transform_id = 1;
- // The last element of the input channel that should be entirely considered
- // part of the primary, identified by its absolute index in the (ordered)
- // channel.
+ // (Required) The last element of the input channel that should be entirely
+ // considered part of the primary, identified by its absolute zero-based
+ // index in the (ordered) channel.
int64 last_primary_element = 2;
- // The first element of the input channel that should be entirely considered
- // part of the residual, identified by its absolute index in the (ordered)
- // channel.
+ // (Required) The first element of the input channel that should be entirely
+ // considered part of the residual, identified by its absolute zero-based
+ // index in the (ordered) channel.
int64 first_residual_element = 3;
}
- // Partitions of input data channels into primary and residual elements,
- // if any. Should not include any elements represented in the bundle
- // applications roots above.
+ // (Required) Partitions of input data channels into primary and residual
+ // elements, if any. Must not include any elements represented in the bundle
+ // application roots above, whether from the current split or from any prior
+ // split of the same bundle.
repeated ChannelSplit channel_splits = 3;
}
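The cross-split rules from the ChannelSplit comment can be sketched the same way. The Python below is a hypothetical checker, not part of Beam. `Split` and `check_split_sequence` are illustrative names, and roots are reduced to string labels. The sketch makes two assumptions. First, each split is compared against the immediately preceding split of the same bundle. Second, a residual root is identified by its label, which is how the "subsequent splits must not return prior residual roots" rule is checked.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Split:
    """Hypothetical summary of one ProcessBundleSplitResponse for one channel;
    roots are plain labels rather than real BundleApplication messages."""
    last_primary_element: int
    first_residual_element: int
    primary_roots: List[str] = field(default_factory=list)
    residual_roots: List[str] = field(default_factory=list)


def check_split_sequence(splits: List[Split]) -> List[str]:
    """Check cross-split properties for successive splits of one bundle."""
    errors = []
    prev: Optional[Split] = None
    roots_seen = False          # did any earlier split return primary/residual roots?
    returned_residuals = set()  # residual roots already handed to the runner
    for i, s in enumerate(splits):
        if prev is not None:
            # first_residual_element never increases across splits.
            if s.first_residual_element > prev.first_residual_element:
                errors.append(f"split {i}: first_residual_element increased")
            # Once roots were returned, it must not decrease either.
            if roots_seen and s.first_residual_element < prev.first_residual_element:
                errors.append(f"split {i}: first_residual_element decreased after roots were returned")
        for r in s.residual_roots:
            if r in returned_residuals:
                errors.append(f"split {i}: residual root {r} was already returned")
        returned_residuals.update(s.residual_roots)
        roots_seen = roots_seen or bool(s.primary_roots or s.residual_roots)
        prev = s
    return errors
```

Replaying the first two splits of the A B C D E example yields no violations. Those splits use indices 3/4, then 0/3 with residual roots B2 and C2. Moving first_residual_element from 4 to 3 is allowed there because the first split returned no roots. A hypothetical third split that lowers first_residual_element to 2 and returns B2 again is reported for both problems.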