You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2021/01/26 03:40:04 UTC
[beam] branch master updated: Add capabilities for the
HarnessMonitoringInfosRequest/Response and set it on the python SDK
environment
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 3ddf941 Add capabilities for the HarnessMonitoringInfosRequest/Response and set it on the python SDK environment
new 5ef873f Merge pull request #13801 from Add capabilities for the HarnessMonitoringInfosRequest/Response
3ddf941 is described below
commit 3ddf941d8de9c6809e430240bcd45cf6359adcab
Author: Alex Amato <aj...@google.com>
AuthorDate: Sat Jan 23 06:36:20 2021 +0000
Add capabilities for the HarnessMonitoringInfosRequest/Response and set it on the python SDK environment
---
model/pipeline/src/main/proto/beam_runner_api.proto | 15 +++++++++++----
sdks/python/apache_beam/transforms/environments.py | 1 +
sdks/python/apache_beam/transforms/environments_test.py | 2 ++
3 files changed, 14 insertions(+), 4 deletions(-)
diff --git a/model/pipeline/src/main/proto/beam_runner_api.proto b/model/pipeline/src/main/proto/beam_runner_api.proto
index 6d8e8be..e06d422 100644
--- a/model/pipeline/src/main/proto/beam_runner_api.proto
+++ b/model/pipeline/src/main/proto/beam_runner_api.proto
@@ -310,10 +310,10 @@ message StandardPTransforms {
// A transform that translates a given element to its human-readable
// representation.
- //
+ //
// Input: KV<nonce, element>
// Output: KV<nonce, string>
- //
+ //
// For each given element, the implementation returns the best-effort
// human-readable representation. When possible, the implementation could
// call a user-overridable method on the type. For example, Java could
@@ -961,13 +961,13 @@ message StandardCoders {
// values.
// - For present values the null indicator is followed by the value
// encoded with its corresponding coder.
- //
+ //
// Well known logical types:
// beam:logical_type:micros_instant:v1
// - Representation type: ROW<seconds: INT64, micros: INT64>
// - A timestamp without a timezone where seconds + micros represents the
// amount of time since the epoch.
- //
+ //
// The payload for RowCoder is an instance of Schema.
// Components: None
// Experimental.
@@ -1495,6 +1495,13 @@ message StandardProtocols {
// simply indicates this SDK can actually parallelize the work across multiple
// cores.
MULTI_CORE_BUNDLE_PROCESSING = 3 [(beam_urn) = "beam:protocol:multi_core_bundle_processing:v1"];
+
+ // Indicates that this SDK handles the InstructionRequest of type
+ // HarnessMonitoringInfosRequest.
+ // Such a request asks for the full MonitoringInfo data associated with
+ // the entire SDK harness process, not specific to a bundle.
+ HARNESS_MONITORING_INFOS = 4
+ [(beam_urn) = "beam:protocol:harness_monitoring_infos:v1"];
}
}
diff --git a/sdks/python/apache_beam/transforms/environments.py b/sdks/python/apache_beam/transforms/environments.py
index 46ea197..9159175 100644
--- a/sdks/python/apache_beam/transforms/environments.py
+++ b/sdks/python/apache_beam/transforms/environments.py
@@ -716,6 +716,7 @@ def _python_sdk_capabilities_iter():
if getattr(urn_spec, 'urn', None) in coders.Coder._known_urns:
yield urn_spec.urn
yield common_urns.protocols.LEGACY_PROGRESS_REPORTING.urn
+ yield common_urns.protocols.HARNESS_MONITORING_INFOS.urn
yield common_urns.protocols.WORKER_STATUS.urn
yield python_urns.PACKED_COMBINE_FN
yield 'beam:version:sdk_base:' + DockerEnvironment.default_docker_image()
diff --git a/sdks/python/apache_beam/transforms/environments_test.py b/sdks/python/apache_beam/transforms/environments_test.py
index 155fe3c..1e50bdf 100644
--- a/sdks/python/apache_beam/transforms/environments_test.py
+++ b/sdks/python/apache_beam/transforms/environments_test.py
@@ -72,6 +72,8 @@ class RunnerApiTest(unittest.TestCase):
def test_sdk_capabilities(self):
sdk_capabilities = environments.python_sdk_capabilities()
self.assertIn(common_urns.coders.LENGTH_PREFIX.urn, sdk_capabilities)
+ self.assertIn(
+ common_urns.protocols.HARNESS_MONITORING_INFOS.urn, sdk_capabilities)
self.assertIn(common_urns.protocols.WORKER_STATUS.urn, sdk_capabilities)
self.assertIn(
common_urns.sdf_components.TRUNCATE_SIZED_RESTRICTION.urn,