You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2021/01/26 03:40:04 UTC

[beam] branch master updated: Add capabilities for the HarnessMonitoringInfosRequest/Response and set it on the python SDK environment

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ddf941  Add capabilities for the HarnessMonitoringInfosRequest/Response and set it on the python SDK environment
     new 5ef873f  Merge pull request #13801 from Add capabilities for the HarnessMonitoringInfosRequest/Response
3ddf941 is described below

commit 3ddf941d8de9c6809e430240bcd45cf6359adcab
Author: Alex Amato <aj...@google.com>
AuthorDate: Sat Jan 23 06:36:20 2021 +0000

    Add capabilities for the HarnessMonitoringInfosRequest/Response and set it on the python SDK environment
---
 model/pipeline/src/main/proto/beam_runner_api.proto     | 15 +++++++++++----
 sdks/python/apache_beam/transforms/environments.py      |  1 +
 sdks/python/apache_beam/transforms/environments_test.py |  2 ++
 3 files changed, 14 insertions(+), 4 deletions(-)

diff --git a/model/pipeline/src/main/proto/beam_runner_api.proto b/model/pipeline/src/main/proto/beam_runner_api.proto
index 6d8e8be..e06d422 100644
--- a/model/pipeline/src/main/proto/beam_runner_api.proto
+++ b/model/pipeline/src/main/proto/beam_runner_api.proto
@@ -310,10 +310,10 @@ message StandardPTransforms {
 
     // A transform that translates a given element to its human-readable
     // representation.
-    // 
+    //
     // Input: KV<nonce, element>
     // Output: KV<nonce, string>
-    // 
+    //
     // For each given element, the implementation returns the best-effort
     // human-readable representation. When possible, the implementation could
     // call a user-overridable method on the type. For example, Java could
@@ -961,13 +961,13 @@ message StandardCoders {
     //     values.
     //   - For present values the null indicator is followed by the value
     //     encoded with its corresponding coder.
-    // 
+    //
     // Well known logical types:
     //   beam:logical_type:micros_instant:v1
     //   - Representation type: ROW<seconds: INT64, micros: INT64>
     //   - A timestamp without a timezone where seconds + micros represents the
     //     amount of time since the epoch.
-    // 
+    //
     // The payload for RowCoder is an instance of Schema.
     // Components: None
     // Experimental.
@@ -1495,6 +1495,13 @@ message StandardProtocols {
     // simply indicates this SDK can actually parallelize the work across multiple
     // cores.
     MULTI_CORE_BUNDLE_PROCESSING = 3 [(beam_urn) = "beam:protocol:multi_core_bundle_processing:v1"];
+
+    // Indicates that this SDK handles the InstructionRequest of type
+    // HarnessMonitoringInfosRequest.
+    // That request asks the SDK to provide full MonitoringInfo data associated
+    // with the entire SDK harness process, rather than with a specific bundle.
+    HARNESS_MONITORING_INFOS = 4
+        [(beam_urn) = "beam:protocol:harness_monitoring_infos:v1"];
   }
 }
 
diff --git a/sdks/python/apache_beam/transforms/environments.py b/sdks/python/apache_beam/transforms/environments.py
index 46ea197..9159175 100644
--- a/sdks/python/apache_beam/transforms/environments.py
+++ b/sdks/python/apache_beam/transforms/environments.py
@@ -716,6 +716,7 @@ def _python_sdk_capabilities_iter():
     if getattr(urn_spec, 'urn', None) in coders.Coder._known_urns:
       yield urn_spec.urn
   yield common_urns.protocols.LEGACY_PROGRESS_REPORTING.urn
+  yield common_urns.protocols.HARNESS_MONITORING_INFOS.urn
   yield common_urns.protocols.WORKER_STATUS.urn
   yield python_urns.PACKED_COMBINE_FN
   yield 'beam:version:sdk_base:' + DockerEnvironment.default_docker_image()
diff --git a/sdks/python/apache_beam/transforms/environments_test.py b/sdks/python/apache_beam/transforms/environments_test.py
index 155fe3c..1e50bdf 100644
--- a/sdks/python/apache_beam/transforms/environments_test.py
+++ b/sdks/python/apache_beam/transforms/environments_test.py
@@ -72,6 +72,8 @@ class RunnerApiTest(unittest.TestCase):
   def test_sdk_capabilities(self):
     sdk_capabilities = environments.python_sdk_capabilities()
     self.assertIn(common_urns.coders.LENGTH_PREFIX.urn, sdk_capabilities)
+    self.assertIn(
+        common_urns.protocols.HARNESS_MONITORING_INFOS.urn, sdk_capabilities)
     self.assertIn(common_urns.protocols.WORKER_STATUS.urn, sdk_capabilities)
     self.assertIn(
         common_urns.sdf_components.TRUNCATE_SIZED_RESTRICTION.urn,