Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2023/01/18 19:48:07 UTC

[GitHub] [beam] rohdesamuel opened a new pull request, #25065: #25064: Data Sampling proto changes

rohdesamuel opened a new pull request, #25065:
URL: https://github.com/apache/beam/pull/25065

   This adds:
   
   - The Sample instruction (SampleRequest and SampleResponse) to query the samples from an SDK
   - The "beam:protocol:data_sampling:v1" capability to track which SDKs can sample
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] lukecwik commented on a diff in pull request #25065: Task #25064: Data Sampling proto changes

Posted by "lukecwik (via GitHub)" <gi...@apache.org>.
lukecwik commented on code in PR #25065:
URL: https://github.com/apache/beam/pull/25065#discussion_r1084624554


##########
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##########
@@ -137,12 +138,79 @@ message InstructionResponse {
     FinalizeBundleResponse finalize_bundle = 1004;
     MonitoringInfosMetadataResponse monitoring_infos = 1005;
     HarnessMonitoringInfosResponse harness_monitoring_infos = 1006;
+    SampleDataResponse sample = 1007;
 
     // DEPRECATED
     RegisterResponse register = 1000;
   }
 }
 
+// If supported, the `SampleDataRequest` will respond with a
+// `SampleDataResponse`. The SDK being queried must have the
+// "beam:protocol:data_sampling:v1" capability. The samples in the
+// `SampleDataResponse` will be filtered by both being in the
+// `process_bundle_descriptor_ids` AND the `pcollection_ids`. An empty list can
+// match anything.
+message SampleDataRequest {
+  // (Optional) The ProcessBundleDescriptor ids to filter for.
+  repeated string process_bundle_descriptor_ids = 1;
+
+  // (Optional) The PCollection ids to filter for.
+  repeated string pcollection_ids = 2;
+}
+
+
+// A element sampled when the SDK is processing a bundle. This is a proto

Review Comment:
   ```suggestion
   // An element sampled when the SDK is processing a bundle. This is a proto
   ```



##########
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##########
@@ -137,12 +138,79 @@ message InstructionResponse {
     FinalizeBundleResponse finalize_bundle = 1004;
     MonitoringInfosMetadataResponse monitoring_infos = 1005;
     HarnessMonitoringInfosResponse harness_monitoring_infos = 1006;
+    SampleDataResponse sample = 1007;
 
     // DEPRECATED
     RegisterResponse register = 1000;
   }
 }
 
+// If supported, the `SampleDataRequest` will respond with a
+// `SampleDataResponse`. The SDK being queried must have the
+// "beam:protocol:data_sampling:v1" capability. The samples in the
+// `SampleDataResponse` will be filtered by both being in the
+// `process_bundle_descriptor_ids` AND the `pcollection_ids`. An empty list can
+// match anything.
+message SampleDataRequest {
+  // (Optional) The ProcessBundleDescriptor ids to filter for.
+  repeated string process_bundle_descriptor_ids = 1;
+
+  // (Optional) The PCollection ids to filter for.
+  repeated string pcollection_ids = 2;
+}
+
+
+// A element sampled when the SDK is processing a bundle. This is a proto
+// message to allow for additional per-element metadata.
+message SampledElement {
+  // Required. Sampled raw bytes for an element. This is a
+  // single encoded element in the nested context.
+  bytes element = 1;
+
+  // FUTURE WORK: Capture lull detections and exceptions.
+  //
+  // Optional. Present if there was an exception
+  // processing the above element.
+  //
+  // LogEntry exception_entry = 2;
+}
+
+// If supported, the `SampleDataResponse` will contain samples from all
+// ProcessBundleDescriptors.
+message SampleDataResponse {
+  message ElementList {
+    // Required. The individual elements sampled from a PCollection.
+    repeated SampledElement elements = 1;
+  }
+
+  // Map from PCollection id to sample elements.
+  map<string, ElementList> element_samples = 1;
+
+  // FUTURE WORK: Investigate ways of storing multiple interesting types of
+  // sampled elements. There are two ways of accomplishing this:
+  // 1) Maps of typed elements: include multiple maps here with typed element
+  // proto messages, ex.
+  //
+  // message SlowElement {...}
+  // message ErroredElement {...}
+  // map<string, SlowElement> slow_elements
+  // map<string, ErroredElement> errored_elements
+  //
+  // However, this forces an element into a single category. It disallows
+  // classification across multiple characteristics (like a slow and errored
+  // element).
+  //
+  // 2) Compositional types: allow for Protobuf Extensions on the base

Review Comment:
   proto3 discourages extensions and prefers that people use Any, which is a poor version of URN + payload.
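
To make the URN + payload idea concrete, here is a minimal Python sketch. It is not part of the Beam protos: the `Attribute` class, the `attributes` field, and both URNs are hypothetical names chosen for illustration. It only shows that a single sampled element could carry several independently queryable attributes.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Attribute:
    # Hypothetical URN + payload pair: the URN names the kind of metadata,
    # the payload holds its encoded value.
    urn: str
    payload: bytes = b""


@dataclass
class SampledElement:
    # Encoded element bytes, mirroring the `element` field in the proto.
    element: bytes
    # Hypothetical extension point: zero or more URN-tagged attributes.
    attributes: List[Attribute] = field(default_factory=list)

    def find(self, urn: str) -> Optional[Attribute]:
        """Return the first attribute with the given URN, or None."""
        for attr in self.attributes:
            if attr.urn == urn:
                return attr
        return None


# Made-up URNs, for illustration only.
SLOW_URN = "example:sampled_element:slow:v1"
ERROR_URN = "example:sampled_element:error:v1"

# One element that is both slow and errored, which typed maps would disallow.
both = SampledElement(
    element=b"\x01",
    attributes=[Attribute(SLOW_URN), Attribute(ERROR_URN, b"boom")],
)
plain = SampledElement(element=b"\x02")
```

A consumer would then call `both.find(ERROR_URN)` and ignore URNs it does not understand.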



##########
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##########
@@ -137,12 +138,79 @@ message InstructionResponse {
     FinalizeBundleResponse finalize_bundle = 1004;
     MonitoringInfosMetadataResponse monitoring_infos = 1005;
     HarnessMonitoringInfosResponse harness_monitoring_infos = 1006;
+    SampleDataResponse sample = 1007;
 
     // DEPRECATED
     RegisterResponse register = 1000;
   }
 }
 
+// If supported, the `SampleDataRequest` will respond with a
+// `SampleDataResponse`. The SDK being queried must have the
+// "beam:protocol:data_sampling:v1" capability. The samples in the
+// `SampleDataResponse` will be filtered by both being in the
+// `process_bundle_descriptor_ids` AND the `pcollection_ids`. An empty list can
+// match anything.

Review Comment:
   ```suggestion
   // `process_bundle_descriptor_ids` AND the `pcollection_ids`. An empty list will
   // match everything.
   ```



##########
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##########
@@ -137,12 +138,79 @@ message InstructionResponse {
     FinalizeBundleResponse finalize_bundle = 1004;
     MonitoringInfosMetadataResponse monitoring_infos = 1005;
     HarnessMonitoringInfosResponse harness_monitoring_infos = 1006;
+    SampleDataResponse sample = 1007;
 
     // DEPRECATED
     RegisterResponse register = 1000;
   }
 }
 
+// If supported, the `SampleDataRequest` will respond with a
+// `SampleDataResponse`. The SDK being queried must have the
+// "beam:protocol:data_sampling:v1" capability. The samples in the
+// `SampleDataResponse` will be filtered by both being in the
+// `process_bundle_descriptor_ids` AND the `pcollection_ids`. An empty list can
+// match anything.
+message SampleDataRequest {
+  // (Optional) The ProcessBundleDescriptor ids to filter for.
+  repeated string process_bundle_descriptor_ids = 1;
+
+  // (Optional) The PCollection ids to filter for.
+  repeated string pcollection_ids = 2;
+}
+
+
+// A element sampled when the SDK is processing a bundle. This is a proto
+// message to allow for additional per-element metadata.
+message SampledElement {
+  // Required. Sampled raw bytes for an element. This is a
+  // single encoded element in the nested context.
+  bytes element = 1;
+
+  // FUTURE WORK: Capture lull detections and exceptions.
+  //
+  // Optional. Present if there was an exception
+  // processing the above element.
+  //
+  // LogEntry exception_entry = 2;
+}
+
+// If supported, the `SampleDataResponse` will contain samples from all
+// ProcessBundleDescriptors.

Review Comment:
   ```suggestion
   // If supported, the `SampleDataResponse` will contain samples from PCollections
   // based upon the filters specified in the request.
   ```



##########
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##########
@@ -137,12 +138,79 @@ message InstructionResponse {
     FinalizeBundleResponse finalize_bundle = 1004;
     MonitoringInfosMetadataResponse monitoring_infos = 1005;
     HarnessMonitoringInfosResponse harness_monitoring_infos = 1006;
+    SampleDataResponse sample = 1007;
 
     // DEPRECATED
     RegisterResponse register = 1000;
   }
 }
 
+// If supported, the `SampleDataRequest` will respond with a
+// `SampleDataResponse`. The SDK being queried must have the
+// "beam:protocol:data_sampling:v1" capability. The samples in the
+// `SampleDataResponse` will be filtered by both being in the
+// `process_bundle_descriptor_ids` AND the `pcollection_ids`. An empty list can

Review Comment:
   Are you sure you want AND here vs OR?
   
   What kinds of lookups do you think runners will ask for?



##########
model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto:
##########
@@ -1638,6 +1638,14 @@ message StandardProtocols {
     // SDKs ability to store the data in memory reducing the amount of memory
     // used overall.
     STATE_CACHING = 7 [(beam_urn) = "beam:protocol:state_caching:v1"];
+
+    // Indicates that this SDK can sample in-flight elements. These samples can
+    // then be queried using the SampleRequest. Samples are uniquely associated
+    // with (ProcessBundleDescriptor::id, PCollection::id). Meaning, samples
+    // are taken for each PCollection in every ProcessBundleDescriptor. This
+    // is disabled by default and enabled with the enable_data_sampling
+    // experiment.

Review Comment:
   ```suggestion
       // Indicates that this SDK can sample in-flight elements. These samples can
       // then be queried using the SampleDataRequest. Samples are uniquely associated
       // with a PCollection. Meaning, samples are taken for each PCollection
       // during bundle processing. This is disabled by default and enabled with the
       // `enable_data_sampling` experiment.
   ```
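
As a rough illustration of how a runner might gate sampling on this capability, here is a small Python sketch. The capability URN is the one from the diff; the function names and the plain-dict stand-in for a `SampleDataRequest` are assumptions made for this example, not Beam APIs.

```python
DATA_SAMPLING_URN = "beam:protocol:data_sampling:v1"


def can_sample(sdk_capabilities):
    """Return True if the SDK advertises the data sampling capability."""
    return DATA_SAMPLING_URN in set(sdk_capabilities)


def plan_sample_request(sdk_capabilities, pcollection_ids):
    """Build a stand-in for a SampleDataRequest, or None if unsupported.

    The returned dict is a hypothetical placeholder for the real proto
    message; only the capability check is the point of this sketch.
    """
    if not can_sample(sdk_capabilities):
        return None
    return {"pcollection_ids": list(pcollection_ids)}
```

An SDK that only advertises `beam:protocol:state_caching:v1` would yield `None` here, so the runner would never send the request.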





[GitHub] [beam] lukecwik commented on a diff in pull request #25065: Task #25064: Data Sampling proto changes

Posted by GitBox <gi...@apache.org>.
lukecwik commented on code in PR #25065:
URL: https://github.com/apache/beam/pull/25065#discussion_r1080713738


##########
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##########
@@ -137,12 +138,64 @@ message InstructionResponse {
     FinalizeBundleResponse finalize_bundle = 1004;
     MonitoringInfosMetadataResponse monitoring_infos = 1005;
     HarnessMonitoringInfosResponse harness_monitoring_infos = 1006;
+    SampleResponse sample = 1007;
 
     // DEPRECATED
     RegisterResponse register = 1000;
   }
 }
 
+// If supported, the `SampleRequest` will respond with a `SampleResponse` of any
+// sampled elements for the PCollections within the given
+// ProcessBundleDescriptor. The SDK being queried must have the

Review Comment:
   It's easiest to describe this as either an AND or an OR operation between the lists. Either can make sense.



##########
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##########
@@ -137,12 +138,64 @@ message InstructionResponse {
     FinalizeBundleResponse finalize_bundle = 1004;
     MonitoringInfosMetadataResponse monitoring_infos = 1005;
     HarnessMonitoringInfosResponse harness_monitoring_infos = 1006;
+    SampleResponse sample = 1007;
 
     // DEPRECATED
     RegisterResponse register = 1000;
   }
 }
 
+// If supported, the `SampleRequest` will respond with a `SampleResponse` of any
+// sampled elements for the PCollections within the given
+// ProcessBundleDescriptor. The SDK being queried must have the
+// "beam:protocol:data_sampling:v1" capability.
+message SampleRequest {
+  // Required. The id of the ProcessBundleDescriptor to sample. If empty,
+  // returns samples from all descriptors.
+  string process_bundle_descriptor_id = 1;

Review Comment:
   Note that this field isn't strictly necessary, since the PCollection ids are globally unique for the pipeline, but it could still be convenient if you want to create a select/filter expression over the data you want returned.
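
If PCollection ids really are unique across the pipeline, samples grouped by ProcessBundleDescriptor id can be merged into a single map keyed by PCollection id without collisions. A minimal Python sketch of that flattening follows; plain dicts stand in for the proto messages and the function name is hypothetical.

```python
def flatten_by_pcollection(samples_by_descriptor):
    """Merge {descriptor_id: {pcollection_id: [elements]}} into
    {pcollection_id: [elements]}.

    Assumes PCollection ids are unique across descriptors; if an id did
    repeat, its element lists would simply be concatenated.
    """
    merged = {}
    for pcollections in samples_by_descriptor.values():
        for pcollection_id, elements in pcollections.items():
            merged.setdefault(pcollection_id, []).extend(elements)
    return merged


nested = {
    "pbd1": {"pc1": [b"a"], "pc2": [b"b"]},
    "pbd2": {"pc3": [b"c"]},
}
flat = flatten_by_pcollection(nested)
```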





[GitHub] [beam] lukecwik commented on a diff in pull request #25065: Task #25064: Data Sampling proto changes

Posted by "lukecwik (via GitHub)" <gi...@apache.org>.
lukecwik commented on code in PR #25065:
URL: https://github.com/apache/beam/pull/25065#discussion_r1085723946


##########
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##########
@@ -137,12 +138,79 @@ message InstructionResponse {
     FinalizeBundleResponse finalize_bundle = 1004;
     MonitoringInfosMetadataResponse monitoring_infos = 1005;
     HarnessMonitoringInfosResponse harness_monitoring_infos = 1006;
+    SampleDataResponse sample = 1007;
 
     // DEPRECATED
     RegisterResponse register = 1000;
   }
 }
 
+// If supported, the `SampleDataRequest` will respond with a
+// `SampleDataResponse`. The SDK being queried must have the
+// "beam:protocol:data_sampling:v1" capability. The samples in the
+// `SampleDataResponse` will be filtered by both being in the
+// `process_bundle_descriptor_ids` AND the `pcollection_ids`. An empty list can

Review Comment:
   That sounds like you want to use an `OR` clause since `All samples from a particular PCollection (regardless of PBD)` and `Samples from a set of PCollections in a PBD` are the same when the PCollection ids are unique.
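
The difference between the two readings can be sketched in a few lines of Python. This is an illustration only, not the SDK implementation: samples are modeled as hypothetical `(pbd_id, pcollection_id)` pairs, and the OR variant assumes that a single empty list contributes nothing while two empty lists match everything.

```python
def matches_and(sample, pbd_ids, pcollection_ids):
    """AND: every non-empty list must contain the sample's id."""
    pbd_id, pcollection_id = sample
    pbd_ok = not pbd_ids or pbd_id in pbd_ids
    pcoll_ok = not pcollection_ids or pcollection_id in pcollection_ids
    return pbd_ok and pcoll_ok


def matches_or(sample, pbd_ids, pcollection_ids):
    """OR: either list may name the sample; two empty lists match all.

    Treating a single empty list as "adds nothing" is an assumption.
    """
    pbd_id, pcollection_id = sample
    if not pbd_ids and not pcollection_ids:
        return True
    return pbd_id in pbd_ids or pcollection_id in pcollection_ids


# Hypothetical samples; each PCollection id belongs to exactly one PBD.
SAMPLES = [("pbd1", "pc1"), ("pbd1", "pc2"), ("pbd2", "pc3")]


def select(match, pbd_ids, pcollection_ids):
    """Return the samples accepted by the given matching rule."""
    return [s for s in SAMPLES if match(s, pbd_ids, pcollection_ids)]
```

In this sketch, asking only for `pc1` returns the same sample under either rule. The rules diverge once both lists are non-empty: `["pbd1"]` with `["pc3"]` selects nothing under AND and all three samples under OR.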





[GitHub] [beam] lukecwik commented on pull request #25065: Task #25064: Data Sampling proto changes

Posted by "lukecwik (via GitHub)" <gi...@apache.org>.
lukecwik commented on PR #25065:
URL: https://github.com/apache/beam/pull/25065#issuecomment-1403964973

   Run Java PreCommit




[GitHub] [beam] rohdesamuel commented on a diff in pull request #25065: Task #25064: Data Sampling proto changes

Posted by GitBox <gi...@apache.org>.
rohdesamuel commented on code in PR #25065:
URL: https://github.com/apache/beam/pull/25065#discussion_r1081663583


##########
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##########
@@ -137,12 +138,64 @@ message InstructionResponse {
     FinalizeBundleResponse finalize_bundle = 1004;
     MonitoringInfosMetadataResponse monitoring_infos = 1005;
     HarnessMonitoringInfosResponse harness_monitoring_infos = 1006;
+    SampleResponse sample = 1007;
 
     // DEPRECATED
     RegisterResponse register = 1000;
   }
 }
 
+// If supported, the `SampleRequest` will respond with a `SampleResponse` of any
+// sampled elements for the PCollections within the given
+// ProcessBundleDescriptor. The SDK being queried must have the

Review Comment:
   Ack, clarified to be an AND operation.





[GitHub] [beam] lukecwik commented on a diff in pull request #25065: Task #25064: Data Sampling proto changes

Posted by "lukecwik (via GitHub)" <gi...@apache.org>.
lukecwik commented on code in PR #25065:
URL: https://github.com/apache/beam/pull/25065#discussion_r1085726016


##########
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##########
@@ -137,12 +138,78 @@ message InstructionResponse {
     FinalizeBundleResponse finalize_bundle = 1004;
     MonitoringInfosMetadataResponse monitoring_infos = 1005;
     HarnessMonitoringInfosResponse harness_monitoring_infos = 1006;
+    SampleDataResponse sample = 1007;
 
     // DEPRECATED
     RegisterResponse register = 1000;
   }
 }
 
+// If supported, the `SampleDataRequest` will respond with a
+// `SampleDataResponse`. The SDK being queried must have the
+// "beam:protocol:data_sampling:v1" capability. Samples are taken from all
+// given ProcessBundleDescriptor ids and filtered on PCollection ids. An empty
+// list will match everything.

Review Comment:
   ```suggestion
   // If supported, the `SampleDataRequest` will respond with a
   // `SampleDataResponse`. The SDK being queried must have the
   // "beam:protocol:data_sampling:v1" capability. Samples are taken from all
   // specified ProcessBundleDescriptor ids and all specified PCollection ids (equivalent to OR clause). An empty
   // list will match everything.
   ```



##########
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##########
@@ -137,12 +138,78 @@ message InstructionResponse {
     FinalizeBundleResponse finalize_bundle = 1004;
     MonitoringInfosMetadataResponse monitoring_infos = 1005;
     HarnessMonitoringInfosResponse harness_monitoring_infos = 1006;
+    SampleDataResponse sample = 1007;
 
     // DEPRECATED
     RegisterResponse register = 1000;
   }
 }
 
+// If supported, the `SampleDataRequest` will respond with a
+// `SampleDataResponse`. The SDK being queried must have the
+// "beam:protocol:data_sampling:v1" capability. Samples are taken from all
+// given ProcessBundleDescriptor ids and filtered on PCollection ids. An empty
+// list will match everything.
+message SampleDataRequest {
+  // (Optional) The ProcessBundleDescriptor ids to filter for.
+  repeated string process_bundle_descriptor_ids = 1;
+
+  // (Optional) The PCollection ids to filter for.
+  repeated string pcollection_ids = 2;
+}
+
+
+// An element sampled when the SDK is processing a bundle. This is a proto
+// message to allow for additional per-element metadata.
+message SampledElement {
+  // Required. Sampled raw bytes for an element. This is a
+  // single encoded element in the nested context.
+  bytes element = 1;
+
+  // FUTURE WORK: Capture lull detections and exceptions.
+  //
+  // Optional. Present if there was an exception
+  // processing the above element.
+  //
+  // LogEntry exception_entry = 2;
+}
+
+// If supported, the `SampleDataResponse` will contain samples from PCollections
+// based upon the filters specified in the request.
+message SampleDataResponse {
+  message ElementList {
+    // Required. The individual elements sampled from a PCollection.
+    repeated SampledElement elements = 1;
+  }
+
+  // Map from PCollection id to sample elements.
+  map<string, ElementList> element_samples = 1;
+
+  // FUTURE WORK: Investigate ways of storing multiple interesting types of
+  // sampled elements. There are two ways of accomplishing this:
+  // 1) Maps of typed elements: include multiple maps here with typed element
+  // proto messages, ex.
+  //
+  // message SlowElement {...}
+  // message ErroredElement {...}
+  // map<string, SlowElement> slow_elements
+  // map<string, ErroredElement> errored_elements
+  //
+  // However, this forces an element into a single category. It disallows
+  // classification across multiple characteristics (like a slow and errored
+  // element).
+  //
+  // 2) Compositional types: allow for URN and payloads on the base
+  // SampledElement message for interesting characteristics. The base class
+  // can then be queried for specific URNs. This allows for multiple
+  // attributes on the same element.
+  //
+  // message SlowElement {...}
+  // extend SampledElement { optional SlowElement slow_element = ... }

Review Comment:
   ```suggestion
   ```



##########
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##########
@@ -137,12 +138,78 @@ message InstructionResponse {
     FinalizeBundleResponse finalize_bundle = 1004;
     MonitoringInfosMetadataResponse monitoring_infos = 1005;
     HarnessMonitoringInfosResponse harness_monitoring_infos = 1006;
+    SampleDataResponse sample = 1007;
 
     // DEPRECATED
     RegisterResponse register = 1000;
   }
 }
 
+// If supported, the `SampleDataRequest` will respond with a
+// `SampleDataResponse`. The SDK being queried must have the
+// "beam:protocol:data_sampling:v1" capability. Samples are taken from all
+// given ProcessBundleDescriptor ids and filtered on PCollection ids. An empty
+// list will match everything.
+message SampleDataRequest {
+  // (Optional) The ProcessBundleDescriptor ids to filter for.
+  repeated string process_bundle_descriptor_ids = 1;
+
+  // (Optional) The PCollection ids to filter for.
+  repeated string pcollection_ids = 2;
+}
+
+
+// An element sampled when the SDK is processing a bundle. This is a proto
+// message to allow for additional per-element metadata.
+message SampledElement {
+  // Required. Sampled raw bytes for an element. This is a
+  // single encoded element in the nested context.
+  bytes element = 1;
+
+  // FUTURE WORK: Capture lull detections and exceptions.
+  //
+  // Optional. Present if there was an exception
+  // processing the above element.
+  //
+  // LogEntry exception_entry = 2;
+}
+
+// If supported, the `SampleDataResponse` will contain samples from PCollections
+// based upon the filters specified in the request.
+message SampleDataResponse {
+  message ElementList {
+    // Required. The individual elements sampled from a PCollection.
+    repeated SampledElement elements = 1;
+  }
+
+  // Map from PCollection id to sample elements.

Review Comment:
   ```suggestion
     // Map from PCollection id to sampled elements.
   ```





[GitHub] [beam] rohdesamuel commented on a diff in pull request #25065: Task #25064: Data Sampling proto changes

Posted by GitBox <gi...@apache.org>.
rohdesamuel commented on code in PR #25065:
URL: https://github.com/apache/beam/pull/25065#discussion_r1080737501


##########
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##########
@@ -137,12 +138,64 @@ message InstructionResponse {
     FinalizeBundleResponse finalize_bundle = 1004;
     MonitoringInfosMetadataResponse monitoring_infos = 1005;
     HarnessMonitoringInfosResponse harness_monitoring_infos = 1006;
+    SampleResponse sample = 1007;
 
     // DEPRECATED
     RegisterResponse register = 1000;
   }
 }
 
+// If supported, the `SampleRequest` will respond with a `SampleResponse` of any
+// sampled elements for the PCollections within the given
+// ProcessBundleDescriptor. The SDK being queried must have the
+// "beam:protocol:data_sampling:v1" capability.
+message SampleRequest {
+  // Required. The id of the ProcessBundleDescriptor to sample. If empty,
+  // returns samples from all descriptors.
+  string process_bundle_descriptor_id = 1;
+
+  // Optional. The PCollections to sample from. If empty, returns all sampled
+  // PCollections from the given PBD. If both are empty, returns all samples.
+  repeated string pcollection_ids = 2;
+}
+
+// Contains sampled elements from all PCollections from a single
+// ProcessBundleDescriptor when the SDK is processing a bundle.
+message Samples {
+
+  // A sampled element. This is a proto message to allow for additional
+  // per-element metadata.
+  message Element {
+    // Required. Sampled raw bytes for an element. This is a
+    // single encoded element in the nested context.
+    bytes element = 1;

Review Comment:
   This is an interesting idea that I need to think about a bit more. One outcome of separating elements in this way is that it forces mutually exclusive element types. The consequence is that you can no longer have an element that is both slow and errored. It's not impossible, but I think the categories would have to be broader than proposed, e.g. SuccessfulElement (meaning an element that processed normally) and ExceptionalElement (encompassing any non-normal processing).
   
   Another approach is to favor "composition over inheritance". For example, have an element with proto extensions. Then we can have a single element with multiple attributes (slow, error, etc.) that can be queried.
   
   Another option is a hybrid of both: have two maps for normal and abnormal processing, where an element can be queried for extensions.



##########
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##########
@@ -137,12 +138,64 @@ message InstructionResponse {
     FinalizeBundleResponse finalize_bundle = 1004;
     MonitoringInfosMetadataResponse monitoring_infos = 1005;
     HarnessMonitoringInfosResponse harness_monitoring_infos = 1006;
+    SampleResponse sample = 1007;
 
     // DEPRECATED
     RegisterResponse register = 1000;
   }
 }
 
+// If supported, the `SampleRequest` will respond with a `SampleResponse` of any
+// sampled elements for the PCollections within the given
+// ProcessBundleDescriptor. The SDK being queried must have the
+// "beam:protocol:data_sampling:v1" capability.
+message SampleRequest {
+  // Required. The id of the ProcessBundleDescriptor to sample. If empty,
+  // returns samples from all descriptors.
+  string process_bundle_descriptor_id = 1;
+
+  // Optional. The PCollections to sample from. If empty, returns all sampled
+  // PCollections from the given PBD. If both are empty, returns all samples.
+  repeated string pcollection_ids = 2;
+}
+
+// Contains sampled elements from all PCollections from a single
+// ProcessBundleDescriptor when the SDK is processing a bundle.
+message Samples {
+
+  // A sampled element. This is a proto message to allow for additional
+  // per-element metadata.
+  message Element {
+    // Required. Sampled raw bytes for an element. This is a
+    // single encoded element in the nested context.
+    bytes element = 1;
+
+    // FUTURE WORK: Capture lull detections and exceptions.
+    //
+    // Optional. Present if there was an exception
+    // processing the above element.
+    //
+    // LogEntry exception_entry = 2;
+  }
+
+  message ElementList {
+    // Required. The individual elements sampled from a PCollection.
+    repeated Element elements = 1;
+  }
+
+  // Required. Map from PCollection id to list of encoded elements using the
+  // associated Ccoder id for that PCollection.
+  map<string, ElementList> pcollections = 1;
+}
+
+// If supported, the `SampleResponse` will contain samples from all
+// ProcessBundleDescriptors.
+message SampleResponse {
+  // Required. Map from ProcessBundleDescriptor id to samples using that
+  // descriptor.
+  map<string, Samples> descriptors = 1;

Review Comment:
   Yeah, that's a good simplification. In the future, will SDKs be able to execute bundles from different pipelines? If so, I'm worried that not specifying the descriptor as well may introduce unwanted behavior.



##########
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##########
@@ -109,6 +109,7 @@ message InstructionRequest {
     FinalizeBundleRequest finalize_bundle = 1004;
     MonitoringInfosMetadataRequest monitoring_infos = 1005;
     HarnessMonitoringInfosRequest harness_monitoring_infos = 1006;
+    SampleRequest sample = 1007;

Review Comment:
   sgtm, replaced with SampleDataRequest 





[GitHub] [beam] rohdesamuel commented on a diff in pull request #25065: Task #25064: Data Sampling proto changes

Posted by "rohdesamuel (via GitHub)" <gi...@apache.org>.
rohdesamuel commented on code in PR #25065:
URL: https://github.com/apache/beam/pull/25065#discussion_r1085673374


##########
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##########
@@ -137,12 +138,79 @@ message InstructionResponse {
     FinalizeBundleResponse finalize_bundle = 1004;
     MonitoringInfosMetadataResponse monitoring_infos = 1005;
     HarnessMonitoringInfosResponse harness_monitoring_infos = 1006;
+    SampleDataResponse sample = 1007;
 
     // DEPRECATED
     RegisterResponse register = 1000;
   }
 }
 
+// If supported, the `SampleDataRequest` will respond with a
+// `SampleDataResponse`. The SDK being queried must have the
+// "beam:protocol:data_sampling:v1" capability. The samples in the
+// `SampleDataResponse` will be filtered by both being in the
+// `process_bundle_descriptor_ids` AND the `pcollection_ids`. An empty list can
+// match anything.
+message SampleDataRequest {
+  // (Optional) The ProcessBundleDescriptor ids to filter for.
+  repeated string process_bundle_descriptor_ids = 1;
+
+  // (Optional) The PCollection ids to filter for.
+  repeated string pcollection_ids = 2;
+}
+
+
+// An element sampled when the SDK is processing a bundle. This is a proto
+// message to allow for additional per-element metadata.
+message SampledElement {
+  // Required. Sampled raw bytes for an element. This is a
+  // single encoded element in the nested context.
+  bytes element = 1;
+
+  // FUTURE WORK: Capture lull detections and exceptions.
+  //
+  // Optional. Present if there was an exception
+  // processing the above element.
+  //
+  // LogEntry exception_entry = 2;
+}
+
+// If supported, the `SampleDataResponse` will contain samples from all
+// ProcessBundleDescriptors.
+message SampleDataResponse {
+  message ElementList {
+    // Required. The individual elements sampled from a PCollection.
+    repeated SampledElement elements = 1;
+  }
+
+  // Map from PCollection id to sample elements.
+  map<string, ElementList> element_samples = 1;
+
+  // FUTURE WORK: Investigate ways of storing multiple interesting types of
+  // sampled elements. There are two ways of accomplishing this:
+  // 1) Maps of typed elements: include multiple maps here with typed element
+  // proto messages, ex.
+  //
+  // message SlowElement {...}
+  // message ErroredElement {...}
+  // map<string, SlowElement> slow_elements
+  // map<string, ErroredElement> errored_elements
+  //
+  // However, this forces an element into a single category. It disallows
+  // classification across multiple characteristics (like a slow and errored
+  // element).
+  //
+  // 2) Compositional types: allow for Protobuf Extensions on the base

Review Comment:
   Gotcha, didn't know that this was discouraged. Changed to URN + payload.





[GitHub] [beam] rohdesamuel commented on pull request #25065: Task #25064: Data Sampling proto changes

Posted by "rohdesamuel (via GitHub)" <gi...@apache.org>.
rohdesamuel commented on PR #25065:
URL: https://github.com/apache/beam/pull/25065#issuecomment-1404117928

   Run Java PreCommit




[GitHub] [beam] lukecwik commented on a diff in pull request #25065: Task #25064: Data Sampling proto changes

Posted by GitBox <gi...@apache.org>.
lukecwik commented on code in PR #25065:
URL: https://github.com/apache/beam/pull/25065#discussion_r1074014043


##########
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##########
@@ -109,6 +109,7 @@ message InstructionRequest {
     FinalizeBundleRequest finalize_bundle = 1004;
     MonitoringInfosMetadataRequest monitoring_infos = 1005;
     HarnessMonitoringInfosRequest harness_monitoring_infos = 1006;
+    SampleRequest sample = 1007;

Review Comment:
   SampleRequest is super vague, what about something like SamplePCollectionRequest or SampleDataRequest?
   
   ditto for response



##########
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##########
@@ -137,12 +138,64 @@ message InstructionResponse {
     FinalizeBundleResponse finalize_bundle = 1004;
     MonitoringInfosMetadataResponse monitoring_infos = 1005;
     HarnessMonitoringInfosResponse harness_monitoring_infos = 1006;
+    SampleResponse sample = 1007;
 
     // DEPRECATED
     RegisterResponse register = 1000;
   }
 }
 
+// If supported, the `SampleRequest` will respond with a `SampleResponse` of any
+// sampled elements for the PCollections within the given
+// ProcessBundleDescriptor. The SDK being queried must have the
+// "beam:protocol:data_sampling:v1" capability.
+message SampleRequest {
+  // Required. The id of the ProcessBundleDescriptor to sample. If empty,
+  // returns samples from all descriptors.
+  string process_bundle_descriptor_id = 1;

Review Comment:
   Should this be a list as well?
   
   ```suggestion
     // (Optional) The ProcessBundleDescriptor ids to filter for.
     repeated string process_bundle_descriptor_ids = 1;
   ```



##########
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##########
@@ -137,12 +138,64 @@ message InstructionResponse {
     FinalizeBundleResponse finalize_bundle = 1004;
     MonitoringInfosMetadataResponse monitoring_infos = 1005;
     HarnessMonitoringInfosResponse harness_monitoring_infos = 1006;
+    SampleResponse sample = 1007;
 
     // DEPRECATED
     RegisterResponse register = 1000;
   }
 }
 
+// If supported, the `SampleRequest` will respond with a `SampleResponse` of any
+// sampled elements for the PCollections within the given
+// ProcessBundleDescriptor. The SDK being queried must have the
+// "beam:protocol:data_sampling:v1" capability.
+message SampleRequest {
+  // Required. The id of the ProcessBundleDescriptor to sample. If empty,
+  // returns samples from all descriptors.
+  string process_bundle_descriptor_id = 1;
+
+  // Optional. The PCollections to sample from. If empty, returns all sampled
+  // PCollections from the given PBD. If both are empty, returns all samples.

Review Comment:
   ```suggestion
     // (Optional) The PCollection ids to filter for.
   ```



##########
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##########
@@ -137,12 +138,64 @@ message InstructionResponse {
     FinalizeBundleResponse finalize_bundle = 1004;
     MonitoringInfosMetadataResponse monitoring_infos = 1005;
     HarnessMonitoringInfosResponse harness_monitoring_infos = 1006;
+    SampleResponse sample = 1007;
 
     // DEPRECATED
     RegisterResponse register = 1000;
   }
 }
 
+// If supported, the `SampleRequest` will respond with a `SampleResponse` of any
+// sampled elements for the PCollections within the given
+// ProcessBundleDescriptor. The SDK being queried must have the
+// "beam:protocol:data_sampling:v1" capability.
+message SampleRequest {
+  // Required. The id of the ProcessBundleDescriptor to sample. If empty,
+  // returns samples from all descriptors.
+  string process_bundle_descriptor_id = 1;
+
+  // Optional. The PCollections to sample from. If empty, returns all sampled
+  // PCollections from the given PBD. If both are empty, returns all samples.
+  repeated string pcollection_ids = 2;
+}
+
+// Contains sampled elements from all PCollections from a single
+// ProcessBundleDescriptor when the SDK is processing a bundle.
+message Samples {
+
+  // A sampled element. This is a proto message to allow for additional
+  // per-element metadata.
+  message Element {
+    // Required. Sampled raw bytes for an element. This is a
+    // single encoded element in the nested context.
+    bytes element = 1;

Review Comment:
   I think it would make sense to have different lists for arbitrary samples, slow elements, and elements that errored out instead of creating one giant element that has all the permutations.
   
   This way we can create different elements like:
   SlowElementList that contains SlowElement
   ErroredElementList that contains ErroredElement
   
   and down in SampleResponse we would add maps that represent these as we get new use cases. You could have them commented out for now so that when someone works to expand this feature they will have our thinking already laid out on what we expect from the data structure.



##########
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##########
@@ -137,12 +138,64 @@ message InstructionResponse {
     FinalizeBundleResponse finalize_bundle = 1004;
     MonitoringInfosMetadataResponse monitoring_infos = 1005;
     HarnessMonitoringInfosResponse harness_monitoring_infos = 1006;
+    SampleResponse sample = 1007;
 
     // DEPRECATED
     RegisterResponse register = 1000;
   }
 }
 
+// If supported, the `SampleRequest` will respond with a `SampleResponse` of any
+// sampled elements for the PCollections within the given
+// ProcessBundleDescriptor. The SDK being queried must have the
+// "beam:protocol:data_sampling:v1" capability.
+message SampleRequest {
+  // Required. The id of the ProcessBundleDescriptor to sample. If empty,
+  // returns samples from all descriptors.
+  string process_bundle_descriptor_id = 1;
+
+  // Optional. The PCollections to sample from. If empty, returns all sampled
+  // PCollections from the given PBD. If both are empty, returns all samples.
+  repeated string pcollection_ids = 2;
+}
+
+// Contains sampled elements from all PCollections from a single
+// ProcessBundleDescriptor when the SDK is processing a bundle.
+message Samples {
+
+  // A sampled element. This is a proto message to allow for additional
+  // per-element metadata.
+  message Element {
+    // Required. Sampled raw bytes for an element. This is a
+    // single encoded element in the nested context.
+    bytes element = 1;
+
+    // FUTURE WORK: Capture lull detections and exceptions.
+    //
+    // Optional. Present if there was an exception
+    // processing the above element.
+    //
+    // LogEntry exception_entry = 2;
+  }
+
+  message ElementList {
+    // Required. The individual elements sampled from a PCollection.
+    repeated Element elements = 1;
+  }
+
+  // Required. Map from PCollection id to list of encoded elements using the
+  // associated coder id for that PCollection.
+  map<string, ElementList> pcollections = 1;
+}
+
+// If supported, the `SampleResponse` will contain samples from all
+// ProcessBundleDescriptors.
+message SampleResponse {
+  // Required. Map from ProcessBundleDescriptor id to samples using that
+  // descriptor.
+  map<string, Samples> descriptors = 1;

Review Comment:
   Note that PCollection ids are meant to be globally unique even during execution so you should be able to do something like:
   ```suggestion
     // Map from PCollection id to sample elements.
     map<string, ElementList> element_samples = 1;
   ```
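
The flattening suggested above can be sketched in a few lines of Python. This is only an illustration under assumptions: the `flatten_by_pcollection` helper and the plain-dict layout (descriptor id to PCollection id to encoded elements) are hypothetical stand-ins for the proto maps, and the sketch merges element lists in case the same PCollection id shows up under more than one descriptor.

```python
from typing import Dict, List

# Hypothetical stand-in for the nested proto maps:
# ProcessBundleDescriptor id -> PCollection id -> encoded elements.
NestedSamples = Dict[str, Dict[str, List[bytes]]]


def flatten_by_pcollection(descriptors: NestedSamples) -> Dict[str, List[bytes]]:
    """Collapses the descriptor level, keying samples by PCollection id only."""
    flat: Dict[str, List[bytes]] = {}
    for pcollections in descriptors.values():
        for pcollection_id, elements in pcollections.items():
            # PCollection ids are assumed globally unique, so merging is safe.
            flat.setdefault(pcollection_id, []).extend(elements)
    return flat


nested = {
    "pbd_1": {"pc_a": [b"\x01"]},
    "pbd_2": {"pc_a": [b"\x02"], "pc_b": [b"\x03"]},
}
flat = flatten_by_pcollection(nested)
```

Because the keys are unique across descriptors, no information about which PCollection an element came from is lost by dropping the outer map.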





[GitHub] [beam] rohdesamuel commented on a diff in pull request #25065: Task #25064: Data Sampling proto changes

Posted by GitBox <gi...@apache.org>.
rohdesamuel commented on code in PR #25065:
URL: https://github.com/apache/beam/pull/25065#discussion_r1081711060


##########
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##########
@@ -137,12 +138,64 @@ message InstructionResponse {
     FinalizeBundleResponse finalize_bundle = 1004;
     MonitoringInfosMetadataResponse monitoring_infos = 1005;
     HarnessMonitoringInfosResponse harness_monitoring_infos = 1006;
+    SampleResponse sample = 1007;
 
     // DEPRECATED
     RegisterResponse register = 1000;
   }
 }
 
+// If supported, the `SampleRequest` will respond with a `SampleResponse` of any
+// sampled elements for the PCollections within the given
+// ProcessBundleDescriptor. The SDK being queried must have the
+// "beam:protocol:data_sampling:v1" capability.
+message SampleRequest {
+  // Required. The id of the ProcessBundleDescriptor to sample. If empty,
+  // returns samples from all descriptors.
+  string process_bundle_descriptor_id = 1;
+
+  // Optional. The PCollections to sample from. If empty, returns all sampled
+  // PCollections from the given PBD. If both are empty, returns all samples.
+  repeated string pcollection_ids = 2;
+}
+
+// Contains sampled elements from all PCollections from a single
+// ProcessBundleDescriptor when the SDK is processing a bundle.
+message Samples {
+
+  // A sampled element. This is a proto message to allow for additional
+  // per-element metadata.
+  message Element {
+    // Required. Sampled raw bytes for an element. This is a
+    // single encoded element in the nested context.
+    bytes element = 1;

Review Comment:
   Done!



##########
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##########
@@ -137,12 +138,64 @@ message InstructionResponse {
     FinalizeBundleResponse finalize_bundle = 1004;
     MonitoringInfosMetadataResponse monitoring_infos = 1005;
     HarnessMonitoringInfosResponse harness_monitoring_infos = 1006;
+    SampleResponse sample = 1007;
 
     // DEPRECATED
     RegisterResponse register = 1000;
   }
 }
 
+// If supported, the `SampleRequest` will respond with a `SampleResponse` of any
+// sampled elements for the PCollections within the given
+// ProcessBundleDescriptor. The SDK being queried must have the
+// "beam:protocol:data_sampling:v1" capability.
+message SampleRequest {
+  // Required. The id of the ProcessBundleDescriptor to sample. If empty,
+  // returns samples from all descriptors.
+  string process_bundle_descriptor_id = 1;
+
+  // Optional. The PCollections to sample from. If empty, returns all sampled
+  // PCollections from the given PBD. If both are empty, returns all samples.
+  repeated string pcollection_ids = 2;
+}
+
+// Contains sampled elements from all PCollections from a single
+// ProcessBundleDescriptor when the SDK is processing a bundle.
+message Samples {
+
+  // A sampled element. This is a proto message to allow for additional
+  // per-element metadata.
+  message Element {
+    // Required. Sampled raw bytes for an element. This is a
+    // single encoded element in the nested context.
+    bytes element = 1;

Review Comment:
   And good to know, thanks!





[GitHub] [beam] lukecwik commented on pull request #25065: Task #25064: Data Sampling proto changes

Posted by "lukecwik (via GitHub)" <gi...@apache.org>.
lukecwik commented on PR #25065:
URL: https://github.com/apache/beam/pull/25065#issuecomment-1402789855

   Run Python_Transforms PreCommit




[GitHub] [beam] rohdesamuel commented on pull request #25065: #25064: Data Sampling proto changes

Posted by GitBox <gi...@apache.org>.
rohdesamuel commented on PR #25065:
URL: https://github.com/apache/beam/pull/25065#issuecomment-1387681367

   R: @pabloem 
   R: @lukecwik 




[GitHub] [beam] rohdesamuel commented on a diff in pull request #25065: Task #25064: Data Sampling proto changes

Posted by "rohdesamuel (via GitHub)" <gi...@apache.org>.
rohdesamuel commented on code in PR #25065:
URL: https://github.com/apache/beam/pull/25065#discussion_r1086040684


##########
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##########
@@ -137,12 +138,79 @@ message InstructionResponse {
     FinalizeBundleResponse finalize_bundle = 1004;
     MonitoringInfosMetadataResponse monitoring_infos = 1005;
     HarnessMonitoringInfosResponse harness_monitoring_infos = 1006;
+    SampleDataResponse sample = 1007;
 
     // DEPRECATED
     RegisterResponse register = 1000;
   }
 }
 
+// If supported, the `SampleDataRequest` will respond with a
+// `SampleDataResponse`. The SDK being queried must have the
+// "beam:protocol:data_sampling:v1" capability. The samples in the
+// `SampleDataResponse` will be filtered by both being in the
+// `process_bundle_descriptor_ids` AND the `pcollection_ids`. An empty list can

Review Comment:
   Sounds good! Used your suggested commit





[GitHub] [beam] lukecwik merged pull request #25065: Task #25064: Data Sampling proto changes

Posted by "lukecwik (via GitHub)" <gi...@apache.org>.
lukecwik merged PR #25065:
URL: https://github.com/apache/beam/pull/25065




[GitHub] [beam] lukecwik commented on a diff in pull request #25065: Task #25064: Data Sampling proto changes

Posted by GitBox <gi...@apache.org>.
lukecwik commented on code in PR #25065:
URL: https://github.com/apache/beam/pull/25065#discussion_r1080787349


##########
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##########
@@ -137,12 +138,64 @@ message InstructionResponse {
     FinalizeBundleResponse finalize_bundle = 1004;
     MonitoringInfosMetadataResponse monitoring_infos = 1005;
     HarnessMonitoringInfosResponse harness_monitoring_infos = 1006;
+    SampleResponse sample = 1007;
 
     // DEPRECATED
     RegisterResponse register = 1000;
   }
 }
 
+// If supported, the `SampleRequest` will respond with a `SampleResponse` of any
+// sampled elements for the PCollections within the given
+// ProcessBundleDescriptor. The SDK being queried must have the
+// "beam:protocol:data_sampling:v1" capability.
+message SampleRequest {
+  // Required. The id of the ProcessBundleDescriptor to sample. If empty,
+  // returns samples from all descriptors.
+  string process_bundle_descriptor_id = 1;
+
+  // Optional. The PCollections to sample from. If empty, returns all sampled
+  // PCollections from the given PBD. If both are empty, returns all samples.
+  repeated string pcollection_ids = 2;
+}
+
+// Contains sampled elements from all PCollections from a single
+// ProcessBundleDescriptor when the SDK is processing a bundle.
+message Samples {
+
+  // A sampled element. This is a proto message to allow for additional
+  // per-element metadata.
+  message Element {
+    // Required. Sampled raw bytes for an element. This is a
+    // single encoded element in the nested context.
+    bytes element = 1;
+
+    // FUTURE WORK: Capture lull detections and exceptions.
+    //
+    // Optional. Present if there was an exception
+    // processing the above element.
+    //
+    // LogEntry exception_entry = 2;
+  }
+
+  message ElementList {
+    // Required. The individual elements sampled from a PCollection.
+    repeated Element elements = 1;
+  }
+
+  // Required. Map from PCollection id to list of encoded elements using the
+  // associated coder id for that PCollection.
+  map<string, ElementList> pcollections = 1;
+}
+
+// If supported, the `SampleResponse` will contain samples from all
+// ProcessBundleDescriptors.
+message SampleResponse {
+  // Required. Map from ProcessBundleDescriptor id to samples using that
+  // descriptor.
+  map<string, Samples> descriptors = 1;

Review Comment:
   Not in any planned future and likely so far out of scope that many of these concepts and designs would have to be modified.
   
   Note that the runner can always generate PCollection ids that are unique across pipelines as well if it were to come to that or we could evolve the API to add this field when needed.





[GitHub] [beam] lukecwik commented on a diff in pull request #25065: Task #25064: Data Sampling proto changes

Posted by GitBox <gi...@apache.org>.
lukecwik commented on code in PR #25065:
URL: https://github.com/apache/beam/pull/25065#discussion_r1080788307


##########
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##########
@@ -137,12 +138,64 @@ message InstructionResponse {
     FinalizeBundleResponse finalize_bundle = 1004;
     MonitoringInfosMetadataResponse monitoring_infos = 1005;
     HarnessMonitoringInfosResponse harness_monitoring_infos = 1006;
+    SampleResponse sample = 1007;
 
     // DEPRECATED
     RegisterResponse register = 1000;
   }
 }
 
+// If supported, the `SampleRequest` will respond with a `SampleResponse` of any
+// sampled elements for the PCollections within the given
+// ProcessBundleDescriptor. The SDK being queried must have the
+// "beam:protocol:data_sampling:v1" capability.
+message SampleRequest {
+  // Required. The id of the ProcessBundleDescriptor to sample. If empty,
+  // returns samples from all descriptors.
+  string process_bundle_descriptor_id = 1;
+
+  // Optional. The PCollections to sample from. If empty, returns all sampled
+  // PCollections from the given PBD. If both are empty, returns all samples.
+  repeated string pcollection_ids = 2;
+}
+
+// Contains sampled elements from all PCollections from a single
+// ProcessBundleDescriptor when the SDK is processing a bundle.
+message Samples {
+
+  // A sampled element. This is a proto message to allow for additional
+  // per-element metadata.
+  message Element {
+    // Required. Sampled raw bytes for an element. This is a
+    // single encoded element in the nested context.
+    bytes element = 1;

Review Comment:
   Those are all true, and we could figure that out later as well. It might be worth recording in a comment that we could either use different element types or use composition.
   
   This was the reason for us to use URNs and payloads in lots of places. If we do need that level of flexibility we can always create a data sampling v2 API.
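
To make the URN-and-payload idea concrete, here is a rough Python sketch. Everything in it is an assumption for illustration: the `SampledElement` dataclass, the `metadata` field, the `has_trait` helper, and both URN strings are hypothetical and are not part of the Beam protos. It shows how one element can carry several characteristics at once (for example slow and errored), which separate typed lists would not allow.

```python
from dataclasses import dataclass, field
from typing import Dict

# Hypothetical URNs, invented for this sketch; Beam does not define them.
SLOW_URN = "example:sample_metadata:slow:v1"
ERRORED_URN = "example:sample_metadata:errored:v1"


@dataclass
class SampledElement:
    # Encoded element bytes, as in the proto's `element` field.
    element: bytes
    # Hypothetical composition point: URN -> opaque payload bytes.
    metadata: Dict[str, bytes] = field(default_factory=dict)


def has_trait(sample: SampledElement, urn: str) -> bool:
    """Returns True if the sample carries metadata under the given URN."""
    return urn in sample.metadata


# One element classified along two characteristics at the same time.
sample = SampledElement(
    element=b"\x2a",
    metadata={SLOW_URN: b"", ERRORED_URN: b"ValueError"},
)
```

A consumer that does not recognize a URN can skip its payload, which is one reason this pattern tends to evolve more gracefully than a fixed set of typed fields.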





[GitHub] [beam] rohdesamuel commented on a diff in pull request #25065: Task #25064: Data Sampling proto changes

Posted by "rohdesamuel (via GitHub)" <gi...@apache.org>.
rohdesamuel commented on code in PR #25065:
URL: https://github.com/apache/beam/pull/25065#discussion_r1085700067


##########
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##########
@@ -137,12 +138,79 @@ message InstructionResponse {
     FinalizeBundleResponse finalize_bundle = 1004;
     MonitoringInfosMetadataResponse monitoring_infos = 1005;
     HarnessMonitoringInfosResponse harness_monitoring_infos = 1006;
+    SampleDataResponse sample = 1007;
 
     // DEPRECATED
     RegisterResponse register = 1000;
   }
 }
 
+// If supported, the `SampleDataRequest` will respond with a
+// `SampleDataResponse`. The SDK being queried must have the
+// "beam:protocol:data_sampling:v1" capability. The samples in the
+// `SampleDataResponse` will be filtered by both being in the
+// `process_bundle_descriptor_ids` AND the `pcollection_ids`. An empty list can

Review Comment:
   Clarified in the comment that samples are returned based on PBD ids then filtered on PCollection ids. In this way, the runner can ask for:
   
   - All samples
   - All samples from a particular PBD
   - All samples from a particular PCollection (regardless of PBD)
   - Samples from a set of PCollections in a PBD
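
The four query shapes listed above can be sketched as a small filter. This is a minimal Python sketch under stated assumptions, not the SDK's implementation: the `filter_samples` helper and the nested dict layout are hypothetical, and an empty id list is treated as matching everything, as the proto comment describes.

```python
from typing import Dict, List, Sequence

# Hypothetical in-memory layout: PBD id -> PCollection id -> encoded elements.
Samples = Dict[str, Dict[str, List[bytes]]]


def filter_samples(
    samples: Samples,
    process_bundle_descriptor_ids: Sequence[str] = (),
    pcollection_ids: Sequence[str] = (),
) -> Dict[str, List[bytes]]:
    """Selects by PBD id first, then by PCollection id; an empty list matches all."""
    pbd_filter = set(process_bundle_descriptor_ids)
    pcollection_filter = set(pcollection_ids)
    result: Dict[str, List[bytes]] = {}
    for pbd_id, pcollections in samples.items():
        if pbd_filter and pbd_id not in pbd_filter:
            continue  # Descriptor was not requested.
        for pcollection_id, elements in pcollections.items():
            if pcollection_filter and pcollection_id not in pcollection_filter:
                continue  # PCollection was not requested.
            result.setdefault(pcollection_id, []).extend(elements)
    return result


example = {
    "pbd_1": {"pc_a": [b"\x01"], "pc_b": [b"\x02"]},
    "pbd_2": {"pc_c": [b"\x03"]},
}
```

With this layout, `filter_samples(example)` returns every sample, `filter_samples(example, ["pbd_1"])` returns only those from one PBD, `filter_samples(example, [], ["pc_c"])` returns one PCollection regardless of PBD, and supplying both lists intersects the two filters.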





[GitHub] [beam] codecov[bot] commented on pull request #25065: #25064: Data Sampling proto changes

Posted by GitBox <gi...@apache.org>.
codecov[bot] commented on PR #25065:
URL: https://github.com/apache/beam/pull/25065#issuecomment-1387710543

   # [Codecov](https://codecov.io/gh/apache/beam/pull/25065?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#25065](https://codecov.io/gh/apache/beam/pull/25065?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (d2f4de6) into [master](https://codecov.io/gh/apache/beam/commit/1f02c23b538283e4621abfeaed2acd0d153cd582?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (1f02c23) will **increase** coverage by `0.00%`.
   > The diff coverage is `n/a`.
   
   ```diff
   @@           Coverage Diff           @@
   ##           master   #25065   +/-   ##
   =======================================
     Coverage   73.12%   73.13%           
   =======================================
     Files         735      735           
     Lines       98147    98190   +43     
   =======================================
   + Hits        71770    71811   +41     
   - Misses      25013    25015    +2     
     Partials     1364     1364           
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | python | `82.68% <ø> (+<0.01%)` | :arrow_up: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/25065?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...hon/apache\_beam/runners/worker/bundle\_processor.py](https://codecov.io/gh/apache/beam/pull/25065?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvYnVuZGxlX3Byb2Nlc3Nvci5weQ==) | `93.17% <0.00%> (-0.38%)` | :arrow_down: |
   | [sdks/python/apache\_beam/typehints/typehints.py](https://codecov.io/gh/apache/beam/pull/25065?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHlwZWhpbnRzL3R5cGVoaW50cy5weQ==) | `92.88% <0.00%> (-0.17%)` | :arrow_down: |
   | [...ache/beam/model/fn\_execution/v1/beam\_fn\_api\_pb2.py](https://codecov.io/gh/apache/beam/pull/25065?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcG9ydGFiaWxpdHkvYXBpL29yZy9hcGFjaGUvYmVhbS9tb2RlbC9mbl9leGVjdXRpb24vdjEvYmVhbV9mbl9hcGlfcGIyLnB5) | `100.00% <0.00%> (ø)` | |
   | [...ache/beam/model/pipeline/v1/beam\_runner\_api\_pb2.py](https://codecov.io/gh/apache/beam/pull/25065?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcG9ydGFiaWxpdHkvYXBpL29yZy9hcGFjaGUvYmVhbS9tb2RlbC9waXBlbGluZS92MS9iZWFtX3J1bm5lcl9hcGlfcGIyLnB5) | `100.00% <0.00%> (ø)` | |
   | [...beam/model/pipeline/v1/beam\_runner\_api\_pb2\_urns.py](https://codecov.io/gh/apache/beam/pull/25065?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcG9ydGFiaWxpdHkvYXBpL29yZy9hcGFjaGUvYmVhbS9tb2RlbC9waXBlbGluZS92MS9iZWFtX3J1bm5lcl9hcGlfcGIyX3VybnMucHk=) | `100.00% <0.00%> (ø)` | |
   | [...dks/python/apache\_beam/options/pipeline\_options.py](https://codecov.io/gh/apache/beam/pull/25065?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vb3B0aW9ucy9waXBlbGluZV9vcHRpb25zLnB5) | `93.97% <0.00%> (+0.02%)` | :arrow_up: |
   | [...on/apache\_beam/runners/dataflow/dataflow\_runner.py](https://codecov.io/gh/apache/beam/pull/25065?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9kYXRhZmxvdy9kYXRhZmxvd19ydW5uZXIucHk=) | `81.88% <0.00%> (+0.14%)` | :arrow_up: |
   | [...ks/python/apache\_beam/runners/worker/sdk\_worker.py](https://codecov.io/gh/apache/beam/pull/25065?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvc2RrX3dvcmtlci5weQ==) | `89.24% <0.00%> (+0.16%)` | :arrow_up: |
   
   




[GitHub] [beam] github-actions[bot] commented on pull request #25065: Task #25064: Data Sampling proto changes

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #25065:
URL: https://github.com/apache/beam/pull/25065#issuecomment-1396097655

   Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control




[GitHub] [beam] lukecwik commented on a diff in pull request #25065: Task #25064: Data Sampling proto changes

Posted by GitBox <gi...@apache.org>.
lukecwik commented on code in PR #25065:
URL: https://github.com/apache/beam/pull/25065#discussion_r1080668824


##########
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##########
@@ -137,12 +138,64 @@ message InstructionResponse {
     FinalizeBundleResponse finalize_bundle = 1004;
     MonitoringInfosMetadataResponse monitoring_infos = 1005;
     HarnessMonitoringInfosResponse harness_monitoring_infos = 1006;
+    SampleResponse sample = 1007;
 
     // DEPRECATED
     RegisterResponse register = 1000;
   }
 }
 
+// If supported, the `SampleRequest` will respond with a `SampleResponse` of any
+// sampled elements for the PCollections within the given
+// ProcessBundleDescriptor. The SDK being queried must have the
+// "beam:protocol:data_sampling:v1" capability.
+message SampleRequest {
+  // Required. The id of the ProcessBundleDescriptor to sample. If empty,
+  // returns samples from all descriptors.
+  string process_bundle_descriptor_id = 1;
+
+  // Optional. The PCollections to sample from. If empty, returns all sampled
+  // PCollections from the given PBD. If both are empty, returns all samples.
+  repeated string pcollection_ids = 2;
+}
+
+// Contains sampled elements from all PCollections from a single
+// ProcessBundleDescriptor when the SDK is processing a bundle.
+message Samples {
+
+  // A sampled element. This is a proto message to allow for additional
+  // per-element metadata.
+  message Element {
+    // Required. Sampled raw bytes for an element. This is a
+    // single encoded element in the nested context.
+    bytes element = 1;

Review Comment:
   I think it would make sense to have separate lists for arbitrary samples, slow elements, and elements that errored out, instead of creating one giant element that covers all the permutations.
   
   This way we can create different element types like:
   SlowElement
   ErroredElement
   
   and add lists of these to ElementList as we get new use cases. You could leave them commented out for now so that when someone works to expand this feature they will have our thinking already laid out on what we expect from the data structure.
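   
   A minimal sketch of the layout suggested above, not the PR's actual definition. The package name, the field names and numbers, the contents of the commented-out messages, and the map from PCollection id to `ElementList` are all assumptions made for illustration; only `Samples`, `Element`, `ElementList`, and the `bytes element = 1;` field come from the diff and this comment.
   
   ```proto
   syntax = "proto3";
   
   // Hypothetical package name, for illustration only.
   package sampling_sketch;
   
   message Samples {
     // An arbitrarily sampled element.
     message Element {
       // A single encoded element in the nested context.
       bytes element = 1;
     }
   
     // Hypothetical future type: an element that was slow to process.
     // message SlowElement {
     //   bytes element = 1;
     // }
   
     // Hypothetical future type: an element whose processing errored out.
     // message ErroredElement {
     //   bytes element = 1;
     // }
   
     message ElementList {
       repeated Element elements = 1;
   
       // Possible future lists, one per use case (field numbers assumed):
       // repeated SlowElement slow_elements = 2;
       // repeated ErroredElement errored_elements = 3;
     }
   
     // Assumed layout: PCollection id to the samples taken from it.
     map<string, ElementList> samples = 1;
   }
   ```
   
   Keeping one list per use case means each element type carries only the metadata relevant to it, and adding a new list later is a backward-compatible change.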


