Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2023/01/19 02:02:38 UTC

[GitHub] [beam] rohdesamuel commented on a diff in pull request #25065: Task #25064: Data Sampling proto changes

rohdesamuel commented on code in PR #25065:
URL: https://github.com/apache/beam/pull/25065#discussion_r1080737501


##########
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##########
@@ -137,12 +138,64 @@ message InstructionResponse {
     FinalizeBundleResponse finalize_bundle = 1004;
     MonitoringInfosMetadataResponse monitoring_infos = 1005;
     HarnessMonitoringInfosResponse harness_monitoring_infos = 1006;
+    SampleResponse sample = 1007;
 
     // DEPRECATED
     RegisterResponse register = 1000;
   }
 }
 
+// If supported, the `SampleRequest` will respond with a `SampleResponse` of any
+// sampled elements for the PCollections within the given
+// ProcessBundleDescriptor. The SDK being queried must have the
+// "beam:protocol:data_sampling:v1" capability.
+message SampleRequest {
+  // Required. The id of the ProcessBundleDescriptor to sample. If empty,
+  // returns samples from all descriptors.
+  string process_bundle_descriptor_id = 1;
+
+  // Optional. The PCollections to sample from. If empty, returns all sampled
+  // PCollections from the given PBD. If both are empty, returns all samples.
+  repeated string pcollection_ids = 2;
+}
+
+// Contains sampled elements from all PCollections from a single
+// ProcessBundleDescriptor when the SDK is processing a bundle.
+message Samples {
+
+  // A sampled element. This is a proto message to allow for additional
+  // per-element metadata.
+  message Element {
+    // Required. Sampled raw bytes for an element. This is a
+    // single encoded element in the nested context.
+    bytes element = 1;

Review Comment:
   This is an interesting idea that I need to think about a bit more. One consequence of separating elements this way is that it forces the element types to be mutually exclusive: a single element could no longer be both slow and erroneous. It's not impossible, but I think the categories would have to be broader than proposed, e.g. SuccessfulElement (an element that processed normally) and ExceptionalElement (covering any abnormal processing).
   
   Another approach is to favor composition over inheritance: for example, a single element message with proto extensions. A single element could then carry multiple attributes (slow, error, etc.) that can be queried.
   
   A third option is a hybrid of the two: have two maps, one for normally processed and one for abnormally processed elements, where each element can still be queried for extensions.
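   A minimal Python sketch of the composition and hybrid options, for illustration only. It assumes hypothetical `LullInfo` and `ExceptionInfo` attribute types and a hypothetical `HybridSamples` container; none of these exist in the proto under review, which only has a commented-out `LogEntry exception_entry` field. It shows that one element can be both slow and exceptional at the same time, which mutually exclusive element types would rule out.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class LullInfo:
    # Hypothetical attribute: how long processing of the element stalled.
    seconds: float


@dataclass
class ExceptionInfo:
    # Hypothetical attribute: the error raised while processing the element.
    message: str


@dataclass
class Element:
    # Encoded element bytes, mirroring `bytes element = 1` in the proto.
    element: bytes
    # Composition: optional attributes are set independently of each other.
    lull: Optional[LullInfo] = None
    exception: Optional[ExceptionInfo] = None

    def is_slow(self) -> bool:
        return self.lull is not None

    def is_exceptional(self) -> bool:
        return self.exception is not None


@dataclass
class HybridSamples:
    # Hybrid option: two maps keyed by PCollection id, one for normally and
    # one for abnormally processed elements. Elements keep their attributes.
    normal: Dict[str, List[Element]] = field(default_factory=dict)
    abnormal: Dict[str, List[Element]] = field(default_factory=dict)

    def add(self, pcollection_id: str, elem: Element) -> None:
        # Any attribute marking abnormal processing routes to `abnormal`.
        abnormal = elem.is_slow() or elem.is_exceptional()
        target = self.abnormal if abnormal else self.normal
        target.setdefault(pcollection_id, []).append(elem)
```

   For example, `HybridSamples().add("pc1", Element(b"\x02", lull=LullInfo(300.0), exception=ExceptionInfo("boom")))` stores one element that answers true to both `is_slow()` and `is_exceptional()`.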



##########
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##########
@@ -137,12 +138,64 @@ message InstructionResponse {
     FinalizeBundleResponse finalize_bundle = 1004;
     MonitoringInfosMetadataResponse monitoring_infos = 1005;
     HarnessMonitoringInfosResponse harness_monitoring_infos = 1006;
+    SampleResponse sample = 1007;
 
     // DEPRECATED
     RegisterResponse register = 1000;
   }
 }
 
+// If supported, the `SampleRequest` will respond with a `SampleResponse` of any
+// sampled elements for the PCollections within the given
+// ProcessBundleDescriptor. The SDK being queried must have the
+// "beam:protocol:data_sampling:v1" capability.
+message SampleRequest {
+  // Required. The id of the ProcessBundleDescriptor to sample. If empty,
+  // returns samples from all descriptors.
+  string process_bundle_descriptor_id = 1;
+
+  // Optional. The PCollections to sample from. If empty, returns all sampled
+  // PCollections from the given PBD. If both are empty, returns all samples.
+  repeated string pcollection_ids = 2;
+}
+
+// Contains sampled elements from all PCollections from a single
+// ProcessBundleDescriptor when the SDK is processing a bundle.
+message Samples {
+
+  // A sampled element. This is a proto message to allow for additional
+  // per-element metadata.
+  message Element {
+    // Required. Sampled raw bytes for an element. This is a
+    // single encoded element in the nested context.
+    bytes element = 1;
+
+    // FUTURE WORK: Capture lull detections and exceptions.
+    //
+    // Optional. Present if there was an exception
+    // processing the above element.
+    //
+    // LogEntry exception_entry = 2;
+  }
+
+  message ElementList {
+    // Required. The individual elements sampled from a PCollection.
+    repeated Element elements = 1;
+  }
+
+  // Required. Map from PCollection id to list of encoded elements using the
+  // associated coder id for that PCollection.
+  map<string, ElementList> pcollections = 1;
+}
+
+// If supported, the `SampleResponse` will contain samples from all
+// ProcessBundleDescriptors.
+message SampleResponse {
+  // Required. Map from ProcessBundleDescriptor id to samples using that
+  // descriptor.
+  map<string, Samples> descriptors = 1;

Review Comment:
   Yeah, that's a good simplification. In the future, will SDKs be able to execute bundles from different pipelines? If so, I'm worried that not also specifying the descriptor could introduce unwanted behavior.
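   A minimal Python sketch of the concern, assuming a plain nested dict (descriptor id -> PCollection id -> encoded elements) as a stand-in for `SampleResponse`; the helper names `select_samples` and `flatten_by_pcollection` are hypothetical. The first applies the filtering described in the `SampleRequest` comments (an empty descriptor id selects all descriptors, an empty `pcollection_ids` selects all PCollections). The second drops the descriptor level and shows how samples from two descriptors that reuse a PCollection id would be merged together.

```python
from typing import Dict, List, Sequence

# Hypothetical stand-in for SampleResponse:
# descriptor id -> PCollection id -> sampled encoded elements.
Samples = Dict[str, Dict[str, List[bytes]]]


def select_samples(all_samples: Samples,
                   descriptor_id: str = "",
                   pcollection_ids: Sequence[str] = ()) -> Samples:
    """Filters samples per the SampleRequest comments: an empty
    descriptor_id selects every descriptor, and an empty pcollection_ids
    selects every sampled PCollection."""
    result: Samples = {}
    for desc_id, pcolls in all_samples.items():
        if descriptor_id and desc_id != descriptor_id:
            continue
        chosen = {pc: elems for pc, elems in pcolls.items()
                  if not pcollection_ids or pc in pcollection_ids}
        # Descriptors with no matching PCollections are omitted
        # (a choice made for this sketch only).
        if chosen:
            result[desc_id] = chosen
    return result


def flatten_by_pcollection(all_samples: Samples) -> Dict[str, List[bytes]]:
    """Drops the descriptor level. If two descriptors (e.g. from different
    pipelines) reuse a PCollection id, their samples are merged together."""
    flat: Dict[str, List[bytes]] = {}
    for pcolls in all_samples.values():
        for pc, elems in pcolls.items():
            flat.setdefault(pc, []).extend(elems)
    return flat
```

   With `{"pbd-1": {"pc1": [b"a"]}, "pbd-2": {"pc1": [b"c"]}}`, flattening yields `{"pc1": [b"a", b"c"]}`, and the caller can no longer tell which descriptor each sample came from.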



##########
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##########
@@ -109,6 +109,7 @@ message InstructionRequest {
     FinalizeBundleRequest finalize_bundle = 1004;
     MonitoringInfosMetadataRequest monitoring_infos = 1005;
     HarnessMonitoringInfosRequest harness_monitoring_infos = 1006;
+    SampleRequest sample = 1007;

Review Comment:
   sgtm, replaced with SampleDataRequest 



-- 
This is an automated message from the Apache Git Service.